You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/10/09 16:06:27 UTC

nifi-minifi-cpp git commit: MINIFICPP-637 - extend C API to support adding processors with configuration to existing flows

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 6d9aaa2b3 -> cb17947d7


MINIFICPP-637 - extend C API to support adding processors with configuration to existing flows

This closes #416.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/cb17947d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/cb17947d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/cb17947d

Branch: refs/heads/master
Commit: cb17947d74cc35ab888d17043eaa84695d3f5159
Parents: 6d9aaa2
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Mon Oct 8 16:04:31 2018 +0200
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Oct 9 12:05:35 2018 -0400

----------------------------------------------------------------------
 libminifi/include/capi/Plan.h |  2 ++
 libminifi/include/capi/api.h  |  6 +++++-
 libminifi/src/capi/Plan.cpp   | 26 ++++++++++++++++----------
 libminifi/src/capi/api.cpp    | 21 ++++++++++++++++++++-
 4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
index dbbaca0..e4235a8 100644
--- a/libminifi/include/capi/Plan.h
+++ b/libminifi/include/capi/Plan.h
@@ -79,6 +79,8 @@ class ExecutionPlan {
     return content_repo_;
   }
 
+  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
  protected:
 
   void finalize();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 4896369..c7d281e 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -58,7 +58,7 @@ typedef struct {
 
 nifi_instance *create_instance(char *url, nifi_port *port);
 
-void set_property(nifi_instance *, char *, char *);
+void set_instance_property(nifi_instance *, char *, char *);
 
 void initialize_instance(nifi_instance *);
 
@@ -138,6 +138,10 @@ flow *create_flow(nifi_instance *, const char *);
 
 flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
 
+processor *add_processor(flow *parent_flow, const char *processor_name);
+
+int set_property(processor *proc, const char *name, const char *value);
+
 void free_flow(flow *);
 
 flow_file_record *get_next_flow_file(nifi_instance *, flow *);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 1502f18..43ef6e6 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -97,16 +97,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &
     return nullptr;
   }
 
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
-
-  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
-  if (nullptr == ptr) {
-    throw std::exception();
-  }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
-
-  processor->setName(name);
+  auto processor = ExecutionPlan::createProcessor(processor_name, name);
 
   return addProcessor(processor, name, relationship, linkToPrevious);
 }
@@ -213,3 +204,18 @@ void ExecutionPlan::finalize() {
   finalized = true;
 }
 
+std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
+  utils::Identifier uuid;  // identifier handed to the class loader for the new instance
+  id_generator_->generate(uuid);
+  // Instantiate the class registered as processor_name; the resulting processor is then named `name`.
+  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
+  if (nullptr == ptr) {
+    throw std::exception();  // instantiate() returned null; the exception carries no message
+  }
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+  // NOTE(review): the static cast above is unchecked - assumes processor_name names a core::Processor subclass.
+  processor->setName(name);
+  return processor;
+}
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cb17947d/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 1671c86..5429873 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -27,6 +27,7 @@
 #include "ResourceClaim.h"
 #include "processors/GetFile.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/StringUtils.h"
 
 using string_map = std::map<std::string, std::string>;
 
@@ -99,7 +100,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac
  * @param key key in which we will set the valiue
  * @param value
  */
-void set_property(nifi_instance *instance, char *key, char *value) {
+void set_instance_property(nifi_instance *instance, char *key, char *value) {
   auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
   minifi_instance_ref->getConfiguration()->set(key, value);
 }
@@ -281,6 +282,50 @@ flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *
   return new_flow;
 }
 
+/**
+ * Adds a processor to the execution plan of an existing flow.
+ * The processor class name is also used as the name of the new processor.
+ * @param flow flow whose plan receives the processor
+ * @param processor_name class name of the processor to instantiate
+ * @return newly allocated processor handle, or nullptr when an argument is
+ * null or the processor could not be created or added
+ */
+processor *add_processor(flow *flow, const char *processor_name) {
+  if (flow == nullptr || flow->plan == nullptr || processor_name == nullptr) {
+    return nullptr;
+  }
+  try {
+    ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+    auto proc = plan->addProcessor(processor_name, processor_name);
+    if (proc) {
+      processor *new_processor = new processor();
+      // NOTE(review): non-owning pointer; presumably the plan keeps the processor alive - confirm.
+      new_processor->processor_ptr = proc.get();
+      return new_processor;
+    }
+  } catch (...) {
+    // ExecutionPlan::createProcessor throws when the class cannot be instantiated;
+    // nothing may propagate out of this C API function, so report failure instead.
+  }
+  return nullptr;
+}
+
+/**
+ * Sets a property on a processor returned by add_processor.
+ * @param proc processor handle
+ * @param name property name
+ * @param value property value
+ * @return 0 on success, -1 when an argument (or the wrapped processor) is null,
+ * -2 when the processor's setProperty call reports failure
+ */
+int set_property(processor *proc, const char *name, const char *value) {
+  if (name != nullptr && value != nullptr && proc != nullptr && proc->processor_ptr != nullptr) {
+    core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
+    return p->setProperty(name, value) ? 0 : -2;
+  }
+  return -1;
+}
+
 void free_flow(flow *flow) {
   if (flow == nullptr)
     return;