You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/09 16:06:27 UTC
nifi-minifi-cpp git commit: MINIFICPP-637 - extend C API to support
adding processors with configuration to existing flows
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 6d9aaa2b3 -> cb17947d7
MINIFICPP-637 - extend C API to support adding processors with configuration to existing flows
This closes #416.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/cb17947d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/cb17947d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/cb17947d
Branch: refs/heads/master
Commit: cb17947d74cc35ab888d17043eaa84695d3f5159
Parents: 6d9aaa2
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Mon Oct 8 16:04:31 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Oct 9 12:05:35 2018 -0400
----------------------------------------------------------------------
libminifi/include/capi/Plan.h | 2 ++
libminifi/include/capi/api.h | 6 +++++-
libminifi/src/capi/Plan.cpp | 26 ++++++++++++++++----------
libminifi/src/capi/api.cpp | 21 ++++++++++++++++++++-
4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index dbbaca0..e4235a8 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -79,6 +79,8 @@ class ExecutionPlan {
return content_repo_;
}
+ static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
protected:
void finalize();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 4896369..c7d281e 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -58,7 +58,7 @@ typedef struct {
nifi_instance *create_instance(char *url, nifi_port *port);
-void set_property(nifi_instance *, char *, char *);
+void set_instance_property(nifi_instance *, char *, char *);
void initialize_instance(nifi_instance *);
@@ -138,6 +138,10 @@ flow *create_flow(nifi_instance *, const char *);
flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+processor *add_processor(flow *parent_flow, const char *processor_name);
+
+int set_property(processor *proc, const char *name, const char *value);
+
void free_flow(flow *);
flow_file_record *get_next_flow_file(nifi_instance *, flow *);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 1502f18..43ef6e6 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -97,16 +97,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &
return nullptr;
}
- utils::Identifier uuid;
- id_generator_->generate(uuid);
-
- auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
- if (nullptr == ptr) {
- throw std::exception();
- }
- std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
-
- processor->setName(name);
+ auto processor = ExecutionPlan::createProcessor(processor_name, name);
return addProcessor(processor, name, relationship, linkToPrevious);
}
@@ -213,3 +204,18 @@ void ExecutionPlan::finalize() {
finalized = true;
}
+/**
+ * Instantiates a processor through the default class loader.
+ *
+ * A new identifier is generated for the processor and the supplied name is
+ * assigned to it before it is returned.
+ *
+ * @param processor_name class name handed to the class loader
+ * @param name name assigned to the created processor
+ * @return the newly created processor
+ * @throws std::exception if the class loader returns a null object
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
+  utils::Identifier generated_id;
+  id_generator_->generate(generated_id);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, generated_id);
+  if (!instance) {
+    // The class loader could not produce an object for this class name.
+    throw std::exception();
+  }
+
+  auto created = std::static_pointer_cast<core::Processor>(instance);
+  created->setName(name);
+  return created;
+}
+
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 1671c86..5429873 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -27,6 +27,7 @@
#include "ResourceClaim.h"
#include "processors/GetFile.h"
#include "core/logging/LoggerConfiguration.h"
+#include "utils/StringUtils.h"
using string_map = std::map<std::string, std::string>;
@@ -99,7 +100,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac
* @param key key for which the value will be set
* @param value
*/
-void set_property(nifi_instance *instance, char *key, char *value) {
+void set_instance_property(nifi_instance *instance, char *key, char *value) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
minifi_instance_ref->getConfiguration()->set(key, value);
}
@@ -281,6 +282,24 @@ flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *
return new_flow;
}
+/**
+ * Creates a processor of the given type and adds it to an existing flow.
+ * The processor type name is also used as the name of the new processor.
+ *
+ * @param flow flow whose execution plan receives the processor
+ * @param processor_name class name of the processor to instantiate
+ * @return a handle allocated with new, or nullptr if an argument is null or
+ *         the processor could not be created
+ */
+processor *add_processor(flow *flow, const char *processor_name) {
+  // Reject null input up front: building a std::string from a null
+  // pointer or dereferencing a null flow is undefined behaviour.
+  if (flow == nullptr || flow->plan == nullptr || processor_name == nullptr) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  try {
+    auto proc = plan->addProcessor(processor_name, processor_name);
+    if (proc) {
+      processor *new_processor = new processor();
+      // NOTE(review): non-owning pointer; presumably the plan keeps the
+      // processor alive for the lifetime of the flow - confirm.
+      new_processor->processor_ptr = proc.get();
+      return new_processor;
+    }
+  } catch (...) {
+    // ExecutionPlan::createProcessor throws when the class loader cannot
+    // instantiate the requested type. Exceptions must not escape through
+    // the C API, so every failure is reported as nullptr.
+  }
+  return nullptr;
+}
+/**
+ * Sets a property on a processor.
+ *
+ * @param proc processor handle
+ * @param name property name
+ * @param value property value
+ * @return 0 if the property was set, -1 if any argument is null,
+ *         -2 if the processor's setProperty call reported failure
+ */
+int set_property(processor *proc, const char *name, const char *value) {
+  if (proc == nullptr || name == nullptr || value == nullptr) {
+    return -1;
+  }
+  core::Processor *target = static_cast<core::Processor*>(proc->processor_ptr);
+  if (target->setProperty(name, value)) {
+    return 0;
+  }
+  return -2;
+}
+
void free_flow(flow *flow) {
if (flow == nullptr)
return;