You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/24 12:01:25 UTC

nifi-minifi-cpp git commit: MINIFICPP-641 - C API: add support to register failure callback

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 4daa833fd -> 1a50a4ab0


MINIFICPP-641 - C API: add support to register failure callback

This closes #421.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a50a4ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a50a4ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a50a4ab

Branch: refs/heads/master
Commit: 1a50a4ab03d522352a953f2e7ce9cb872ccf0738
Parents: 4daa833
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Tue Oct 9 17:56:38 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Wed Oct 24 08:01:09 2018 -0400

----------------------------------------------------------------------
 libminifi/include/capi/Plan.h                   |  50 ++++++++-
 libminifi/include/capi/api.h                    |   7 ++
 .../include/processors/CallbackProcessor.h      |   4 +-
 libminifi/src/capi/Plan.cpp                     | 109 +++++++++++--------
 libminifi/src/capi/api.cpp                      |   8 +-
 libminifi/src/core/ProcessSession.cpp           |   2 +-
 libminifi/test/CPPLINT.cfg                      |   2 +-
 libminifi/test/capi/CAPITests.cpp               |  69 +++++++++++-
 8 files changed, 194 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index 4afcd18..08ad68a 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -44,13 +44,15 @@
 #include "core/ProcessorNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "capi/cstructs.h"
+#include "capi/api.h"
+
 
 class ExecutionPlan {
  public:
 
   explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
 
-  std::shared_ptr<core::Processor> addCallback(void *, void (*fp)(processor_session *));
+  std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
 
   std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
                                                 core::Relationship relationship = core::Relationship("success", "description"),
@@ -65,6 +67,8 @@ class ExecutionPlan {
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
 
+  bool setFailureCallback(void (*onerror_callback)(const flow_file_record*));
+
   std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
 
   std::shared_ptr<core::FlowFile> getCurrentFlowFile();
@@ -83,8 +87,6 @@ class ExecutionPlan {
     return content_repo_;
   }
 
-  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
-
   std::shared_ptr<core::FlowFile> getNextFlowFile(){
     return next_ff_;
   }
@@ -93,11 +95,50 @@ class ExecutionPlan {
     next_ff_ = ptr;
   }
 
+  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
  protected:
+  class FailureHandler {
+   public:
+    FailureHandler() {
+      callback_ = nullptr;
+    }
+    void setCallback(void (*onerror_callback)(const flow_file_record*)) {
+      callback_=onerror_callback;
+    }
+    void operator()(const processor_session* ps)
+    {
+      auto ses = static_cast<core::ProcessSession*>(ps->session);
+
+      auto ff = ses->get();
+      if (ff == nullptr) {
+        return;
+      }
+      auto claim = ff->getResourceClaim();
+
+      if (claim != nullptr && callback_ != nullptr) {
+        // create a flow file.
+        auto path = claim->getContentFullPath();
+        auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+        ffr->attributes = ff->getAttributesPtr();
+        ffr->ffp = ff.get();
+        callback_(ffr);
+      }
+      // This deletes the content of the flow file once ff goes out of scope.
+      // It's the user's responsibility to copy all the data beforehand.
+      ses->remove(ff);
+
+    }
+   private:
+    void (*callback_)(const flow_file_record*);
+  };
 
   void finalize();
 
-  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+  std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+                                                        core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
 
   std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
 
@@ -131,6 +172,7 @@ class ExecutionPlan {
 
   static std::shared_ptr<utils::IdGenerator> id_generator_;
   std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<FailureHandler> failure_handler_;
 };
 
 #endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 94b05fa..2b3622d 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -79,6 +79,13 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name);
 
 processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
 
+/**
+* Register your callback to receive flow files that the flow failed to process
+* The flow file is deleted after the callback is executed, make sure to copy all the data you need!
+* The first callback should be registered before the flow is used. Can be changed later during runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*));
+
 int set_property(processor *, const char *, const char *);
 
 int set_instance_property(nifi_instance *instance, const char*, const char *);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/processors/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h
index 879f49c..801c99c 100644
--- a/libminifi/include/processors/CallbackProcessor.h
+++ b/libminifi/include/processors/CallbackProcessor.h
@@ -65,7 +65,7 @@ class CallbackProcessor : public core::Processor {
 
  public:
 
-  void setCallback(void *obj, void (*ontrigger_callback)(processor_session *)) {
+  void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
     objref_ = obj;
     callback_ = ontrigger_callback;
   }
@@ -82,7 +82,7 @@ class CallbackProcessor : public core::Processor {
 
  protected:
   void *objref_;
-  void (*callback_)(processor_session*);
+  std::function<void(processor_session*)> callback_;
  private:
   // Logger
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 691be41..0abd63b 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -40,21 +40,17 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re
  * Add a callback to obtain and pass processor session to a generated processor
  *
  */
-std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, void (*fp)(processor_session *)) {
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
   if (finalized) {
     return nullptr;
   }
 
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
+  auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor");
+  if (!ptr)
+    return nullptr;
 
-  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate("CallbackProcessor", uuid);
-  if (nullptr == ptr) {
-    throw std::exception();
-  }
   std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr);
   processor->setCallback(obj, fp);
-  processor->setName("CallbackProcessor");
 
   return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true);
 }
@@ -99,25 +95,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
       termination_ = relationship;
     }
 
