You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/24 10:23:37 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1206 - Rework and test ExecutePythonProcessor, add inline script support

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 032fc4f  MINIFICPP-1206 - Rework and test ExecutePythonProcessor, add inline script support
032fc4f is described below

commit 032fc4f45d203e6b52c65c59dc7085f8a587f433
Author: Adam Hunyadi <hu...@gmail.com>
AuthorDate: Tue May 5 14:22:24 2020 +0200

    MINIFICPP-1206 - Rework and test ExecutePythonProcessor, add inline script support
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #784
---
 PROCESSORS.md                                      |   7 +-
 extensions/script/ExecuteScript.cpp                |  11 +-
 .../script/python/ExecutePythonProcessor.cpp       | 275 ++++++++++++---------
 extensions/script/python/ExecutePythonProcessor.h  |  59 ++---
 libminifi/include/utils/TestUtils.h                |  67 +++++
 libminifi/test/script-tests/CMakeLists.txt         |  33 ++-
 .../script-tests/ExecutePythonProcessorTests.cpp   | 220 +++++++++++++++++
 ...=> TestExecuteScriptProcessorWithLuaScript.cpp} |   0
 ...TestExecuteScriptProcessorWithPythonScript.cpp} |   0
 .../test_scripts/non_transferring_processor.py     |  27 ++
 ...passthrough_processor_transfering_to_failure.py |  28 +++
 ...passthrough_processor_transfering_to_success.py |  28 +++
 .../test_scripts/stateful_processor.py             |  42 ++++
 13 files changed, 632 insertions(+), 165 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 165c859..6183a2e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -233,15 +233,16 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description 
 
-Executes a script given the flow file and a process session. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each  [...]
+Executes a script given the flow file and a process session. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each  [...]
+
 ### Properties 
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
 | Name | Default Value | Allowable Values | Description | 
 | - | - | - | - | 
-|Module Directory|||Comma-separated list of paths to files and/or directories which contain modules required by the script|
-|Script File|||Path to script file to execute|
+|Script File|||Path to script file to execute. Only one of Script File or Script Body may be used|
+|Script Body|||Script to execute|
 ### Relationships
 
 | Name | Description |
diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp
index 2751267..1656c9a 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -40,16 +40,13 @@ namespace minifi {
 namespace processors {
 
 core::Property ExecuteScript::ScriptEngine("Script Engine",  // NOLINT
-                                           "The engine to execute scripts (python, lua)", "python");
+    R"(The engine to execute scripts (python, lua))", "python");
 core::Property ExecuteScript::ScriptFile("Script File",  // NOLINT
-                                         R"(Path to script file to execute.
-                                            Only one of Script File or Script Body may be used)", "");
+    R"(Path to script file to execute. Only one of Script File or Script Body may be used)", "");
 core::Property ExecuteScript::ScriptBody("Script Body",  // NOLINT
-                                         R"(Body of script to execute.
-                                            Only one of Script File or Script Body may be used)", "");
+    R"(Body of script to execute. Only one of Script File or Script Body may be used)", "");
 core::Property ExecuteScript::ModuleDirectory("Module Directory",  // NOLINT
-                                              R"(Comma-separated list of paths to files and/or directories which
-                                                 contain modules required by the script)", "");
+    R"(Comma-separated list of paths to files and/or directories which contain modules required by the script)", "");
 
 core::Relationship ExecuteScript::Success("success", "Script successes");  // NOLINT
 core::Relationship ExecuteScript::Failure("failure", "Script failures");  // NOLINT
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp b/extensions/script/python/ExecutePythonProcessor.cpp
index ae34ccc..4a6653f 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -19,14 +19,14 @@
  * limitations under the License.
  */
 
-#include <memory>
 #include <set>
-#include <utility>
-#include <exception>
 #include <stdexcept>
+#include <utility>
 
 #include "ExecutePythonProcessor.h"
 
