You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/24 12:01:25 UTC
nifi-minifi-cpp git commit: MINIFICPP-641 - C API: add support to
register failure callback
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 4daa833fd -> 1a50a4ab0
MINIFICPP-641 - C API: add support to register failure callback
This closes #421.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a50a4ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a50a4ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a50a4ab
Branch: refs/heads/master
Commit: 1a50a4ab03d522352a953f2e7ce9cb872ccf0738
Parents: 4daa833
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Tue Oct 9 17:56:38 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Wed Oct 24 08:01:09 2018 -0400
----------------------------------------------------------------------
libminifi/include/capi/Plan.h | 50 ++++++++-
libminifi/include/capi/api.h | 7 ++
.../include/processors/CallbackProcessor.h | 4 +-
libminifi/src/capi/Plan.cpp | 109 +++++++++++--------
libminifi/src/capi/api.cpp | 8 +-
libminifi/src/core/ProcessSession.cpp | 2 +-
libminifi/test/CPPLINT.cfg | 2 +-
libminifi/test/capi/CAPITests.cpp | 69 +++++++++++-
8 files changed, 194 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index 4afcd18..08ad68a 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -44,13 +44,15 @@
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "capi/cstructs.h"
+#include "capi/api.h"
+
class ExecutionPlan {
public:
explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
- std::shared_ptr<core::Processor> addCallback(void *, void (*fp)(processor_session *));
+ std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
core::Relationship relationship = core::Relationship("success", "description"),
@@ -65,6 +67,8 @@ class ExecutionPlan {
bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ bool setFailureCallback(void (*onerror_callback)(const flow_file_record*));
+
std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
std::shared_ptr<core::FlowFile> getCurrentFlowFile();
@@ -83,8 +87,6 @@ class ExecutionPlan {
return content_repo_;
}
- static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
-
std::shared_ptr<core::FlowFile> getNextFlowFile(){
return next_ff_;
}
@@ -93,11 +95,50 @@ class ExecutionPlan {
next_ff_ = ptr;
}
+ static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
protected:
+ class FailureHandler {
+ public:
+ FailureHandler() {
+ callback_ = nullptr;
+ }
+ void setCallback(void (*onerror_callback)(const flow_file_record*)) {
+ callback_=onerror_callback;
+ }
+ void operator()(const processor_session* ps)
+ {
+ auto ses = static_cast<core::ProcessSession*>(ps->session);
+
+ auto ff = ses->get();
+ if (ff == nullptr) {
+ return;
+ }
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr && callback_ != nullptr) {
+ // create a flow file.
+ auto path = claim->getContentFullPath();
+ auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+ ffr->attributes = ff->getAttributesPtr();
+ ffr->ffp = ff.get();
+ callback_(ffr);
+ }
+ // This deletes the content of the flow file as ff goes out of scope.
+ // It's the user's responsibility to copy all the data they need.
+ ses->remove(ff);
+
+ }
+ private:
+ void (*callback_)(const flow_file_record*);
+ };
void finalize();
- std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+ std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+ std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+ core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
@@ -131,6 +172,7 @@ class ExecutionPlan {
static std::shared_ptr<utils::IdGenerator> id_generator_;
std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<FailureHandler> failure_handler_;
};
#endif /* LIBMINIFI_CAPI_PLAN_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 94b05fa..2b3622d 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -79,6 +79,13 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name);
processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+/**
+* Register your callback to receive flow files that the flow failed to process.
+* The flow file is deleted after the callback is executed, so make sure to copy all the data you need!
+* The first callback should be registered before the flow is used; it can be replaced later at runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*));
+
int set_property(processor *, const char *, const char *);
int set_instance_property(nifi_instance *instance, const char*, const char *);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/processors/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h
index 879f49c..801c99c 100644
--- a/libminifi/include/processors/CallbackProcessor.h
+++ b/libminifi/include/processors/CallbackProcessor.h
@@ -65,7 +65,7 @@ class CallbackProcessor : public core::Processor {
public:
- void setCallback(void *obj, void (*ontrigger_callback)(processor_session *)) {
+ void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
objref_ = obj;
callback_ = ontrigger_callback;
}
@@ -82,7 +82,7 @@ class CallbackProcessor : public core::Processor {
protected:
void *objref_;
- void (*callback_)(processor_session*);
+ std::function<void(processor_session*)> callback_;
private:
// Logger
std::shared_ptr<logging::Logger> logger_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 691be41..0abd63b 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -40,21 +40,17 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re
* Add a callback to obtain and pass processor session to a generated processor
*
*/
-std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, void (*fp)(processor_session *)) {
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
if (finalized) {
return nullptr;
}
- utils::Identifier uuid;
- id_generator_->generate(uuid);
+ auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor");
+ if (!ptr)
+ return nullptr;
- auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate("CallbackProcessor", uuid);
- if (nullptr == ptr) {
- throw std::exception();
- }
std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr);
processor->setCallback(obj, fp);
- processor->setName("CallbackProcessor");
return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true);
}
@@ -99,25 +95,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
termination_ = relationship;
}
- std::stringstream connection_name;
- connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
- connection->setRelationship(relationship);
-
- // link the connections so that we can test results at the end for this
- connection->setSource(last);
- connection->setDestination(processor);
-
- utils::Identifier uuid_copy, uuid_copy_next;
- last->getUUID(uuid_copy);
- connection->setSourceUUID(uuid_copy);
- processor->getUUID(uuid_copy_next);
- connection->setDestinationUUID(uuid_copy_next);
- last->addConnection(connection);
- if (last != processor) {
- processor->addConnection(connection);
- }
- relationships_.push_back(connection);
+ relationships_.push_back(connectProcessors(last, processor, relationship, true));
}
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
@@ -202,29 +180,31 @@ std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
return current_session_;
}
-std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
- std::stringstream connection_name;
- std::shared_ptr<core::Processor> last = processor;
- connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
- connection->setRelationship(termination_);
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
+ return connectProcessors(processor, processor, termination_, set_dst);
+}
- // link the connections so that we can test results at the end for this
- connection->setSource(last);
- if (setDest)
- connection->setDestination(processor);
+void ExecutionPlan::finalize() {
+ if (failure_handler_) {
+ auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
- utils::Identifier uuid_copy;
- last->getUUID(uuid_copy);
- connection->setSourceUUID(uuid_copy);
- if (setDest)
- connection->setDestinationUUID(uuid_copy);
+ std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
+ callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
- processor->addConnection(connection);
- return connection;
-}
+ for (const auto& proc : processor_queue_) {
+ relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+ }
+
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
+
+ processor_nodes_.push_back(node);
+
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+ processor_contexts_.push_back(context);
+
+ processor_queue_.push_back(failure_proc);
+ }
-void ExecutionPlan::finalize() {
if (relationships_.size() > 0) {
relationships_.push_back(buildFinalConnection(processor_queue_.back()));
} else {
@@ -250,3 +230,40 @@ std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::strin
return processor;
}
+std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+ core::Relationship relationship, bool set_dst) {
+ std::stringstream connection_name;
+ connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+ connection->setRelationship(relationship);
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(src_proc);
+
+ utils::Identifier uuid_copy, uuid_copy_next;
+ src_proc->getUUID(uuid_copy);
+ connection->setSourceUUID(uuid_copy);
+ if (set_dst) {
+ connection->setDestination(dst_proc);
+ dst_proc->getUUID(uuid_copy_next);
+ connection->setDestinationUUID(uuid_copy_next);
+ if (src_proc != dst_proc) {
+ dst_proc->addConnection(connection);
+ }
+ }
+ src_proc->addConnection(connection);
+
+ return connection;
+}
+
+bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) {
+ if (finalized && !failure_handler_) {
+ return false; // Already finalized the flow without failure handler processor
+ }
+ if (!failure_handler_) {
+ failure_handler_ = std::make_shared<FailureHandler>();
+ }
+ failure_handler_->setCallback(onerror_callback);
+ return true;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 6bec57c..66ed19b 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -32,6 +32,7 @@
using string_map = std::map<std::string, std::string>;
class API_INITIALIZER {
+ public:
static int initialized;
};
@@ -353,7 +354,7 @@ processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor
return nullptr;
}
ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addCallback(nullptr, ontrigger_callback);
+ auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
processor *new_processor = new processor();
new_processor->processor_ptr = proc.get();
return new_processor;
@@ -399,6 +400,11 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
return nullptr;
}
+int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) {
+ ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+ return plan->setFailureCallback(onerror_callback) ? 0 : 1;
+}
+
int set_property(processor *proc, const char *name, const char *value) {
if (name != nullptr && value != nullptr && proc != nullptr) {
core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dc45446..6981bce 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -703,7 +703,7 @@ void ProcessSession::commit() {
// No connection
if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
// Not autoterminate, we should have the connect
- std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
+ std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
} else {
// Autoterminated
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index a1e22ad..7df4c76 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -1,4 +1,4 @@
set noparent
filter=-build/include_order,-build/include_alpha
exclude_files=Server.cpp
-exclude_files=TestBase.cpp
\ No newline at end of file
+exclude_files=TestBase.cpp
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
index 1739e1b..08214ac 100644
--- a/libminifi/test/capi/CAPITests.cpp
+++ b/libminifi/test/capi/CAPITests.cpp
@@ -38,6 +38,17 @@ static nifi_instance *create_instance_obj(const char *name = "random_instance")
return create_instance("random_instance", &port);
}
+static int failure_count = 0;
+
+void failure_counter(const flow_file_record * fr) {
+ failure_count++;
+ REQUIRE(get_attribute_qty(fr) > 0);
+}
+
+void big_failure_counter(const flow_file_record * fr) {
+ failure_count += 100;
+}
+
TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
auto instance = create_instance_obj();
REQUIRE(instance != nullptr);
@@ -116,6 +127,9 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
REQUIRE(test_file_content == put_data);
+ // No failure handler can be added after the flow is finalized
+ REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
+
free_flowfile(record);
free_flow(test_flow);
@@ -126,8 +140,6 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
TestController testController;
- enable_logging();
-
char src_format[] = "/tmp/gt.XXXXXX";
const char *sourcedir = testController.createTempDirectory(src_format);
@@ -201,3 +213,56 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
free_flow(test_flow);
free_instance(instance);
}
+
+TEST_CASE("Test error handling callback", "[errorHandling]") {
+ TestController testController;
+
+ char src_format[] = "/tmp/gt.XXXXXX";
+ const char *sourcedir = testController.createTempDirectory(src_format);
+ std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+ auto instance = create_instance_obj();
+ REQUIRE(instance != nullptr);
+ flow *test_flow = create_flow(instance, nullptr);
+ REQUIRE(test_flow != nullptr);
+
+ REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
+
+ processor *get_proc = add_processor(test_flow, "GetFile");
+ REQUIRE(get_proc != nullptr);
+ processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+ REQUIRE(put_proc != nullptr);
+ REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+ REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
+ REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
+
+ std::fstream file;
+ std::stringstream ss;
+
+ ss << sourcedir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << test_file_content;
+ file.close();
+
+
+ REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+
+ REQUIRE(failure_count == 1);
+
+ // Failure handler function can be replaced at runtime
+ REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+
+ // Create new testfile to trigger failure again
+ ss << "2";
+ file.open(ss.str(), std::ios::out);
+ file << test_file_content;
+ file.close();
+
+ REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+ REQUIRE(failure_count > 100);
+
+ failure_count = 0;
+
+ free_flow(test_flow);
+ free_instance(instance);
+}