-    std::stringstream connection_name;
-    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-    connection->setRelationship(relationship);
-
-    // link the connections so that we can test results at the end for this
-    connection->setSource(last);
-    connection->setDestination(processor);
-
-    utils::Identifier uuid_copy, uuid_copy_next;
-    last->getUUID(uuid_copy);
-    connection->setSourceUUID(uuid_copy);
-    processor->getUUID(uuid_copy_next);
-    connection->setDestinationUUID(uuid_copy_next);
-    last->addConnection(connection);
-    if (last != processor) {
-      processor->addConnection(connection);
-    }
-    relationships_.push_back(connection);
+    relationships_.push_back(connectProcessors(last, processor, relationship, true));
   }
 
   std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
@@ -202,29 +180,31 @@ std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
   return current_session_;
 }
 
-std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
-  std::stringstream connection_name;
-  std::shared_ptr<core::Processor> last = processor;
-  connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-  connection->setRelationship(termination_);
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
+  return connectProcessors(processor, processor, termination_, set_dst);
+}
 
-  // link the connections so that we can test results at the end for this
-  connection->setSource(last);
-  if (setDest)
-    connection->setDestination(processor);
+void ExecutionPlan::finalize() {
+  if (failure_handler_) {
+    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
 
-  utils::Identifier uuid_copy;
-  last->getUUID(uuid_copy);
-  connection->setSourceUUID(uuid_copy);
-  if (setDest)
-    connection->setDestinationUUID(uuid_copy);
+    std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
+    callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
 
-  processor->addConnection(connection);
-  return connection;
-}
+    for (const auto& proc : processor_queue_) {
+      relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+    }
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
+
+    processor_nodes_.push_back(node);
+
+    std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+    processor_contexts_.push_back(context);
+
+    processor_queue_.push_back(failure_proc);
+  }
 
-void ExecutionPlan::finalize() {
   if (relationships_.size() > 0) {
     relationships_.push_back(buildFinalConnection(processor_queue_.back()));
   } else {
@@ -250,3 +230,40 @@ std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::strin
   return processor;
 }
 
+std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+                                                      core::Relationship relationship, bool set_dst) {
+  std::stringstream connection_name;
+  connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+  connection->setRelationship(relationship);
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(src_proc);
+
+  utils::Identifier uuid_copy, uuid_copy_next;
+  src_proc->getUUID(uuid_copy);
+  connection->setSourceUUID(uuid_copy);
+  if (set_dst) {
+    connection->setDestination(dst_proc);
+    dst_proc->getUUID(uuid_copy_next);
+    connection->setDestinationUUID(uuid_copy_next);
+    if (src_proc != dst_proc) {
+      dst_proc->addConnection(connection);
+    }
+  }
+  src_proc->addConnection(connection);
+
+  return connection;
+}
+
+bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) {
+  if (finalized && !failure_handler_) {
+    return false;  // Already finalized the flow without failure handler processor
+  }
+  if (!failure_handler_) {
+    failure_handler_ = std::make_shared<FailureHandler>();
+  }
+  failure_handler_->setCallback(onerror_callback);
+  return true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 6bec57c..66ed19b 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -32,6 +32,7 @@
 using string_map = std::map<std::string, std::string>;
 
 class API_INITIALIZER {
+ public:
   static int initialized;
 };
 
@@ -353,7 +354,7 @@ processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor
     return nullptr;
   }
   ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addCallback(nullptr, ontrigger_callback);
+  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
   processor *new_processor = new processor();
   new_processor->processor_ptr = proc.get();
   return new_processor;
@@ -399,6 +400,11 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
   return nullptr;
 }
 
+int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) {
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  return plan->setFailureCallback(onerror_callback) ? 0 : 1;
+}
+
 int set_property(processor *proc, const char *name, const char *value) {
   if (name != nullptr && value != nullptr && proc != nullptr) {
     core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dc45446..6981bce 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -703,7 +703,7 @@ void ProcessSession::commit() {
           // No connection
           if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
             // Not autoterminate, we should have the connect
-            std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
+            std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
           } else {
             // Autoterminated

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index a1e22ad..7df4c76 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -1,4 +1,4 @@
 set noparent
 filter=-build/include_order,-build/include_alpha
 exclude_files=Server.cpp
-exclude_files=TestBase.cpp
\ No newline at end of file
+exclude_files=TestBase.cpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
index 1739e1b..08214ac 100644
--- a/libminifi/test/capi/CAPITests.cpp
+++ b/libminifi/test/capi/CAPITests.cpp
@@ -38,6 +38,17 @@ static nifi_instance *create_instance_obj(const char *name = "random_instance")
   return create_instance("random_instance", &port);
 }
 
+static int failure_count = 0;
+
+void failure_counter(const flow_file_record * fr) {
+  failure_count++;
+  REQUIRE(get_attribute_qty(fr) > 0);
+}
+
+void big_failure_counter(const flow_file_record * fr) {
+  failure_count += 100;
+}
+
 TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
   auto instance = create_instance_obj();
   REQUIRE(instance != nullptr);
@@ -116,6 +127,9 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
 
   REQUIRE(test_file_content == put_data);
 
+  // No failure handler can be added after the flow is finalized
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
+
   free_flowfile(record);
 
   free_flow(test_flow);
@@ -126,8 +140,6 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
 TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
   TestController testController;
 
-  enable_logging();
-
   char src_format[] = "/tmp/gt.XXXXXX";
   const char *sourcedir = testController.createTempDirectory(src_format);
 
@@ -201,3 +213,56 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
   free_flow(test_flow);
   free_instance(instance);
 }
+
+TEST_CASE("Test error handling callback", "[errorHandling]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
+  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+
+  REQUIRE(failure_count == 1);
+
+  // Failure handler function can be replaced at runtime
+  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+
+  // Create new testfile to trigger failure again
+  ss << "2";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+  REQUIRE(failure_count > 100);
+
+  failure_count = 0;
+
+  free_flow(test_flow);
+  free_instance(instance);
+}