You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/30 11:30:11 UTC
nifi-minifi-cpp git commit: MINIFICPP-654 - C API: failure callback
improvements
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 0043155dc -> 2fb4021e1
MINIFICPP-654 - C API: failure callback improvements
This closes #429.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2fb4021e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2fb4021e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2fb4021e
Branch: refs/heads/master
Commit: 2fb4021e1b93eaa7ae491a13f88a17e466475a86
Parents: 0043155
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Wed Oct 24 15:14:05 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Oct 30 07:29:30 2018 -0400
----------------------------------------------------------------------
libminifi/include/capi/Plan.h | 77 ++++++++++++++++++++++------------
libminifi/include/capi/api.h | 13 +++++-
libminifi/include/capi/cstructs.h | 6 ++-
libminifi/src/capi/Plan.cpp | 31 ++++++++++++--
libminifi/src/capi/api.cpp | 49 +++++++++++-----------
libminifi/test/capi/CAPITests.cpp | 12 ++++--
6 files changed, 129 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index 08ad68a..75330a0 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -46,6 +46,41 @@
#include "capi/cstructs.h"
#include "capi/api.h"
+using failure_callback_type = std::function<void(flow_file_record*)>;
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
+
+namespace {
+
+ void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {  // AS_IS strategy: take one failed flow file from the session, hand it to the user callback (if any), then remove it from the session.
+ auto ff = session->get();  // next flow file queued on the failure relationship, or nullptr
+ if (ff == nullptr) {
+ return;  // nothing to report
+ }
+
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr && user_callback != nullptr) {  // the callback is skipped when there is no content claim or no callback registered
+ claim->increaseFlowFileRecordOwnedCount();  // NOTE(review): presumably keeps the content alive after session->remove() so the callee can still read it - confirm
+ // Build the C-API record that is passed to the user callback.
+ auto path = claim->getContentFullPath();
+ auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+ ffr->attributes = ff->getAttributesPtr();  // points at the flow file's own attribute map (no copy is made)
+ ffr->ffp = ff.get();  // raw pointer to the flow file; free_flowfile() deletes the attribute map only when ffp is null
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);  // crp holds a heap-allocated shared_ptr created by create_ff_object_na()
+ *content_repo_ptr = cr_ptr;  // lets free_flowfile() remove the claim from this content repository
+ user_callback(ffr);  // api.h documents that ownership of the record is transferred to the callee
+ }
+ session->remove(ff);  // executed whether or not the callback was invoked
+ }
+
+ void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {  // ROLLBACK strategy: roll the session back first, then behave exactly like AS_IS.
+ session->rollback();  // NOTE(review): presumably restores the flow file to its pre-processing state before it is handed out - confirm against ProcessSession
+ failureStrategyAsIs(session, user_callback, cr_ptr);  // same get / callback / remove sequence as the AS_IS strategy
+ }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+ { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
class ExecutionPlan {
public:
@@ -67,7 +102,9 @@ class ExecutionPlan {
bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
- bool setFailureCallback(void (*onerror_callback)(const flow_file_record*));
+ bool setFailureCallback(failure_callback_type onerror_callback);
+
+ bool setFailureStrategy(FailureStrategy start);
std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
@@ -100,37 +137,25 @@ class ExecutionPlan {
protected:
class FailureHandler {
public:
- FailureHandler() {
+ FailureHandler(content_repo_sptr cr_ptr) {
callback_ = nullptr;
+ strategy_ = FailureStrategy::AS_IS;
+ content_repo_ = cr_ptr;
}
- void setCallback(void (*onerror_callback)(const flow_file_record*)) {
+ void setCallback(failure_callback_type onerror_callback) {
callback_=onerror_callback;
}
- void operator()(const processor_session* ps)
- {
+ void setStrategy(FailureStrategy strat) {
+ strategy_ = strat;
+ }
+ void operator()(const processor_session* ps) {
auto ses = static_cast<core::ProcessSession*>(ps->session);
-
- auto ff = ses->get();
- if (ff == nullptr) {
- return;
- }
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr && callback_ != nullptr) {
- // create a flow file.
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->attributes = ff->getAttributesPtr();
- ffr->ffp = ff.get();
- callback_(ffr);
- }
- // This deletes the content of the flowfile as ff gets out of scope
- // It's the users responsibility to copy all the data
- ses->remove(ff);
-
+ FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
}
private:
- void (*callback_)(const flow_file_record*);
+ failure_callback_type callback_;
+ FailureStrategy strategy_;
+ content_repo_sptr content_repo_;
};
void finalize();
@@ -142,7 +167,7 @@ class ExecutionPlan {
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
- std::shared_ptr<core::ContentRepository> content_repo_;
+ content_repo_sptr content_repo_;
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::Repository> prov_repo_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 4e319ef..e05c04e 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -83,10 +83,19 @@ processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_ses
/**
* Register your callback to receive flow files that the flow failed to process
-* The flow file is deleted after the callback is executed, make sure to copy all the data you need!
+* The flow file ownership is transferred to the caller!
* The first callback should be registered before the flow is used. Can be changed later during runtime.
*/
-int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*));
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set failure strategy. Please use the enum defined in cstructs.h
+* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
+* Can be changed at runtime.
+* The default strategy is AS_IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
int set_property(processor *, const char *, const char *);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/cstructs.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h
index 731fd55..55197d9 100644
--- a/libminifi/include/capi/cstructs.h
+++ b/libminifi/include/capi/cstructs.h
@@ -19,6 +19,8 @@
#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+#include <stddef.h>
+#include <stdint.h>
/**
* NiFi Port struct
@@ -105,6 +107,8 @@ typedef struct {
void * in;
+ void * crp;
+
char * contentLocation; /**< Filesystem location of this object */
void *attributes; /**< Hash map of attributes */
@@ -118,6 +122,6 @@ typedef struct {
void *plan;
} flow;
-
+typedef enum FS { AS_IS, ROLLBACK } FailureStrategy;
#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 0abd63b..78b864c 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -23,6 +23,18 @@
#include <set>
#include <string>
+bool intToFailureStragey(int in, FailureStrategy *out) {  // Validating int -> FailureStrategy conversion. NOTE(review): name is misspelled ("Stragey"); no caller is visible in this patch.
+ auto tmp = static_cast<FailureStrategy>(in);
+ switch (tmp) {
+ case AS_IS:
+ case ROLLBACK:  // both known enumerators share the success path
+ *out = tmp;  // out is dereferenced without a null check
+ return true;
+ default:
+ return false;  // unknown value: *out is left unmodified
+ }
+}
+
std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
@@ -192,7 +204,12 @@ void ExecutionPlan::finalize() {
callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
for (const auto& proc : processor_queue_) {
- relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+ for (const auto& rel : proc->getSupportedRelationships()) {
+ if (rel.getName() == "failure") {
+ relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+ break;
+ }
+ }
}
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
@@ -256,14 +273,22 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared
return connection;
}
-bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) {
+bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
if (finalized && !failure_handler_) {
return false; // Already finalized the flow without failure handler processor
}
if (!failure_handler_) {
- failure_handler_ = std::make_shared<FailureHandler>();
+ failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
}
failure_handler_->setCallback(onerror_callback);
return true;
}
+bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {  // Selects how failed flow files are handled; returns false when no failure handler exists yet.
+ if (!failure_handler_) {  // the handler is only created by setFailureCallback(), so a callback must be registered first
+ return false;
+ }
+ failure_handler_->setStrategy(start);  // takes effect on the handler's next invocation
+ return true;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 21010a1..58328ef 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -159,14 +159,9 @@ flow_file_record* create_ff_object(const char *file, const size_t len, const uin
if (nullptr == file) {
return nullptr;
}
- flow_file_record *new_ff = new flow_file_record;
+ flow_file_record *new_ff = create_ff_object_na(file, len, size);
new_ff->attributes = new string_map();
new_ff->ffp = 0;
- new_ff->contentLocation = new char[len + 1];
- snprintf(new_ff->contentLocation, len + 1, "%s", file);
- std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
- // set the size of the flow file.
- new_ff->size = size;
return new_ff;
}
@@ -175,9 +170,9 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
new_ff->attributes = nullptr;
new_ff->contentLocation = new char[len + 1];
snprintf(new_ff->contentLocation, len + 1, "%s", file);
- std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
// set the size of the flow file.
new_ff->size = size;
+ new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
return new_ff;
}
/**
@@ -185,21 +180,21 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
* @param ff flow file record.
*/
void free_flowfile(flow_file_record *ff) {
- if (ff != nullptr) {
- if (ff->in != nullptr) {
- auto instance = static_cast<nifi_instance*>(ff->in);
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- auto content_repo = minifi_instance_ref->getContentRepository();
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
- content_repo->remove(claim);
- }
- if (ff->ffp == nullptr) {
- auto map = static_cast<string_map*>(ff->attributes);
- delete map;
- }
- delete[] ff->contentLocation;
- delete ff;
+ if (ff == nullptr) {
+ return;
+ }
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+ if (content_repo_ptr->get()) {
+ std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+ (*content_repo_ptr)->remove(claim);
}
+ if (ff->ffp == nullptr) {
+ auto map = static_cast<string_map*>(ff->attributes);
+ delete map;
+ }
+ delete[] ff->contentLocation;
+ delete ff;
+ delete content_repo_ptr;
}
/**
@@ -405,11 +400,15 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
return nullptr;
}
-int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) {
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
return plan->setFailureCallback(onerror_callback) ? 0 : 1;
}
+int set_failure_strategy(flow *flow, FailureStrategy strategy) {  // C API wrapper around ExecutionPlan::setFailureStrategy. NOTE(review): flow and flow->plan are not null-checked.
+ return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;  // 0 on success, -1 when no failure callback has been added (add_failure_callback reports failure as 1 instead)
+}
+
int set_property(processor *proc, const char *name, const char *value) {
if (name != nullptr && value != nullptr && proc != nullptr) {
core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
@@ -447,7 +446,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
ffr->ffp = ff.get();
ffr->attributes = ff->getAttributesPtr();
- ffr->in = instance;
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+ *content_repo_ptr = execution_plan->getContentRepo();
return ffr;
} else {
return nullptr;
@@ -489,7 +489,8 @@ flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *se
auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
ffr->attributes = ff->getAttributesPtr();
ffr->ffp = ff.get();
- ffr->in = instance;
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+ *content_repo_ptr = execution_plan->getContentRepo();
return ffr;
} else {
return nullptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
index d4e1c49..368c9a1 100644
--- a/libminifi/test/capi/CAPITests.cpp
+++ b/libminifi/test/capi/CAPITests.cpp
@@ -34,19 +34,22 @@
static nifi_instance *create_instance_obj(const char *name = "random_instance") {
nifi_port port;
- port.port_id = "12345";
+ char port_str[] = "12345";
+ port.port_id = port_str;
return create_instance("random_instance", &port);
}
static int failure_count = 0;
-void failure_counter(const flow_file_record * fr) {
+void failure_counter(flow_file_record * fr) {
failure_count++;
REQUIRE(get_attribute_qty(fr) > 0);
+ free_flowfile(fr);
}
-void big_failure_counter(const flow_file_record * fr) {
+void big_failure_counter(flow_file_record * fr) {
failure_count += 100;
+ free_flowfile(fr);
}
TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
@@ -226,6 +229,8 @@ TEST_CASE("Test error handling callback", "[errorHandling]") {
flow *test_flow = create_flow(instance, nullptr);
REQUIRE(test_flow != nullptr);
+ // Failure strategy cannot be set before a valid callback is added
+ REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
processor *get_proc = add_processor(test_flow, "GetFile");
@@ -251,6 +256,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") {
// Failure handler function can be replaced at runtime
REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+ REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
// Create new testfile to trigger failure again
ss << "2";