+#include "utils/StringUtils.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -34,156 +34,193 @@ namespace minifi {
 namespace python {
 namespace processors {
 
-core::Property ExecutePythonProcessor::ScriptFile("Script File",  // NOLINT
-    R"(Path to script file to execute)", "");
-core::Property ExecutePythonProcessor::ModuleDirectory("Module Directory",  // NOLINT
-    R"(Comma-separated list of paths to files and/or directories which
-                                                 contain modules required by the script)", "");
+core::Property ExecutePythonProcessor::ScriptFile(core::PropertyBuilder::createProperty("Script File")
+    ->withDescription("Path to script file to execute. Only one of Script File or Script Body may be used")
+    ->withDefaultValue("")
+    ->build());
+
+core::Property ExecutePythonProcessor::ScriptBody(core::PropertyBuilder::createProperty("Script Body")
+    ->withDescription("Script to execute. Only one of Script File or Script Body may be used")
+    ->withDefaultValue("")
+    ->build());
 
-core::Relationship ExecutePythonProcessor::Success("success", "Script successes");  // NOLINT
-core::Relationship ExecutePythonProcessor::Failure("failure", "Script failures");  // NOLINT
+core::Property ExecutePythonProcessor::ModuleDirectory(core::PropertyBuilder::createProperty("Module Directory")
+  ->withDescription("Comma-separated list of paths to files and/or directories which contain modules required by the script")
+  ->withDefaultValue("")
+  ->build());
+
+core::Relationship ExecutePythonProcessor::Success("success", "Script successes");
+core::Relationship ExecutePythonProcessor::Failure("failure", "Script failures");
 
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if (script_to_exec_.size()) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
-  }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read the script file content every time the processor is triggered. This should change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is available to execute");
     }
 
-    triggerEngineProcessor(engine, context, session);
-
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+    engine->onTrigger(context, session);
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception &exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
     this->yield();
-  } catch (...) {
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
     this->yield();
   }
 }
 
+// TODO(hunyadi): This is potentially not what we want. See https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine> ExecutePythonProcessor::getScriptEngine() {
+  std::shared_ptr<python::PythonScriptEngine> engine;
+  // Use an existing engine, if one is available
+  if (script_engine_q_.try_dequeue(engine)) {
+    logger_->log_debug("Using available [%p] script engine instance", engine.get());
+    return engine;
+  }
+  engine = createEngine<python::PythonScriptEngine>();
+  logger_->log_info("Created new [%p] script engine instance. Number of instances: approx. %d / %d.", engine.get(), script_engine_q_.size_approx(), getMaxConcurrentTasks());
+  if (engine == nullptr) {
+    throw std::runtime_error("No script engine available");
+  }
+  return engine;
+}
+
+void ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&& engine) {
+  // Make engine available for use again
+  if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
+    logger_->log_debug("Releasing [%p] script engine", engine.get());
+    script_engine_q_.enqueue(engine);
+  } else {
+    logger_->log_info("Destroying script engine because it is no longer needed");
+  }
+}
+
+void ExecutePythonProcessor::appendPathForImportModules() {
+  // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
+  // could not get this working yet. It is up to be implemented later
+  // https://issues.apache.org/jira/browse/MINIFICPP-1224
+  if (module_directory_.size()) {
+    logger_->log_error("Not supported property: Module Directory.");
+  }
+}
+
+void ExecutePythonProcessor::loadScriptFromFile(const std::string& file_path) {
+  std::ifstream file_handle(file_path);
+  if (!file_handle.is_open()) {
+    script_to_exec_ = "";
+    throw std::runtime_error("Failed to read Script File: " + file_path);
+  }
+  script_to_exec_ = std::string{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>()) };
+}
+
+void ExecutePythonProcessor::loadScript() {
+  std::string script_file;
+  std::string script_body;
+  getProperty(ScriptFile.getName(), script_file);
+  getProperty(ScriptBody.getName(), script_body);
+  if (script_file.empty() && script_body.empty()) {
+    throw std::runtime_error("Neither Script Body nor Script File is available to execute");
+  }
+  if (script_file.size()) {
+    if (script_body.size()) {
+      throw std::runtime_error("Only one of Script File or Script Body may be used");
+    }
+    loadScriptFromFile(script_file);
+    return;
+  }
+  script_to_exec_ = script_body;
+  return;
+}
+
+void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() {
+  std::string script_file;
+  std::string script_body;
+  getProperty(ScriptFile.getName(), script_file);
+  getProperty(ScriptBody.getName(), script_body);
+  if (script_file.size() && script_body.empty()) {
+    loadScriptFromFile(script_file);
+  }
+}
+
 } /* namespace processors */
 } /* namespace python */
 } /* namespace minifi */
