You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/30 11:30:11 UTC

nifi-minifi-cpp git commit: MINIFICPP-654 - C API: failure callback improvements

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 0043155dc -> 2fb4021e1


MINIFICPP-654 - C API: failure callback improvements

This closes #429.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2fb4021e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2fb4021e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2fb4021e

Branch: refs/heads/master
Commit: 2fb4021e1b93eaa7ae491a13f88a17e466475a86
Parents: 0043155
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Wed Oct 24 15:14:05 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Oct 30 07:29:30 2018 -0400

----------------------------------------------------------------------
 libminifi/include/capi/Plan.h     | 77 ++++++++++++++++++++++------------
 libminifi/include/capi/api.h      | 13 +++++-
 libminifi/include/capi/cstructs.h |  6 ++-
 libminifi/src/capi/Plan.cpp       | 31 ++++++++++++--
 libminifi/src/capi/api.cpp        | 49 +++++++++++-----------
 libminifi/test/capi/CAPITests.cpp | 12 ++++--
 6 files changed, 129 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index 08ad68a..75330a0 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -46,6 +46,41 @@
 #include "capi/cstructs.h"
 #include "capi/api.h"
 
+using failure_callback_type = std::function<void(flow_file_record*)>;
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
+
+namespace {
+
+  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    auto ff = session->get();
+    if (ff == nullptr) {
+      return;
+    }
+
+    auto claim = ff->getResourceClaim();
+
+    if (claim != nullptr && user_callback != nullptr) {
+      claim->increaseFlowFileRecordOwnedCount();
+      // create a flow file.
+      auto path = claim->getContentFullPath();
+      auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+      ffr->attributes = ff->getAttributesPtr();
+      ffr->ffp = ff.get();
+      auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+      *content_repo_ptr = cr_ptr;
+      user_callback(ffr);
+    }
+    session->remove(ff);
+  }
+
+  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    session->rollback();
+    failureStrategyAsIs(session, user_callback, cr_ptr);
+  }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+    { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
 
 class ExecutionPlan {
  public:
@@ -67,7 +102,9 @@ class ExecutionPlan {
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
 
-  bool setFailureCallback(void (*onerror_callback)(const flow_file_record*));
+  bool setFailureCallback(failure_callback_type onerror_callback);
+
+  bool setFailureStrategy(FailureStrategy start);
 
   std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
 
@@ -100,37 +137,25 @@ class ExecutionPlan {
  protected:
   class FailureHandler {
    public:
-    FailureHandler() {
+    FailureHandler(content_repo_sptr cr_ptr) {
       callback_ = nullptr;
+      strategy_ = FailureStrategy::AS_IS;
+      content_repo_ = cr_ptr;
     }
-    void setCallback(void (*onerror_callback)(const flow_file_record*)) {
+    void setCallback(failure_callback_type onerror_callback) {
       callback_=onerror_callback;
     }
-    void operator()(const processor_session* ps)
-    {
+    void setStrategy(FailureStrategy strat) {
+      strategy_ = strat;
+    }
+    void operator()(const processor_session* ps) {
       auto ses = static_cast<core::ProcessSession*>(ps->session);
-
-      auto ff = ses->get();
-      if (ff == nullptr) {
-        return;
-      }
-      auto claim = ff->getResourceClaim();
-
-      if (claim != nullptr && callback_ != nullptr) {
-        // create a flow file.
-        auto path = claim->getContentFullPath();
-        auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-        ffr->attributes = ff->getAttributesPtr();
-        ffr->ffp = ff.get();
-        callback_(ffr);
-      }
-      // This deletes the content of the flowfile as ff gets out of scope
-      // It's the users responsibility to copy all the data
-      ses->remove(ff);
-
+      FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
     }
    private:
-    void (*callback_)(const flow_file_record*);
+    failure_callback_type callback_;
+    FailureStrategy strategy_;
+    content_repo_sptr content_repo_;
   };
 
   void finalize();
@@ -142,7 +167,7 @@ class ExecutionPlan {
 
   std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
 
-  std::shared_ptr<core::ContentRepository> content_repo_;
+  content_repo_sptr content_repo_;
 
   std::shared_ptr<core::Repository> flow_repo_;
   std::shared_ptr<core::Repository> prov_repo_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 4e319ef..e05c04e 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -83,10 +83,19 @@ processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_ses
 
 /**
 * Register your callback to received flow files that the flow failed to process
-* The flow file is deleted after the callback is executed, make sure to copy all the data you need!
+* Ownership of the flow file is transferred to the callback, which must release it with free_flowfile()!
 * The first callback should be registered before the flow is used. Can be changed later during runtime.
 */
-int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*));
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set the failure strategy. Please use the FailureStrategy enum defined in cstructs.h.
+* Return values: 0 (success), -1 (the strategy cannot be set, e.g. because no failure callback has been added yet).
+* Can be changed at runtime.
+* The default strategy is AS_IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
 
 int set_property(processor *, const char *, const char *);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/cstructs.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h
index 731fd55..55197d9 100644
--- a/libminifi/include/capi/cstructs.h
+++ b/libminifi/include/capi/cstructs.h
@@ -19,6 +19,8 @@
 #ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
 #define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
 
+#include <stddef.h>
+#include <stdint.h>
 
 /**
  * NiFi Port struct
@@ -105,6 +107,8 @@ typedef struct {
 
   void * in;
 
+  void * crp;
+
   char * contentLocation; /**< Filesystem location of this object */
 
   void *attributes; /**< Hash map of attributes */
@@ -118,6 +122,6 @@ typedef struct {
   void *plan;
 } flow;
 
-
+typedef  enum FS { AS_IS, ROLLBACK } FailureStrategy;
 
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 0abd63b..78b864c 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -23,6 +23,18 @@
 #include <set>
 #include <string>
 
+bool intToFailureStragey(int in, FailureStrategy *out) {  // validates a raw int as a FailureStrategy; writes *out only on success
+  if (out == nullptr) { return false; }
+  switch (in) {  // switch on the raw int: casting an out-of-range value to the enum first is undefined behavior
+    case AS_IS:
+    case ROLLBACK:
+      *out = static_cast<FailureStrategy>(in);
+      return true;
+    default:
+      return false;
+  }
+}
+
 std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
 
 ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
@@ -192,7 +204,12 @@ void ExecutionPlan::finalize() {
     callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
 
     for (const auto& proc : processor_queue_) {
-      relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+      for (const auto& rel : proc->getSupportedRelationships()) {
+        if (rel.getName() == "failure") {
+          relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+          break;
+        }
+      }
     }
 
     std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
@@ -256,14 +273,22 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared
   return connection;
 }
 
-bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) {
+bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
   if (finalized && !failure_handler_) {
     return false;  // Already finalized the flow without failure handler processor
   }
   if (!failure_handler_) {
-    failure_handler_ = std::make_shared<FailureHandler>();
+    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
   }
   failure_handler_->setCallback(onerror_callback);
   return true;
 }
 
+bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {  // false if no failure callback was added yet or the strategy is unknown
+  if (!failure_handler_ || FailureStrategies.find(start) == FailureStrategies.end()) {  // unknown values would make FailureHandler's .at() lookup throw later
+    return false;
+  }
+  failure_handler_->setStrategy(start);
+  return true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 21010a1..58328ef 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -159,14 +159,9 @@ flow_file_record* create_ff_object(const char *file, const size_t len, const uin
   if (nullptr == file) {
     return nullptr;
   }
-  flow_file_record *new_ff = new flow_file_record;
+  flow_file_record *new_ff = create_ff_object_na(file, len, size);
   new_ff->attributes = new string_map();
   new_ff->ffp = 0;
-  new_ff->contentLocation = new char[len + 1];
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = size;
   return new_ff;
 }
 
@@ -175,9 +170,9 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
   new_ff->attributes = nullptr;
   new_ff->contentLocation = new char[len + 1];
   snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
   // set the size of the flow file.
   new_ff->size = size;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
   return new_ff;
 }
 /**
@@ -185,21 +180,21 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
  * @param ff flow file record.
  */
 void free_flowfile(flow_file_record *ff) {
-  if (ff != nullptr) {
-    if (ff->in != nullptr) {
-      auto instance = static_cast<nifi_instance*>(ff->in);
-      auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-      auto content_repo = minifi_instance_ref->getContentRepository();
-      std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
-      content_repo->remove(claim);
-    }
-    if (ff->ffp == nullptr) {
-      auto map = static_cast<string_map*>(ff->attributes);
-      delete map;
-    }
-    delete[] ff->contentLocation;
-    delete ff;
+  if (ff == nullptr) {
+    return;
+  }
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+  if (content_repo_ptr != nullptr && *content_repo_ptr) {  // crp is null for records not built by create_ff_object_na; empty repo => nothing to remove
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+    (*content_repo_ptr)->remove(claim);
   }
+  if (ff->ffp == nullptr) {  // attributes are only owned by the record when it is not backed by a live flow file
+    auto map = static_cast<string_map*>(ff->attributes);
+    delete map;
+  }
+  delete[] ff->contentLocation;
+  delete ff;
+  delete content_repo_ptr;  // heap-allocated shared_ptr holder from create_ff_object_na (delete of nullptr is a no-op)
 }
 
 /**
@@ -405,11 +400,15 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
   return nullptr;
 }
 
-int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) {
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
   ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
   return plan->setFailureCallback(onerror_callback) ? 0 : 1;
 }
 
+int set_failure_strategy(flow *flow, FailureStrategy strategy) {  // 0 on success, -1 on failure (including a null flow or plan)
+  return (flow != nullptr && flow->plan != nullptr && static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy)) ? 0 : -1;
+}
+
 int set_property(processor *proc, const char *name, const char *value) {
   if (name != nullptr && value != nullptr && proc != nullptr) {
     core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
@@ -447,7 +446,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
     auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
     ffr->ffp = ff.get();
     ffr->attributes = ff->getAttributesPtr();
-    ffr->in = instance;
+    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    *content_repo_ptr = execution_plan->getContentRepo();
     return ffr;
   } else {
     return nullptr;
@@ -489,7 +489,8 @@ flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *se
     auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
     ffr->attributes = ff->getAttributesPtr();
     ffr->ffp = ff.get();
-    ffr->in = instance;
+    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    *content_repo_ptr = execution_plan->getContentRepo();
     return ffr;
   } else {
     return nullptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
index d4e1c49..368c9a1 100644
--- a/libminifi/test/capi/CAPITests.cpp
+++ b/libminifi/test/capi/CAPITests.cpp
@@ -34,19 +34,22 @@
 
 static nifi_instance *create_instance_obj(const char *name = "random_instance") {
   nifi_port port;
-  port.port_id = "12345";
+  char port_str[] = "12345";
+  port.port_id = port_str;
   return create_instance("random_instance", &port);
 }
 
 static int failure_count = 0;
 
-void failure_counter(const flow_file_record * fr) {
+void failure_counter(flow_file_record * fr) {
   failure_count++;
   REQUIRE(get_attribute_qty(fr) > 0);
+  free_flowfile(fr);
 }
 
-void big_failure_counter(const flow_file_record * fr) {
+void big_failure_counter(flow_file_record * fr) {
   failure_count += 100;
+  free_flowfile(fr);
 }
 
 TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
@@ -226,6 +229,8 @@ TEST_CASE("Test error handling callback", "[errorHandling]") {
   flow *test_flow = create_flow(instance, nullptr);
   REQUIRE(test_flow != nullptr);
 
+  // Failure strategy cannot be set before a valid callback is added
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
   REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
 
   processor *get_proc = add_processor(test_flow, "GetFile");
@@ -251,6 +256,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") {
 
   // Failure handler function can be replaced runtime
   REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
 
   // Create new testfile to trigger failure again
   ss << "2";