diff --git a/extensions/script/python/ExecutePythonProcessor.h b/extensions/script/python/ExecutePythonProcessor.h
index a487554..e138d05 100644
--- a/extensions/script/python/ExecutePythonProcessor.h
+++ b/extensions/script/python/ExecutePythonProcessor.h
@@ -18,12 +18,16 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_EXECUTEPYPROC_H
-#define NIFI_MINIFI_CPP_EXECUTEPYPROC_H
+#ifndef EXTENSIONS_SCRIPT_PYTHON_EXECUTEPYTHONPROCESSOR_H_
+#define EXTENSIONS_SCRIPT_PYTHON_EXECUTEPYTHONPROCESSOR_H_
 
-#include <concurrentqueue.h>
-#include <core/Resource.h>
-#include <core/Processor.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "concurrentqueue.h" // NOLINT
+#include "core/Resource.h"
+#include "core/Processor.h"
 
 #include "../ScriptEngine.h"
 #include "../ScriptProcessContext.h"
@@ -48,14 +52,15 @@ class ExecutePythonProcessor : public core::Processor {
   }
 
   static core::Property ScriptFile;
+  static core::Property ScriptBody;
   static core::Property ModuleDirectory;
 
   static core::Relationship Success;
   static core::Relationship Failure;
 
-  virtual void initialize() override;
-  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
   void setSupportsDynamicProperties() {
     python_dynamic_ = true;
@@ -82,12 +87,11 @@ class ExecutePythonProcessor : public core::Processor {
     return description_;
   }
 
-  virtual bool supportsDynamicProperties() override {
+  bool supportsDynamicProperties() override {
     return false;
   }
 
  private:
-
   std::vector<core::Property> python_properties_;
 
   std::string description_;
@@ -99,11 +103,18 @@ class ExecutePythonProcessor : public core::Processor {
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<logging::Logger> python_logger_;
 
-  std::string script_engine_;
-  std::string script_file_;
+  std::string script_to_exec_;
   std::string module_directory_;
 
-  moodycamel::ConcurrentQueue<std::shared_ptr<script::ScriptEngine>> script_engine_q_;
+  moodycamel::ConcurrentQueue<std::shared_ptr<python::PythonScriptEngine>> script_engine_q_;
+
+  std::shared_ptr<python::PythonScriptEngine> getScriptEngine();
+  void handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&& engine);
+  void appendPathForImportModules();
+  void loadScriptFromFile(const std::string& file_path);
+  void loadScript();
+  void reloadScriptIfUsingScriptFileProperty();
+
 
   template<typename T>
   std::shared_ptr<T> createEngine() const {
@@ -115,26 +126,6 @@ class ExecutePythonProcessor : public core::Processor {
 
     return engine;
   }
-
-  void triggerEngineProcessor(const std::shared_ptr<script::ScriptEngine> &engine, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) const {
-    auto typed_engine = std::static_pointer_cast<python::PythonScriptEngine>(engine);
-    typed_engine->onTrigger(context, session);
-  }
-
-  void triggerSchedule(const std::shared_ptr<script::ScriptEngine> &engine, const std::shared_ptr<core::ProcessContext> &context) const {
-    auto typed_engine = std::static_pointer_cast<python::PythonScriptEngine>(engine);
-    typed_engine->onSchedule(context);
-  }
-
-  void triggerInitialize(const std::shared_ptr<script::ScriptEngine> &engine, const std::shared_ptr<core::Processor> &proc) const {
-    auto typed_engine = std::static_pointer_cast<python::PythonScriptEngine>(engine);
-    typed_engine->onInitialize(proc);
-  }
-
-  void triggerDescribe(const std::shared_ptr<script::ScriptEngine> &engine, const std::shared_ptr<core::Processor> &proc) const {
-    auto typed_engine = std::static_pointer_cast<python::PythonScriptEngine>(engine);
-    typed_engine->describe(proc);
-  }
 };
 
 REGISTER_RESOURCE(
@@ -151,4 +142,4 @@ REGISTER_RESOURCE(
 } /* namespace apache */
 } /* namespace org */
 
-#endif //NIFI_MINIFI_CPP_EXECUTEPYPROC_H
+#endif  // EXTENSIONS_SCRIPT_PYTHON_EXECUTEPYTHONPROCESSOR_H_
diff --git a/libminifi/include/utils/TestUtils.h b/libminifi/include/utils/TestUtils.h
new file mode 100644
index 0000000..91f854d
--- /dev/null
+++ b/libminifi/include/utils/TestUtils.h
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "../../test/TestBase.h"
+#include "utils/file/FileUtils.h"
+#include "utils/Environment.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string createTempDir(TestController* testController) {
+  char dirtemplate[] = "/tmp/gt.XXXXXX";
+  std::string temp_dir = testController->createTempDirectory(dirtemplate);
+  REQUIRE(!temp_dir.empty());
+  REQUIRE(file::FileUtils::is_directory(temp_dir.c_str()));
+  return temp_dir;
+}
+
+std::string putFileToDir(const std::string& dir_path, const std::string& file_name, const std::string& content) {
+  std::string file_path(file::FileUtils::concat_path(dir_path, file_name));
+  std::ofstream out_file(file_path, std::ios::binary | std::ios::out);
+  if (out_file.is_open()) {
+    out_file << content;
+  }
+  return file_path;
+}
+
+std::string createTempDirWithFile(TestController* testController, const std::string& file_name, const std::string& content) {
+  std::string temp_dir = createTempDir(testController);
+  putFileToDir(temp_dir, file_name, content);
+  return temp_dir;
+}
+
+std::string getFileContent(const std::string& file_name) {
+  std::ifstream file_handle(file_name, std::ios::binary | std::ios::in);
+  REQUIRE(file_handle.is_open());
+  const std::string file_content{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>()) };
+  return file_content;
+}
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/test/script-tests/CMakeLists.txt b/libminifi/test/script-tests/CMakeLists.txt
index 5ac0314..c1141c1 100644
--- a/libminifi/test/script-tests/CMakeLists.txt
+++ b/libminifi/test/script-tests/CMakeLists.txt
@@ -18,7 +18,14 @@
 #
 
 if (NOT DISABLE_PYTHON_SCRIPTING)
-	file(GLOB EXECUTESCRIPT_PYTHON_INTEGRATION_TESTS  "Python*.cpp")
+	file(GLOB EXECUTESCRIPT_PYTHON_INTEGRATION_TESTS  "TestExecuteScriptProcessorWithPythonScript.cpp")
+	file(GLOB EXECUTEPYTHONPROCESSOR_UNIT_TESTS  "ExecutePythonProcessorTests.cpp")
+	file(GLOB PY_SOURCES  "python/*.cpp")
+	find_package(PythonLibs 3.5)
+	if (NOT PYTHONLIBS_FOUND)
+		find_package(PythonLibs 3.0 REQUIRED)
+	endif()
+	file(COPY "${CMAKE_SOURCE_DIR}/libminifi/test/script-tests/test_scripts" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}/")
 endif()
 
 if (ENABLE_LUA_SCRIPTING)
@@ -42,6 +49,28 @@ FOREACH(testfile ${EXECUTESCRIPT_PYTHON_INTEGRATION_TESTS})
 	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
 ENDFOREACH()
 
+FOREACH(testfile ${EXECUTEPYTHONPROCESSOR_UNIT_TESTS})
+	get_filename_component(testfilename "${testfile}" NAME_WE)
+	add_executable("${testfilename}" "${testfile}")
+
+	include_directories(${PYTHON_INCLUDE_DIR})
+	include_directories(../../thirdparty/pybind11/include)
+	include_directories(python)
+	add_definitions(-DPYTHON_SUPPORT)
+
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/script")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/script/python")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/thirdparty/pybind11/include")
+
+	target_wholearchive_library(${testfilename} minifi-script-extensions)
+	target_wholearchive_library(${testfilename} minifi-standard-processors)
+
+	createTests("${testfilename}")
+	add_test(NAME "${testfilename}" COMMAND "${testfilename}"  WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+	MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1")
+ENDFOREACH()
+
 FOREACH(testfile ${EXECUTESCRIPT_LUA_INTEGRATION_TESTS})
 	get_filename_component(testfilename "${testfile}" NAME_WE)
 	add_executable("${testfilename}" "${testfile}")
@@ -57,4 +86,4 @@ FOREACH(testfile ${EXECUTESCRIPT_LUA_INTEGRATION_TESTS})
 	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
 ENDFOREACH()
 
-message("-- Finished building ${PYTHON_EXECUTESCRIPT-EXTENSIONS_TEST_COUNT} Python ExecuteScript related test file(s)...")
+message("-- Finished building ${EXTENSIONS_TEST_COUNT} Python ExecuteScript related test file(s)...")
diff --git a/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
new file mode 100644
index 0000000..30f116d
--- /dev/null
+++ b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::createTempDir;
+using org::apache::nifi::minifi::utils::putFileToDir;
+using org::apache::nifi::minifi::utils::createTempDirWithFile;
+using org::apache::nifi::minifi::utils::getFileContent;
+
+class ExecutePythonProcessorTestBase {
+ public:
+  ExecutePythonProcessorTestBase() :
+    logTestController_(LogTestController::getInstance()),
+    logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger()) {
+    reInitialize();
+  }
+  virtual ~ExecutePythonProcessorTestBase() {
+    logTestController_.reset();
+  }
+
+ protected:
+  void reInitialize() {
+    testController_.reset(new TestController());
+    plan_ = testController_->createPlan();
+    logTestController_.setDebug<TestPlan>();
+    logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+    logTestController_.setDebug<minifi::processors::PutFile>();
+    logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+  }
+
+  std::string getScriptFullPath(const std::string& script_file_name) {
+    return org::apache::nifi::minifi::utils::file::FileUtils::concat_path(SCRIPT_FILES_DIRECTORY, script_file_name);
+  }
+
+  static const std::string TEST_FILE_NAME;
+  static const std::string TEST_FILE_CONTENT;
+  static const std::string SCRIPT_FILES_DIRECTORY;
+
+  std::unique_ptr<TestController> testController_;
+  std::shared_ptr<TestPlan> plan_;
+  LogTestController& logTestController_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_NAME{ "test_file.txt" };
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_CONTENT{ "Test text\n" };
+const std::string ExecutePythonProcessorTestBase::SCRIPT_FILES_DIRECTORY{ "test_scripts" };
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const core::Relationship& execute_python_out_conn, const std::string& used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(testController_.get(), TEST_FILE_NAME, TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir(testController_.get());
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) };
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir(testController_.get());
+
+    auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor->initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = org::apache::nifi::minifi::utils::file::FileUtils::concat_path(output_dir, state_name);
+      const std::string output_file_content{ getFileContent(output_file_path) };
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", "getfileCreate2");
+    plan_->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(), getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor->initialize();
+    return executePythonProcessor;
+  }
+
+  std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const core::Relationship& execute_python_outbound_connection, const std::string& dir_path) {
+    std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", "putfile", execute_python_outbound_connection, true);
+    plan_->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path);
+    return putfile;
+  }
+};
+
+// Test for python processors for simple passthrough cases
+//
+// testSimpleFilePassthrough(OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py");
+//
+// translates to
+//
+// GIVEN an ExecutePythonProcessor set up with a "Script Body" attribute that transfers to REL_SUCCESS, but not "Script File"
+// WHEN the processor is triggered
+// THEN any consumer using the success relationship as source should receive the transfered data
+//
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[executePythonProcessorSimple]") {
+  // Expectations
+  const auto OUTPUT_FILE_MATCHES_INPUT          = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
+  const auto RUNTIME_RELATIONSHIP_EXCEPTION    = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
+  const auto PROCESSOR_INITIALIZATION_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
+  // ExecutePython outbound relationships
+  const core::Relationship SUCCESS {"success", "description"};
+  const core::Relationship FAILURE{"failure", "description"};
+
+  // For the tests "" is treated as none-provided since no optional implementation was ported to the project yet
+
+  // 0. Neither valid script file nor script body provided
+  //                                          TEST EXPECTATION  OUT_REL        USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS,                       "", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE,                       "", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "non_existent_script.py", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "non_existent_script.py", "");  // NOLINT
+
+  // 1. Using script file as attribute
+  //                                      TEST EXPECTATION  OUT_REL                                 USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "passthrough_processor_transfering_to_success.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "passthrough_processor_transfering_to_success.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_failure.py", "");  // NOLINT
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, "passthrough_processor_transfering_to_failure.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE,                   "non_transferring_processor.py", "");  // NOLINT
+
+  // 2. Using script body as attribute
+  //                                      TEST EXPECTATION  OUT_REL  SCRIPT_FILE                        USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT 
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "",                   "non_transferring_processor.py");  // NOLINT
+
+  // 3. Setting both attributes
+  //                                          TEST EXPECTATION  OUT_REL                                        SCRIPT_FILE                                 USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py");  // NOLINT
+}
+
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Stateful execution", "[executePythonProcessorStateful]") {
+  testsStatefulProcessor();
+}
+
+}  // namespace
diff --git a/libminifi/test/script-tests/LuaExecuteScriptTests.cpp b/libminifi/test/script-tests/TestExecuteScriptProcessorWithLuaScript.cpp
similarity index 100%
rename from libminifi/test/script-tests/LuaExecuteScriptTests.cpp
rename to libminifi/test/script-tests/TestExecuteScriptProcessorWithLuaScript.cpp
diff --git a/libminifi/test/script-tests/PythonExecuteScriptTests.cpp b/libminifi/test/script-tests/TestExecuteScriptProcessorWithPythonScript.cpp
similarity index 100%
rename from libminifi/test/script-tests/PythonExecuteScriptTests.cpp
rename to libminifi/test/script-tests/TestExecuteScriptProcessorWithPythonScript.cpp
diff --git a/libminifi/test/script-tests/test_scripts/non_transferring_processor.py b/libminifi/test/script-tests/test_scripts/non_transferring_processor.py
new file mode 100644
index 0000000..6f7950e
--- /dev/null
+++ b/libminifi/test/script-tests/test_scripts/non_transferring_processor.py
@@ -0,0 +1,27 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+def describe(processor):
+  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
+def onTrigger(context, session):
+  flow_file = session.get()
+  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+
+  if flow_file is not None:
+    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
diff --git a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py
new file mode 100644
index 0000000..c8cd1c6
--- /dev/null
+++ b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py
@@ -0,0 +1,28 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+def describe(processor):
+  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
+def onTrigger(context, session):
+  flow_file = session.get()
+  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+
+  if flow_file is not None:
+    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+    session.transfer(flow_file, REL_FAILURE)
diff --git a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py
new file mode 100644
index 0000000..7bdb41f
--- /dev/null
+++ b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py
@@ -0,0 +1,28 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+def describe(processor):
+  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
+def onTrigger(context, session):
+  flow_file = session.get()
+  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+
+  if flow_file is not None:
+    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+    session.transfer(flow_file, REL_SUCCESS)
diff --git a/libminifi/test/script-tests/test_scripts/stateful_processor.py b/libminifi/test/script-tests/test_scripts/stateful_processor.py
new file mode 100644
index 0000000..837f15a
--- /dev/null
+++ b/libminifi/test/script-tests/test_scripts/stateful_processor.py
@@ -0,0 +1,42 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+def describe(processor):
+  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
+
+state = 0
+class WriteCallback(object):
+  def process(self, output_stream):
+    global state
+    new_content = str(state).encode('utf-8')
+    output_stream.write(new_content)
+    state = state + 1
+    return len(new_content)
+
+def onTrigger(context, session):
+  global state
+  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+  # flow_file = session.get()
+  flow_file = session.create()
+  flow_file.updateAttribute("filename", str(state))
+  log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+
+  if flow_file is not None:
+    session.write(flow_file, WriteCallback())
+    session.transfer(flow_file, REL_SUCCESS)