You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/31 18:28:37 UTC

[1/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into nanofi

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master fc1074a04 -> 556794b15


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/api/nanofi.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
new file mode 100644
index 0000000..ee33c6b
--- /dev/null
+++ b/nanofi/src/api/nanofi.cpp
@@ -0,0 +1,518 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+#include <map>
+#include <memory>
+#include <utility>
+#include <exception>
+
+#include "api/nanofi.h"
+#include "core/Core.h"
+#include "core/expect.h"
+#include "cxx/Instance.h"
+#include "cxx/Plan.h"
+#include "ResourceClaim.h"
+#include "processors/GetFile.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/StringUtils.h"
+
+using string_map = std::map<std::string, std::string>;
+
+class API_INITIALIZER {
+ public:
+  static int initialized;
+};
+
+int API_INITIALIZER::initialized = initialize_api();
+
+int initialize_api() {
+  logging::LoggerConfiguration::getConfiguration().disableLogging();
+  return 1;
+}
+
+void enable_logging() {
+  logging::LoggerConfiguration::getConfiguration().enableLogging();
+}
+
+void set_terminate_callback(void (*terminate_callback)()) {
+  std::set_terminate(terminate_callback);
+}
+
+class DirectoryConfiguration {
+ protected:
+  DirectoryConfiguration() {
+    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+  }
+ public:
+  static void initialize() {
+    static DirectoryConfiguration configure;
+  }
+};
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port.
+ * @Deprecated for API version 0.2 in favor of the following prototype
+ * nifi_instance *create_instance(nifi_port const *port) {
+ */
+nifi_instance *create_instance(const char *url, nifi_port *port) {
+  // make sure that we have a thread safe way of initializing the content directory
+  DirectoryConfiguration::initialize();
+
+  // need reinterpret cast until we move to C for this module.
+  nifi_instance *instance = reinterpret_cast<nifi_instance*>(malloc(sizeof(nifi_instance)));
+  /**
+   * This API will gradually move away from C++, hence malloc is used for nifi_instance
+   * Since minifi::Instance is currently being used, then we need to use new in that case.
+   */
+  instance->instance_ptr = new minifi::Instance(url, port->port_id);
+  // may have to translate port ID here in the future
+  // need reinterpret cast until we move to C for this module.
+  instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
+  snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id);
+  return instance;
+}
+
+/**
+ * Initializes the instance
+ */
+void initialize_instance(nifi_instance *instance) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->setRemotePort(instance->port.port_id);
+}
+/*
+ typedef int c2_update_callback(char *);
+
+ typedef int c2_stop_callback(char *);
+
+ typedef int c2_start_callback(char *);
+
+ */
+void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
+}
+
+/**
+ * Sets a property within the nifi instance
+ * @param instance nifi instance
+ * @param key key in which we will set the valiue
+ * @param value
+ * @return -1 when instance or key are null
+ */
+int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
+  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
+    return -1;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->getConfiguration()->set(key, value);
+  return 0;
+}
+
+/**
+ * Reclaims memory associated with a nifi instance structure.
+ * @param instance nifi instance.
+ */
+void free_instance(nifi_instance* instance) {
+  if (instance != nullptr) {
+    delete ((minifi::Instance*) instance->instance_ptr);
+    free(instance->port.port_id);
+    free(instance);
+  }
+}
+
+/**
+ * Creates a flow file record
+ * @param file file to place into the flow file.
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len) {
+  flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record));
+  new_ff->attributes = new string_map();
+  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
+  // set the size of the flow file.
+  new_ff->size = in.tellg();
+  return new_ff;
+}
+
+/**
+ * Creates a flow file record
+ * @param file file to place into the flow file.
+ */
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
+  if (nullptr == file) {
+    return nullptr;
+  }
+  flow_file_record *new_ff = create_ff_object_na(file, len, size);
+  new_ff->attributes = new string_map();
+  new_ff->ffp = 0;
+  return new_ff;
+}
+
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
+  flow_file_record *new_ff = new flow_file_record;
+  new_ff->attributes = nullptr;
+  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  // set the size of the flow file.
+  new_ff->size = size;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  return new_ff;
+}
+/**
+ * Reclaims memory associated with a flow file object
+ * @param ff flow file record.
+ */
+void free_flowfile(flow_file_record *ff) {
+  if (ff == nullptr) {
+    return;
+  }
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+  if (content_repo_ptr->get()) {
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+    (*content_repo_ptr)->remove(claim);
+  }
+  if (ff->ffp == nullptr) {
+    auto map = static_cast<string_map*>(ff->attributes);
+    delete map;
+  }
+  free(ff->contentLocation);
+  free(ff);
+  delete content_repo_ptr;
+}
+
+/**
+ * Adds an attribute
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
+ */
+uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
+  return ret.second ? 0 : -1;
+}
+
+/**
+ * Updates (or adds) an attribute
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ */
+void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
+}
+
+/*
+ * Obtains the attribute.
+ * @param ff flow file record
+ * @param key key
+ * @param caller_attribute caller supplied object in which we will copy the data ptr
+ * @return 0 if successful, -1 if the key does not exist
+ */
+uint8_t get_attribute(flow_file_record * ff, attribute * caller_attribute) {
+  if (ff == nullptr) {
+    return -1;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map) {
+    return -1;
+  }
+  auto find = attribute_map->find(caller_attribute->key);
+  if (find != attribute_map->end()) {
+    caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
+    caller_attribute->value_size = find->second.size();
+    return 0;
+  }
+  return -1;
+}
+
+int get_attribute_qty(const flow_file_record* ff) {
+  if (ff == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  return attribute_map ? attribute_map->size() : 0;
+}
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
+  if (ff == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map || attribute_map->empty()) {
+    return 0;
+  }
+  int i = 0;
+  for (const auto& kv : *attribute_map) {
+    if (i >= target->size) {
+      break;
+    }
+    target->attributes[i].key = kv.first.data();
+    target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
+    target->attributes[i].value_size = kv.second.size();
+    ++i;
+  }
+  return i;
+}
+
+/**
+ * Removes a key from the attribute chain
+ * @param ff flow file record
+ * @param key key to remove
+ * @return 0 if removed, -1 otherwise
+ */
+uint8_t remove_attribute(flow_file_record *ff, const char *key) {
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  return attribute_map->erase(key) - 1;  // erase by key returns the number of elements removed (0 or 1)
+}
+
+/**
+ * Transmits the flowfile
+ * @param ff flow file record
+ * @param instance nifi instance structure
+ */
+int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
+  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
+    minifi_instance_ref->setRemotePort(instance->port.port_id);
+  }
+
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+
+  auto no_op = minifi_instance_ref->getNoOpRepository();
+
+  auto content_repo = minifi_instance_ref->getContentRepository();
+
+  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+  claim->increaseFlowFileRecordOwnedCount();
+  claim->increaseFlowFileRecordOwnedCount();
+
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
+  ffr->addAttribute("nanofi.version", API_VERSION);
+  ffr->setSize(ff->size);
+
+  std::string port_uuid = instance->port.port_id;
+
+  minifi_instance_ref->transfer(ffr);
+
+  return 0;
+}
+
+flow * create_new_flow(nifi_instance * instance) {
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  flow *new_flow = (flow*) malloc(sizeof(flow));
+
+  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+
+  new_flow->plan = execution_plan;
+
+  return new_flow;
+}
+
+flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return nullptr;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  flow *new_flow = (flow*) malloc(sizeof(flow));
+
+  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+
+  new_flow->plan = execution_plan;
+
+  if (first_processor != nullptr && strlen(first_processor) > 0) {
+    // automatically adds it with success
+    execution_plan->addProcessor(first_processor, first_processor);
+  }
+  return new_flow;
+}
+
+processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
+  processor *new_processor = (processor*) malloc(sizeof(processor));
+  new_processor->processor_ptr = proc.get();
+  return new_processor;
+}
+
+flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) {
+  static const std::string first_processor = "GetFile";
+  flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow;
+
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
+  // automatically adds it with success
+  auto getFile = plan->addProcessor(first_processor, first_processor);
+
+  plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
+  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
+  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
+
+  return new_flow;
+}
+
+processor *add_processor(flow *flow, const char *processor_name) {
+  if (nullptr == flow || nullptr == processor_name) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name);
+  if (proc) {
+    processor *new_processor = (processor*) malloc(sizeof(processor));
+    new_processor->processor_ptr = proc.get();
+    return new_processor;
+  }
+  return nullptr;
+}
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
+  if (proc) {
+    processor *new_processor = (processor*) malloc(sizeof(processor));
+    new_processor->processor_ptr = proc.get();
+    return new_processor;
+  }
+  return nullptr;
+}
+
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  return plan->setFailureCallback(onerror_callback) ? 0 : 1;
+}
+
+int set_failure_strategy(flow *flow, FailureStrategy strategy) {
+  return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;
+}
+
+int set_property(processor *proc, const char *name, const char *value) {
+  if (name != nullptr && value != nullptr && proc != nullptr) {
+    core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
+    bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
+    return success ? 0 : -2;
+  }
+  return -1;
+}
+
+int free_flow(flow *flow) {
+  if (flow == nullptr || nullptr == flow->plan)
+    return -1;
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  delete execution_plan;
+  free(flow);
+  return 0;
+}
+
+flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
+  if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
+    return nullptr;
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  execution_plan->reset();
+  while (execution_plan->runNextProcessor()) {
+  }
+  auto ff = execution_plan->getCurrentFlowFile();
+  if (ff == nullptr) {
+    return nullptr;
+  }
+  auto claim = ff->getResourceClaim();
+
+  if (claim != nullptr) {
+    // create a flow file.
+    claim->increaseFlowFileRecordOwnedCount();
+    auto path = claim->getContentFullPath();
+    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+    ffr->ffp = ff.get();
+    ffr->attributes = ff->getAttributesPtr();
+    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    *content_repo_ptr = execution_plan->getContentRepo();
+    return ffr;
+  } else {
+    return nullptr;
+  }
+}
+
+size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
+  if (nullptr == instance || nullptr == flow || nullptr == ff_r)
+    return 0;
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  size_t i = 0;
+  for (; i < size; i++) {
+    execution_plan->reset();
+    auto ffr = get_next_flow_file(instance, flow);
+    if (ffr == nullptr) {
+      break;
+    }
+    ff_r[i] = ffr;
+  }
+  return i;
+}
+
+flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) {
+  if (nullptr == instance || nullptr == flow || nullptr == session)
+    return nullptr;
+  auto sesh = static_cast<core::ProcessSession*>(session->session);
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto ff = sesh->get();
+  execution_plan->setNextFlowFile(ff);
+  if (ff == nullptr) {
+    return nullptr;
+  }
+  auto claim = ff->getResourceClaim();
+
+  if (claim != nullptr) {
+    // create a flow file.
+    claim->increaseFlowFileRecordOwnedCount();
+    auto path = claim->getContentFullPath();
+    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+    ffr->attributes = ff->getAttributesPtr();
+    ffr->ffp = ff.get();
+    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    *content_repo_ptr = execution_plan->getContentRepo();
+    return ffr;
+  } else {
+    return nullptr;
+  }
+}
+
+int transfer(processor_session* session, flow *flow, const char *rel) {
+  if (nullptr == session || nullptr == flow || rel == nullptr) {
+    return -1;
+  }
+  auto sesh = static_cast<core::ProcessSession*>(session->session);
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (nullptr == sesh || nullptr == execution_plan) {
+    return -1;
+  }
+  core::Relationship relationship(rel, rel);
+  auto ff = execution_plan->getNextFlowFile();
+  if (nullptr == ff) {
+    return -2;
+  }
+  sesh->transfer(ff, relationship);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp
new file mode 100644
index 0000000..3a2b0d1
--- /dev/null
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/C2CallbackAgent.h"
+#include <csignal>
+#include <utility>
+#include <vector>
+#include <map>
+#include <string>
+#include <memory>
+#include "c2/ControllerSocketProtocol.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/FileManager.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                                 const std::shared_ptr<Configure> &configuration)
+    : C2Agent(controller, updateSink, configuration),
+      stop(nullptr),
+      logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
+}
+
+void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
+  switch (resp.op) {
+    case Operation::CLEAR:
+      break;
+    case Operation::UPDATE:
+      break;
+    case Operation::DESCRIBE:
+      break;
+    case Operation::RESTART:
+      break;
+    case Operation::START:
+    case Operation::STOP: {
+      if (resp.name == "C2" || resp.name == "c2") {
+        raise(SIGTERM);
+      }
+
+      auto str = resp.name.c_str();
+
+      if (nullptr != stop)
+        stop(const_cast<char*>(str));
+
+      break;
+    }
+      //
+      break;
+    default:
+      break;
+      // do nothing
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/CallbackProcessor.cpp b/nanofi/src/cxx/CallbackProcessor.cpp
new file mode 100644
index 0000000..5294a1b
--- /dev/null
+++ b/nanofi/src/cxx/CallbackProcessor.cpp
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "cxx/CallbackProcessor.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  if (callback_ != nullptr) {
+    processor_session sesh;
+    sesh.session = session;
+    callback_(&sesh);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/Plan.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
new file mode 100644
index 0000000..f892aa9
--- /dev/null
+++ b/nanofi/src/cxx/Plan.cpp
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/Plan.h"
+#include "cxx/CallbackProcessor.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+bool intToFailureStragey(int in, FailureStrategy *out) {
+  auto tmp = static_cast<FailureStrategy>(in);
+  switch (tmp) {
+    case AS_IS:
+    case ROLLBACK:
+      *out = tmp;
+      return true;
+    default:
+      return false;
+  }
+}
+
+std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
+    : content_repo_(content_repo),
+      flow_repo_(flow_repo),
+      prov_repo_(prov_repo),
+      finalized(false),
+      location(-1),
+      current_flowfile_(nullptr),
+      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
+  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
+}
+
+/**
+ * Add a callback to obtain and pass processor session to a generated processor
+ *
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor");
+  if (!ptr)
+    return nullptr;
+
+  std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr);
+  processor->setCallback(obj, fp);
+
+  return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true);
+}
+
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  uint32_t i = 0;
+  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
+  for (i = 0; i < processor_queue_.size(); i++) {
+    if (processor_queue_.at(i) == proc) {
+      break;
+    }
+  }
+
+  if (i >= processor_queue_.size() || i >= processor_contexts_.size()) {
+    return false;
+  }
+
+  return processor_contexts_.at(i)->setProperty(prop, value);
+}
+
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
+
+  processor->setStreamFactory(stream_factory);
+  // initialize the processor
+  processor->initialize();
+
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    std::shared_ptr<core::Processor> last = processor_queue_.back();
+
+    if (last == nullptr) {
+      last = processor;
+      termination_ = relationship;
+    }
+
+    relationships_.push_back(connectProcessors(last, processor, relationship, true));
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  auto processor = ExecutionPlan::createProcessor(processor_name, name);
+  if (!processor) {
+    return nullptr;
+  }
+  return addProcessor(processor, name, relationship, linkToPrevious);
+}
+
+void ExecutionPlan::reset() {
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (auto proc : processor_queue_) {
+    while (proc->getActiveTasks() > 0) {
+      proc->decrementActiveTask();
+    }
+  }
+}
+
+bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  if (!finalized) {
+    finalize();
+  }
+  location++;
+  if (location >= processor_queue_.size()) {
+    return false;
+  }
+  std::shared_ptr<core::Processor> processor = processor_queue_[location];
+  std::shared_ptr<core::ProcessContext> context = processor_contexts_[location];
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+  factories_.push_back(factory);
+  if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
+    processor->onSchedule(context, factory);
+    configured_processors_.push_back(processor);
+  }
+  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+  process_sessions_.push_back(current_session);
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify != nullptr) {
+    verify(context, current_session);
+  } else {
+    logger_->log_debug("Running %s", processor->getName());
+    processor->onTrigger(context, current_session);
+  }
+  current_session->commit();
+  current_flowfile_ = current_session->get();
+  auto hasMore = location + 1 < processor_queue_.size();
+  if (!hasMore && !current_flowfile_) {
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    current_flowfile_ = relationships_.back()->poll(expired);
+  }
+  return hasMore;
+}
+
+std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
+  return process_sessions_.at(location)->getProvenanceReporter()->getEvents();
+}
+
+std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
+  return current_flowfile_;
+}
+
+std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
+  return current_session_;
+}
+
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
+  return connectProcessors(processor, processor, termination_, set_dst);
+}
+
+void ExecutionPlan::finalize() {
+  if (failure_handler_) {
+    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
+
+    std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
+    callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
+
+    for (const auto& proc : processor_queue_) {
+      for (const auto& rel : proc->getSupportedRelationships()) {
+        if (rel.getName() == "failure") {
+          relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+          break;
+        }
+      }
+    }
+
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
+
+    processor_nodes_.push_back(node);
+
+    std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+    processor_contexts_.push_back(context);
+
+    processor_queue_.push_back(failure_proc);
+  }
+
+  if (relationships_.size() > 0) {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  } else {
+    for (auto processor : processor_queue_) {
+      relationships_.push_back(buildFinalConnection(processor, true));
+    }
+  }
+
+  finalized = true;
+}
+
+std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
+
+  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
+  if (nullptr == ptr) {
+    return nullptr;
+  }
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+
+  processor->setName(name);
+  return processor;
+}
+
+std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, core::Relationship relationship,
+                                                                     bool set_dst) {
+  std::stringstream connection_name;
+  connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+  connection->setRelationship(relationship);
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(src_proc);
+
+  utils::Identifier uuid_copy, uuid_copy_next;
+  src_proc->getUUID(uuid_copy);
+  connection->setSourceUUID(uuid_copy);
+  if (set_dst) {
+    connection->setDestination(dst_proc);
+    dst_proc->getUUID(uuid_copy_next);
+    connection->setDestinationUUID(uuid_copy_next);
+    if (src_proc != dst_proc) {
+      dst_proc->addConnection(connection);
+    }
+  }
+  src_proc->addConnection(connection);
+
+  return connection;
+}
+
+bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
+  if (finalized && !failure_handler_) {
+    return false;  // Already finalized the flow without failure handler processor
+  }
+  if (!failure_handler_) {
+    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
+  }
+  failure_handler_->setCallback(onerror_callback);
+  return true;
+}
+
+bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
+  if (!failure_handler_) {
+    return false;
+  }
+  failure_handler_->setStrategy(start);
+  return true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/tests/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
new file mode 100644
index 0000000..65c52e1
--- /dev/null
+++ b/nanofi/tests/CAPITests.cpp
@@ -0,0 +1,278 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <sys/stat.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "utils/file/FileUtils.h"
+#include "TestBase.h"
+
+#include <chrono>
+#include <thread>
+#include "api/nanofi.h"
+
+static nifi_instance *create_instance_obj(const char *name = "random_instance") {
+  nifi_port port;
+  char port_str[] = "12345";
+  port.port_id = port_str;
+  return create_instance("random_instance", &port);
+}
+
+static int failure_count = 0;
+
+void failure_counter(flow_file_record * fr) {
+  failure_count++;
+  REQUIRE(get_attribute_qty(fr) > 0);
+  free_flowfile(fr);
+}
+
+void big_failure_counter(flow_file_record * fr) {
+  failure_count += 100;
+  free_flowfile(fr);
+}
+
+TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(test_proc != nullptr);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  processor *test_proc = add_processor(test_flow, "NeverExisted");
+  REQUIRE(test_proc == nullptr);
+  processor *no_proc = add_processor(test_flow, "");
+  REQUIRE(no_proc == nullptr);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(test_proc != nullptr);
+  REQUIRE(set_property(test_proc, "Data Format", "Text") == 0);  // Valid value
+  // TODO(aboda): add this two below when property handling is made strictly typed
+  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
+  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
+  REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0);  // Empty value
+  REQUIRE(set_property(test_proc, nullptr, "Blah") != 0);  // Empty attribute
+  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);  // Invalid processor
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("get file and put file", "[getAndPutFile]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  char put_format[] = "/tmp/pt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  const char *putfiledir = testController.createTempDirectory(put_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+  REQUIRE(record != nullptr);
+
+  ss.str("");
+
+  ss << putfiledir << "/" << "tstFile.ext";
+  std::ifstream t(ss.str());
+  std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
+
+  REQUIRE(test_file_content == put_data);
+
+  // No failure handler can be added after the flow is finalized
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+
+  free_instance(instance);
+}
+
+TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
+  processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
+  REQUIRE(update_attr != nullptr);
+
+  REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+
+  REQUIRE(record != nullptr);
+
+  attribute test_attr;
+  test_attr.key = "TestAttr";
+  REQUIRE(get_attribute(record, &test_attr) == 0);
+
+  REQUIRE(test_attr.value_size != 0);
+  REQUIRE(test_attr.value != nullptr);
+
+  std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
+
+  REQUIRE(attr_value == test_file_content);
+
+  const char * new_testattr_value = "S0me t3st t3xt";
+
+  // Attribute already exist, should fail
+  REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0);  // NOLINT
+
+  // Update overwrites values
+  update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value));  // NOLINT
+
+  int attr_size = get_attribute_qty(record);
+  REQUIRE(attr_size > 0);
+
+  attribute_set attr_set;
+  attr_set.size = attr_size;
+  attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute));  // NOLINT
+
+  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
+
+  bool test_attr_found = false;
+  bool updated_attr_found = false;
+  for (int i = 0; i < attr_set.size; ++i) {
+    if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
+      test_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
+    } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
+      updated_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
+    }
+  }
+  REQUIRE(updated_attr_found == true);
+  REQUIRE(test_attr_found == true);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("Test error handling callback", "[errorHandling]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  // Failure strategy cannot be set before a valid callback is added
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
+  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+
+  REQUIRE(failure_count == 1);
+
+  // Failure handler function can be replaced runtime
+  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
+
+  // Create new testfile to trigger failure again
+  ss << "2";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+  REQUIRE(failure_count > 100);
+
+  failure_count = 0;
+
+  free_flow(test_flow);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/library/CMakeLists.txt b/python/library/CMakeLists.txt
index 5d309a1..684cf22 100644
--- a/python/library/CMakeLists.txt
+++ b/python/library/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../../blocks/ ../../libminifi/include  ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
+include_directories(../../nanofi/include/ ../../libminifi/include  ../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
 if(WIN32)
 	include_directories(../../libminifi/opsys/win)
 else()
@@ -33,9 +33,9 @@ include_directories(../../extensions/http-curl/ ../../extensions/http-curl/clien
 
 add_library(python-lib SHARED python_lib.cpp)
 if (APPLE)
-	target_link_libraries(python-lib capi core-minifi minifi)
+	target_link_libraries(python-lib nanofi core-minifi minifi)
 else()
-	target_link_libraries(python-lib capi-shared core-minifi-shared minifi-shared)
+	target_link_libraries(python-lib nanofi-shared core-minifi-shared minifi-shared)
 endif(APPLE)
 
 if (WIN32)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/python_lib.cpp
----------------------------------------------------------------------
diff --git a/python/library/python_lib.cpp b/python/library/python_lib.cpp
index eb81f79..79241df 100644
--- a/python/library/python_lib.cpp
+++ b/python/library/python_lib.cpp
@@ -22,11 +22,10 @@
 #include <sys/stat.h>
 #include <unistd.h>
 #include <dirent.h>
-#include "capi/api.h"
-#include "file_blocks.h"
-#include "comms.h"
-#include "capi/api.h"
-#include "capi/processors.h"
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
 #include "HTTPCurlLoader.h"
 #include "python_lib.h"
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/rocksdb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/CMakeLists.txt b/thirdparty/rocksdb/CMakeLists.txt
index 837f0bc..f20d09a 100644
--- a/thirdparty/rocksdb/CMakeLists.txt
+++ b/thirdparty/rocksdb/CMakeLists.txt
@@ -584,6 +584,7 @@ else()
 endif()
 
 set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX})
+# commented out to avoid building the shared lib
 #set(ROCKSDB_SHARED_LIB rocksdb-shared${ARTIFACT_SUFFIX})
 set(ROCKSDB_IMPORT_LIB ${ROCKSDB_SHARED_LIB})
 if(WIN32)
@@ -592,16 +593,19 @@ if(WIN32)
   set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
 else()
   set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT})
-  set(LIBS ${ROCKSDB_SHARED_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
-  add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
- # target_link_libraries(${ROCKSDB_SHARED_LIB}
+  set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
+# commented out to avoid building the shared lib
+# as there is no reason
+#add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
+
+# target_link_libraries(${ROCKSDB_SHARED_LIB}
 #    ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
-  set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
-                        LINKER_LANGUAGE CXX
-                        VERSION ${ROCKSDB_VERSION}
-                        SOVERSION ${ROCKSDB_VERSION_MAJOR}
-                        CXX_STANDARD 11
-                        OUTPUT_NAME "rocksdb")
+#  set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
+#                        LINKER_LANGUAGE CXX
+#                        VERSION ${ROCKSDB_VERSION}
+#                        SOVERSION ${ROCKSDB_VERSION_MAJOR}
+#                        CXX_STANDARD 11
+#                        OUTPUT_NAME "rocksdb")
 endif()
 
 option(WITH_LIBRADOS "Build with librados" OFF)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
index f4239a4..6716638 100644
--- a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
+++ b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
@@ -44,7 +44,7 @@ option(YAML_CPP_BUILD_CONTRIB "Enable contrib stuff in library" ON)
 # --> General
 # see http://www.cmake.org/cmake/help/cmake2.6docs.html#variable:BUILD_SHARED_LIBS
 #     http://www.cmake.org/cmake/help/cmake2.6docs.html#command:add_library
-option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
+#option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
 
 
 


[2/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into nanofi

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
deleted file mode 100644
index e135fe1..0000000
--- a/libminifi/src/capi/api.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <string>
-#include <map>
-#include <memory>
-#include <utility>
-#include <exception>
-#include "core/Core.h"
-#include "capi/api.h"
-#include "capi/expect.h"
-#include "capi/Instance.h"
-#include "capi/Plan.h"
-#include "ResourceClaim.h"
-#include "processors/GetFile.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/StringUtils.h"
-
-using string_map = std::map<std::string, std::string>;
-
-class API_INITIALIZER {
- public:
-  static int initialized;
-};
-
-int API_INITIALIZER::initialized = initialize_api();
-
-int initialize_api() {
-  logging::LoggerConfiguration::getConfiguration().disableLogging();
-  return 1;
-}
-
-void enable_logging() {
-  logging::LoggerConfiguration::getConfiguration().enableLogging();
-}
-
-void set_terminate_callback(void (*terminate_callback)()) {
-  std::set_terminate(terminate_callback);
-}
-
-class DirectoryConfiguration {
- protected:
-  DirectoryConfiguration() {
-    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
-  }
- public:
-  static void initialize() {
-    static DirectoryConfiguration configure;
-  }
-};
-
-/**
- * Creates a NiFi Instance from the url and output port.
- * @param url http URL for NiFi instance
- * @param port Remote output port.
- * @Deprecated for API version 0.2 in favor of the following prototype
- * nifi_instance *create_instance(nifi_port const *port) {
- */
-nifi_instance *create_instance(const char *url, nifi_port *port) {
-  // make sure that we have a thread safe way of initializing the content directory
-  DirectoryConfiguration::initialize();
-
-  // need reinterpret cast until we move to C for this module.
-  nifi_instance *instance = reinterpret_cast<nifi_instance*>( malloc(sizeof(nifi_instance)) );
-  /**
-   * This API will gradually move away from C++, hence malloc is used for nifi_instance
-   * Since minifi::Instance is currently being used, then we need to use new in that case.
-   */
-  instance->instance_ptr = new minifi::Instance(url, port->port_id);
-  // may have to translate port ID here in the future
-  // need reinterpret cast until we move to C for this module.
-  instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
-  snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id);
-  return instance;
-}
-
-/**
- * Initializes the instance
- */
-void initialize_instance(nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->setRemotePort(instance->port.port_id);
-}
-/*
- typedef int c2_update_callback(char *);
-
- typedef int c2_stop_callback(char *);
-
- typedef int c2_start_callback(char *);
-
- */
-void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
-}
-
-/**
- * Sets a property within the nifi instance
- * @param instance nifi instance
- * @param key key in which we will set the valiue
- * @param value
- * @return -1 when instance or key are null
- */
-int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
-  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
-    return -1;
-  }
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->getConfiguration()->set(key, value);
-  return 0;
-}
-
-/**
- * Reclaims memory associated with a nifi instance structure.
- * @param instance nifi instance.
- */
-void free_instance(nifi_instance* instance) {
-  if (instance != nullptr) {
-    delete ((minifi::Instance*) instance->instance_ptr);
-    free(instance->port.port_id);
-    free(instance);
-  }
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_flowfile(const char *file, const size_t len) {
-  flow_file_record *new_ff = new flow_file_record;
-  new_ff->attributes = new string_map();
-  new_ff->contentLocation = new char[len + 1];
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = in.tellg();
-  return new_ff;
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
-  if (nullptr == file) {
-    return nullptr;
-  }
-  flow_file_record *new_ff = create_ff_object_na(file, len, size);
-  new_ff->attributes = new string_map();
-  new_ff->ffp = 0;
-  return new_ff;
-}
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
-  flow_file_record *new_ff = new flow_file_record;
-  new_ff->attributes = nullptr;
-  new_ff->contentLocation = new char[len + 1];
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  // set the size of the flow file.
-  new_ff->size = size;
-  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
-  return new_ff;
-}
-/**
- * Reclaims memory associated with a flow file object
- * @param ff flow file record.
- */
-void free_flowfile(flow_file_record *ff) {
-  if (ff == nullptr) {
-    return;
-  }
-  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
-  if (content_repo_ptr->get()) {
-    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
-    (*content_repo_ptr)->remove(claim);
-  }
-  if (ff->ffp == nullptr) {
-    auto map = static_cast<string_map*>(ff->attributes);
-    delete map;
-  }
-  delete[] ff->contentLocation;
-  delete ff;
-  delete content_repo_ptr;
-}
-
-/**
- * Adds an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
- */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
-  return ret.second ? 0 : -1;
-}
-
-/**
- * Updates (or adds) an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- */
-void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
-}
-
-/*
- * Obtains the attribute.
- * @param ff flow file record
- * @param key key
- * @param caller_attribute caller supplied object in which we will copy the data ptr
- * @return 0 if successful, -1 if the key does not exist
- */
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) {
-  if (ff == nullptr) {
-    return -1;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map) {
-    return -1;
-  }
-  auto find = attribute_map->find(caller_attribute->key);
-  if (find != attribute_map->end()) {
-    caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
-    caller_attribute->value_size = find->second.size();
-    return 0;
-  }
-  return -1;
-}
-
-int get_attribute_qty(const flow_file_record* ff) {
-  if (ff == nullptr) {
-    return 0;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  return attribute_map ? attribute_map->size() : 0;
-}
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
-  if (ff == nullptr) {
-    return 0;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map || attribute_map->empty()) {
-    return 0;
-  }
-  int i = 0;
-  for (const auto& kv : *attribute_map) {
-    if (i >= target->size) {
-      break;
-    }
-    target->attributes[i].key = kv.first.data();
-    target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
-    target->attributes[i].value_size = kv.second.size();
-    ++i;
-  }
-  return i;
-}
-
-/**
- * Removes a key from the attribute chain
- * @param ff flow file record
- * @param key key to remove
- * @return 0 if removed, -1 otherwise
- */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  return attribute_map->erase(key) - 1;  // erase by key returns the number of elements removed (0 or 1)
-}
-
-/**
- * Transmits the flowfile
- * @param ff flow file record
- * @param instance nifi instance structure
- */
-int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
-  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
-    minifi_instance_ref->setRemotePort(instance->port.port_id);
-  }
-
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-
-  auto no_op = minifi_instance_ref->getNoOpRepository();
-
-  auto content_repo = minifi_instance_ref->getContentRepository();
-
-  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
-  claim->increaseFlowFileRecordOwnedCount();
-  claim->increaseFlowFileRecordOwnedCount();
-
-  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
-  ffr->addAttribute("nanofi.version", API_VERSION);
-  ffr->setSize(ff->size);
-
-  std::string port_uuid = instance->port.port_id;
-
-  minifi_instance_ref->transfer(ffr);
-
-  return 0;
-}
-
-flow *create_new_flow(nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = new flow;
-
-  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
-
-  return new_flow;
-}
-
-flow *create_flow(nifi_instance *instance, const char *first_processor) {
-  if (nullptr == instance || nullptr == instance->instance_ptr) {
-    return nullptr;
-  }
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = new flow;
-
-  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
-
-  if (first_processor != nullptr && strlen(first_processor) > 0) {
-    // automatically adds it with success
-    execution_plan->addProcessor(first_processor, first_processor);
-  }
-  return new_flow;
-}
-
-processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
-  if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
-    return nullptr;
-  }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
-  processor *new_processor = new processor();
-  new_processor->processor_ptr = proc.get();
-  return new_processor;
-}
-
-flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
-  static const std::string first_processor = "GetFile";
-  flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow;
-
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
-  // automatically adds it with success
-  auto getFile = plan->addProcessor(first_processor, first_processor);
-
-  plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
-  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
-  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
-
-  return new_flow;
-}
-
-processor *add_processor(flow *flow, const char *processor_name) {
-  if (nullptr == flow || nullptr == processor_name) {
-    return nullptr;
-  }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name);
-  if (proc) {
-    processor *new_processor = new processor();
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
-}
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
-  if (proc) {
-    processor *new_processor = new processor();
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
-}
-
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  return plan->setFailureCallback(onerror_callback) ? 0 : 1;
-}
-
-int set_failure_strategy(flow *flow, FailureStrategy strategy) {
-  return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;
-}
-
-int set_property(processor *proc, const char *name, const char *value) {
-  if (name != nullptr && value != nullptr && proc != nullptr) {
-    core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
-    bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
-    return success ? 0 : -2;
-  }
-  return -1;
-}
-
-int free_flow(flow *flow) {
-  if (flow == nullptr || nullptr == flow->plan)
-    return -1;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  delete execution_plan;
-  delete flow;
-  return 0;
-}
-
-flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
-  if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
-    return nullptr;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  execution_plan->reset();
-  while (execution_plan->runNextProcessor()) {
-  }
-  auto ff = execution_plan->getCurrentFlowFile();
-  if (ff == nullptr) {
-    return nullptr;
-  }
-  auto claim = ff->getResourceClaim();
-
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->ffp = ff.get();
-    ffr->attributes = ff->getAttributesPtr();
-    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
-    return nullptr;
-  }
-}
-
-size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
-  if (nullptr == instance || nullptr == flow || nullptr == ff_r)
-    return 0;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  size_t i = 0;
-  for (; i < size; i++) {
-    execution_plan->reset();
-    auto ffr = get_next_flow_file(instance, flow);
-    if (ffr == nullptr) {
-      break;
-    }
-    ff_r[i] = ffr;
-  }
-  return i;
-}
-
-flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) {
-  if (nullptr == instance || nullptr == flow || nullptr == session)
-    return nullptr;
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto ff = sesh->get();
-  execution_plan->setNextFlowFile(ff);
-  if (ff == nullptr) {
-    return nullptr;
-  }
-  auto claim = ff->getResourceClaim();
-
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->attributes = ff->getAttributesPtr();
-    ffr->ffp = ff.get();
-    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
-    return nullptr;
-  }
-}
-
-int transfer(processor_session* session, flow *flow, const char *rel) {
-  if (nullptr == session || nullptr == flow || rel == nullptr) {
-    return -1;
-  }
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  if (nullptr == sesh || nullptr == execution_plan) {
-    return -1;
-  }
-  core::Relationship relationship(rel, rel);
-  auto ff = execution_plan->getNextFlowFile();
-  if (nullptr == ff) {
-    return -2;
-  }
-  sesh->transfer(ff, relationship);
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 47c7ba6..674566b 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -17,7 +17,7 @@
  */
 
 #include "core/repository/VolatileContentRepository.h"
-#include "capi/expect.h"
+#include "core/expect.h"
 #include <cstdio>
 #include <string>
 #include <memory>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/processors/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp
deleted file mode 100644
index 5524d92..0000000
--- a/libminifi/src/processors/CallbackProcessor.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/CallbackProcessor.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  if (callback_ != nullptr) {
-    processor_session sesh;
-    sesh.session = session;
-    callback_(&sesh);
-  }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
deleted file mode 100644
index b7bf784..0000000
--- a/libminifi/test/capi/CAPITests.cpp
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <sys/stat.h>
-#include <utility>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <fstream>
-
-#include "utils/file/FileUtils.h"
-#include "../TestBase.h"
-
-#include "capi/api.h"
-
-#include <chrono>
-#include <thread>
-
-static nifi_instance *create_instance_obj(const char *name = "random_instance") {
-  nifi_port port;
-  char port_str[] = "12345";
-  port.port_id = port_str;
-  return create_instance("random_instance", &port);
-}
-
-static int failure_count = 0;
-
-void failure_counter(flow_file_record * fr) {
-  failure_count++;
-  REQUIRE(get_attribute_qty(fr) > 0);
-  free_flowfile(fr);
-}
-
-void big_failure_counter(flow_file_record * fr) {
-  failure_count += 100;
-  free_flowfile(fr);
-}
-
-TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
-  REQUIRE(test_proc != nullptr);
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  processor *test_proc = add_processor(test_flow, "NeverExisted");
-  REQUIRE(test_proc == nullptr);
-  processor *no_proc = add_processor(test_flow, "");
-  REQUIRE(no_proc == nullptr);
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
-  REQUIRE(test_proc != nullptr);
-  REQUIRE(set_property(test_proc, "Data Format", "Text") == 0);  // Valid value
-  // TODO(aboda): add this two below when property handling is made strictly typed
-  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
-  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
-  REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0);  // Empty value
-  REQUIRE(set_property(test_proc, nullptr, "Blah") != 0);  // Empty attribute
-  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);  // Invalid processor
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("get file and put file", "[getAndPutFile]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  char put_format[] = "/tmp/pt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-  const char *putfiledir = testController.createTempDirectory(put_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
-  REQUIRE(put_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-  flow_file_record *record = get_next_flow_file(instance, test_flow);
-  REQUIRE(record != nullptr);
-
-  ss.str("");
-
-  ss << putfiledir << "/" << "tstFile.ext";
-  std::ifstream t(ss.str());
-  std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
-
-  REQUIRE(test_file_content == put_data);
-
-  // No failure handler can be added after the flow is finalized
-  REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
-
-  free_flowfile(record);
-
-  free_flow(test_flow);
-
-  free_instance(instance);
-}
-
-TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
-  REQUIRE(extract_test != nullptr);
-  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
-  processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
-  REQUIRE(update_attr != nullptr);
-
-  REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
-
-  flow_file_record *record = get_next_flow_file(instance, test_flow);
-
-  REQUIRE(record != nullptr);
-
-  attribute test_attr;
-  test_attr.key = "TestAttr";
-  REQUIRE(get_attribute(record, &test_attr) == 0);
-
-  REQUIRE(test_attr.value_size != 0);
-  REQUIRE(test_attr.value != nullptr);
-
-  std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
-
-  REQUIRE(attr_value == test_file_content);
-
-  const char * new_testattr_value = "S0me t3st t3xt";
-
-  // Attribute already exist, should fail
-  REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0);  // NOLINT
-
-  // Update overwrites values
-  update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value));  // NOLINT
-
-  int attr_size = get_attribute_qty(record);
-  REQUIRE(attr_size > 0);
-
-  attribute_set attr_set;
-  attr_set.size = attr_size;
-  attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute));  // NOLINT
-
-  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
-
-  bool test_attr_found = false;
-  bool updated_attr_found = false;
-  for (int i = 0; i < attr_set.size; ++i) {
-    if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
-      test_attr_found = true;
-      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
-    } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
-      updated_attr_found = true;
-      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
-    }
-  }
-  REQUIRE(updated_attr_found == true);
-  REQUIRE(test_attr_found == true);
-
-  free_flowfile(record);
-
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Test error handling callback", "[errorHandling]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-
-  // Failure strategy cannot be set before a valid callback is added
-  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
-  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
-
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
-  REQUIRE(put_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
-  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-
-  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-
-  REQUIRE(failure_count == 1);
-
-  // Failure handler function can be replaced runtime
-  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
-  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
-
-  // Create new testfile to trigger failure again
-  ss << "2";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-  REQUIRE(failure_count > 100);
-
-  failure_count = 0;
-
-  free_flow(test_flow);
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
new file mode 100644
index 0000000..bd2d952
--- /dev/null
+++ b/nanofi/CMakeLists.txt
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(include)
+include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+
+if(WIN32)
+include_directories(../libminifi/opsys/win)
+else()
+include_directories(../libminifi/opsys/posix)
+endif()
+
+file(GLOB NANOFI_SOURCES  "src/api/*.cpp" "src/cxx/*.cpp" )
+
+file(GLOB NANOFI_EXAMPLES_SOURCES  "examples/*.c" )
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+	    if (_cpp_latest_flag_supported)
+	        add_compile_options("/std:c++14")
+	    endif()
+	endif()
+else()
+
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+add_library(nanofi STATIC ${NANOFI_SOURCES})
+
+if (APPLE)
+	target_link_libraries (nanofi -Wl,-all_load core-minifi minifi)
+elseif(NOT WIN32)
+	target_link_libraries (nanofi -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive)
+else()
+    set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi")
+	set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi")
+endif ()
+
+if(WIN32)
+	set_target_properties(nanofi PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+if (ENABLE_PYTHON)
+
+add_library(nanofi-shared SHARED ${NANOFI_SOURCES})
+
+if (APPLE)
+	target_link_libraries (nanofi-shared -Wl,-all_load core-minifi-shared minifi-shared)
+elseif(NOT WIN32)
+	target_link_libraries (nanofi-shared -Wl,--whole-archive core-minifi-shared minifi-shared -Wl,--no-whole-archive)
+else()
+    set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi-shared")
+	set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi-shared")
+endif ()
+
+
+if(WIN32)
+	set_target_properties(nanofi-shared PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+set_property(TARGET nanofi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+endif(ENABLE_PYTHON)
+
+if (NOT DISABLE_CURL)
+add_subdirectory(examples)
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
new file mode 100644
index 0000000..f5f660f
--- /dev/null
+++ b/nanofi/examples/CMakeLists.txt
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(/include)
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+	    if (_cpp_latest_flag_supported)
+	        add_compile_options("/std:c++14")
+	    endif()
+	endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+if (WIN32)
+    set(LINK_FLAGS "/WHOLEARCHIVE")
+    set(LINK_END_FLAGS "")
+elseif (APPLE)
+    set(LINK_FLAGS "-Wl,-all_load")
+    set(LINK_END_FLAGS "")
+else ()
+    set(LINK_FLAGS "-Wl,--whole-archive")
+    set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
+add_executable(generate_flow generate_flow.c)
+
+add_executable(terminate_handler terminate_handler.c)
+
+target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
+if (NOT WIN32)
+
+add_executable(transmit_flow transmit_flow.c)
+
+target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(monitor_directory monitor_directory.c)
+
+target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/generate_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/generate_flow.c b/nanofi/examples/generate_flow.c
new file mode 100644
index 0000000..707de11
--- /dev/null
+++ b/nanofi/examples/generate_flow.c
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+  if (argc < 3) {
+    printf("Error: must run ./generate_flow <instance> <remote port> \n");
+    exit(1);
+  }
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+
+  nifi_port port;
+
+  port.port_id = portStr;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+  flow_file_record *record = get_next_flow_file(instance, new_flow);
+
+  if (record == 0) {
+    printf("Could not create flow file");
+    exit(1);
+  }
+
+  transmit_flowfile(record, instance);
+
+  free_flowfile(record);
+
+  // initializing will make the transmission slightly more efficient.
+  //initialize_instance(instance);
+  //transfer_file_or_directory(instance,file);
+
+  free_flow(new_flow);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/monitor_directory.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/monitor_directory.c b/nanofi/examples/monitor_directory.c
new file mode 100644
index 0000000..2283b35
--- /dev/null
+++ b/nanofi/examples/monitor_directory.c
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <pthread.h>
+
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
+int is_dir(const char *path) {
+  struct stat stat_struct;
+  if (stat(path, &stat_struct) != 0)
+    return 0;
+  return S_ISDIR(stat_struct.st_mode);
+}
+
+pthread_mutex_t mutex;
+pthread_cond_t condition;
+
+int stopped;
+
+int stop_callback(char *c) {
+  pthread_mutex_lock(&mutex);
+  stopped = 1;
+  pthread_cond_signal(&condition);
+  pthread_mutex_unlock(&mutex);
+  return 0;
+}
+
+int is_stopped(void *ptr) {
+  int is_stop = 0;
+  pthread_mutex_lock(&mutex);
+  is_stop = stopped;
+  pthread_mutex_unlock(&mutex);
+  return is_stop;
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+  if (argc < 5) {
+    printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n");
+    exit(1);
+  }
+
+  stopped = 0x00;
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+  char *directory = argv[3];
+
+  nifi_port port;
+
+  port.port_id = portStr;
+
+  C2_Server server;
+  server.url = argv[4];
+  server.ack_url = argv[5];
+  server.identifier = "monitor_directory";
+  server.type = REST;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
+
+  flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
+
+  transmit_to_nifi(instance, new_flow, is_stopped);
+
+  free_flow(new_flow);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/terminate_handler.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c
new file mode 100644
index 0000000..d5443f0
--- /dev/null
+++ b/nanofi/examples/terminate_handler.c
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that registers terminate handler and generates an exception.
+ */
+
+void example_terminate_handler() {
+  fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!");
+  exit(0);
+}
+
+int main(int argc, char **argv) {
+
+  nifi_port port;
+
+  port.port_id = "12345";
+
+  set_terminate_callback(example_terminate_handler);
+
+  nifi_instance *instance = create_instance("random instance", &port);
+
+  flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+  processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
+
+  // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship
+  // As it doesn't exist, an exception is thrown
+  set_property(put_proc, "Directory", "/tmp/never_existed");
+  set_property(put_proc, "Create Missing Directories", "false");
+
+  flow_file_record *record = get_next_flow_file(instance, new_flow );
+
+  // Here be dragons - nothing below this line gets executed
+  fprintf(stderr, "Dragons!!!");
+  free_flowfile(record);
+  free_flow(new_flow);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/transmit_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
new file mode 100644
index 0000000..e132c8b
--- /dev/null
+++ b/nanofi/examples/transmit_flow.c
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#include "api/nanofi.h"
+
+int is_dir(const char *path) {
+  struct stat stat_struct;
+  if (stat(path, &stat_struct) != 0)
+    return 0;
+  return S_ISDIR(stat_struct.st_mode);
+}
+
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
+  int size = 1;
+
+  if (is_dir(file_or_dir)) {
+    DIR *d;
+
+    struct dirent *dir;
+    d = opendir(file_or_dir);
+    if (d) {
+      while ((dir = readdir(d)) != NULL) {
+        if (!memcmp(dir->d_name,".",1) )
+          continue;
+        char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);
+        sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
+        transfer_file_or_directory(instance,file_path);
+        free(file_path);
+      }
+      closedir(d);
+    }
+    printf("%s is a directory", file_or_dir);
+  } else {
+    printf("Transferring %s\n",file_or_dir);
+
+    flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
+
+    add_attribute(record, "addedattribute", "1", 2);
+
+    transmit_flowfile(record, instance);
+
+    free_flowfile(record);
+  }
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+  if (argc < 4) {
+    printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+    exit(1);
+  }
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+  char *file = argv[3];
+
+  nifi_port port;
+
+  port.port_id = portStr;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  // initializing will make the transmission slightly more efficient.
+  //initialize_instance(instance);
+  transfer_file_or_directory(instance,file);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/api/nanofi.h
----------------------------------------------------------------------
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
new file mode 100644
index 0000000..31a4829
--- /dev/null
+++ b/nanofi/include/api/nanofi.h
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include "core/cstructs.h"
+#include "core/processors.h"
+
+int initialize_api();
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Updates with every release. Functions used here constitute the public API of NanoFi.
+ *
+ * Changes here will follow semver
+ */
+#define API_VERSION "0.02"
+
+void enable_logging();
+
+void set_terminate_callback(void (*terminate_callback)());
+
+/****
+ * ##################################################################
+ *  BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+nifi_instance *create_instance(const char *url, nifi_port *port);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+/****
+ * ##################################################################
+ *  C2 OPERATIONS
+ * ##################################################################
+ */
+
+
+typedef int c2_update_callback(char *);
+
+typedef int c2_stop_callback(char *);
+
+typedef int c2_start_callback(char *);
+
+void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
+
+
+uint8_t run_processor(const processor *processor);
+
+flow *create_new_flow(nifi_instance *);
+
+flow *create_flow(nifi_instance *, const char *);
+
+flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+
+processor *add_processor(flow *, const char *);
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name);
+
+processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+
+/**
+* Register your callback to received flow files that the flow failed to process
+* The flow file ownership is transferred to the caller!
+* The first callback should be registered before the flow is used. Can be changed later during runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set failure strategy. Please use the enum defined in cstructs.h
+* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
+* Can be changed runtime.
+* The defailt strategy is AS IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
+
+int set_property(processor *, const char *, const char *);
+
+int set_instance_property(nifi_instance *instance, const char*, const char *);
+
+int free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
+
+flow_file_record *get(nifi_instance *,flow *, processor_session *);
+
+int transfer(processor_session* session, flow *flow, const char *rel);
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len);
+
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
+
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
+
+int get_attribute_qty(const flow_file_record* ff);
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ *  Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+int transmit_flowfile(flow_file_record *, nifi_instance *);
+
+/****
+ * ##################################################################
+ *  Persistence Operations
+ * ##################################################################
+ */
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/comms.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/comms.h b/nanofi/include/blocks/comms.h
new file mode 100644
index 0000000..bfacc3d
--- /dev/null
+++ b/nanofi/include/blocks/comms.h
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_COMMS_H_
+#define BLOCKS_COMMS_H_
+
+#include <stdio.h>
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define SUCCESS 0x00
+#define FINISHED_EARLY 0x01
+#define FAIL 0x02
+
+typedef int transmission_stop(void *);
+
+uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
+
+  flow_file_record *record = 0x00;
+  do {
+    record = get_next_flow_file(instance, flow);
+
+    if (record == 0) {
+      return FINISHED_EARLY;
+    }
+    transmit_flowfile(record, instance);
+
+    free_flowfile(record);
+  } while (record != 0x00 && !(stop_callback != 0x00 && stop_callback(0x00)));
+  return SUCCESS;
+}
+
+#endif /* BLOCKS_COMMS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/file_blocks.h b/nanofi/include/blocks/file_blocks.h
new file mode 100644
index 0000000..d84ccbe
--- /dev/null
+++ b/nanofi/include/blocks/file_blocks.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_FILE_BLOCKS_H_
+#define BLOCKS_FILE_BLOCKS_H_
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define KEEP_SOURCE 0x01
+#define RECURSE 0x02
+
+/**
+ * Monitor directory can be combined into a current flow. to create an execution plan
+ */
+flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
+  GetFileConfig config;
+  config.directory = directory;
+  config.keep_source = flags & KEEP_SOURCE;
+  config.recurse = flags & RECURSE;
+  return create_getfile(instance, parent_flow, &config);
+}
+
+#endif /* BLOCKS_FILE_BLOCKS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/cstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
new file mode 100644
index 0000000..bc9700e
--- /dev/null
+++ b/nanofi/include/core/cstructs.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+
+/**
+ * NiFi Port struct
+ */
+typedef struct {
+  char *port_id;
+} nifi_port;
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+  void *instance_ptr;
+
+  nifi_port port;
+
+} nifi_instance;
+
+/****
+ * ##################################################################
+ *  C2 OPERATIONS
+ * ##################################################################
+ */
+
+enum C2_Server_Type {
+  REST,
+  MQTT
+};
+
+typedef struct {
+  char *url;
+  char *ack_url;
+  char *identifier;
+  char *topic;
+  enum C2_Server_Type type;
+} C2_Server;
+
+/****
+ * ##################################################################
+ *  Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+  void *processor_ptr;
+} processor;
+
+typedef struct {
+  void *session;
+} processor_session;
+
+/****
+ * ##################################################################
+ *  FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+  const char *key;
+  void *value;
+  size_t value_size;
+} attribute;
+
+typedef struct {
+  attribute * attributes;
+  size_t size;
+} attribute_set;
+
+/**
+ * State of a flow file
+ *
+ */
+typedef struct {
+  uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+  void * in;
+
+  void * crp;
+
+  char * contentLocation; /**< Filesystem location of this object */
+
+  void *attributes; /**< Hash map of attributes */
+
+  void *ffp;
+
+} flow_file_record;
+
+typedef struct {
+  void *plan;
+} flow;
+
+typedef enum FS {
+  AS_IS,
+  ROLLBACK
+} FailureStrategy;
+
+#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/processors.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/processors.h b/nanofi/include/core/processors.h
new file mode 100644
index 0000000..7fe357d
--- /dev/null
+++ b/nanofi/include/core/processors.h
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+  char *directory;
+  unsigned keep_source :1;
+  unsigned recurse :1;
+} GetFileConfig;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h
new file mode 100644
index 0000000..f620baa
--- /dev/null
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "c2/C2Agent.h"
+#include "core/state/Value.h"
+#include "c2/C2Payload.h"
+#include "c2/C2Protocol.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+typedef int c2_ag_update_callback(char *);
+
+typedef int c2_ag_stop_callback(char *);
+
+typedef int c2_ag_start_callback(char *);
+
+class C2CallbackAgent : public c2::C2Agent {
+
+ public:
+
+  explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+  virtual ~C2CallbackAgent() {
+  }
+
+  void setStopCallback(c2_ag_stop_callback *st){
+    stop = st;
+  }
+
+
+ protected:
+  /**
+     * Handles a C2 event requested by the server.
+     * @param resp c2 server response.
+     */
+    virtual void handle_c2_server_response(const C2ContentResponse &resp);
+
+    c2_ag_stop_callback *stop;
+
+ private:
+    std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h
new file mode 100644
index 0000000..61e824f
--- /dev/null
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -0,0 +1,100 @@
+/**
+ * @file CallbackProcessor.h
+ * CallbackProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CALLBACK_PROCESSOR_H__
+#define __CALLBACK_PROCESSOR_H__
+
+#include <stdio.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <iostream>
+#include <sys/types.h>
+#include "core/cstructs.h"
+#include "io/BaseStream.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// CallbackProcessor Class
+class CallbackProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        callback_(nullptr),
+        objref_(nullptr),
+        logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
+  }
+  // Destructor
+  virtual ~CallbackProcessor() {
+
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "CallbackProcessor";
+
+ public:
+
+  void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
+    objref_ = obj;
+    callback_ = ontrigger_callback;
+  }
+
+  // OnTrigger method, implemented by NiFi CallbackProcessor
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  // Initialize, over write by NiFi CallbackProcessor
+  virtual void initialize() {
+    std::set<core::Relationship> relationships;
+    core::Relationship Success("success", "description");
+    relationships.insert(Success);
+    setSupportedRelationships(relationships);
+  }
+
+ protected:
+  void *objref_;
+  std::function<void(processor_session*)> callback_;
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+REGISTER_RESOURCE(CallbackProcessor);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Instance.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
new file mode 100644
index 0000000..982f6d4
--- /dev/null
+++ b/nanofi/include/cxx/Instance.h
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "C2CallbackAgent.h"
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+#include "utils/ThreadPool.h"
+#include "core/state/UpdateController.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class ProcessorLink {
+ public:
+  explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+      : processor_(processor) {
+
+  }
+
+  const std::shared_ptr<core::Processor> &getProcessor() {
+    return processor_;
+  }
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+};
+
+class Instance {
+ public:
+
+  explicit Instance(const std::string &url, const std::string &port)
+      : configure_(std::make_shared<Configure>()),
+        url_(url),
+        agent_(nullptr),
+        rpgInitialized_(false),
+        listener_thread_pool_(1),
+        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+        no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+    running_ = false;
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
+    utils::Identifier uuid;
+    uuid = port;
+    rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+    proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+    core::FlowConfiguration::initialize_static_functions();
+    content_repo_->initialize(configure_);
+  }
+
+  ~Instance() {
+    running_ = false;
+    listener_thread_pool_.shutdown();
+  }
+
+  bool isRPGConfigured() {
+    return rpgInitialized_;
+  }
+
+  void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    running_ = true;
+    if (server->type != C2_Server_Type::MQTT){
+      configure_->set("c2.rest.url",server->url);
+      configure_->set("c2.rest.url.ack",server->ack_url);
+    }
+    agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+    listener_thread_pool_.start();
+    registerUpdateListener(agent_, 1000);
+    agent_->setStopCallback(c1);
+  }
+
+  void setRemotePort(std::string remote_port) {
+    rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+    rpg_->initialize();
+    rpg_->setTransmitting(true);
+    rpgInitialized_ = true;
+  }
+
+  std::shared_ptr<Configure> getConfiguration() {
+    return configure_;
+  }
+
+  std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+    return no_op_repo_;
+  }
+
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+    return content_repo_;
+  }
+
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+    auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+    rpg_->onSchedule(processContext, sessionFactory);
+
+    auto session = std::make_shared<core::ReflexiveSession>(processContext);
+
+    session->add(ff);
+
+    rpg_->onTrigger(processContext, session);
+  }
+
+ protected:
+
+  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
+    auto functions = updateController->getFunctions();
+    // run all functions independently
+
+    for (auto function : functions) {
+      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
+      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
+      std::future<state::Update> future;
+      if (!listener_thread_pool_.execute(std::move(functor), future)) {
+        // denote failure
+        return false;
+      }
+    }
+    return true;
+  }
+
+  std::shared_ptr<c2::C2CallbackAgent> agent_;
+
+  std::atomic<bool> running_;
+
+  bool rpgInitialized_;
+
+  std::shared_ptr<minifi::core::Repository> no_op_repo_;
+
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::ProcessorNode> proc_node_;
+  std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+  std::shared_ptr<io::StreamFactory> stream_factory_;
+  std::string url_;
+  std::shared_ptr<Configure> configure_;
+
+  utils::ThreadPool<state::Update> listener_thread_pool_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Plan.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
new file mode 100644
index 0000000..6171242
--- /dev/null
+++ b/nanofi/include/cxx/Plan.h
@@ -0,0 +1,204 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+
+#ifndef WIN32
+	#include <dirent.h>
+#endif
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "core/cstructs.h"
+#include "api/nanofi.h"
+
+using failure_callback_type = std::function<void(flow_file_record*)>;
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
+
+namespace {
+
+  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    auto ff = session->get();
+    if (ff == nullptr) {
+      return;
+    }
+
+    auto claim = ff->getResourceClaim();
+
+    if (claim != nullptr && user_callback != nullptr) {
+      claim->increaseFlowFileRecordOwnedCount();
+      // create a flow file.
+      auto path = claim->getContentFullPath();
+      auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+      ffr->attributes = ff->getAttributesPtr();
+      ffr->ffp = ff.get();
+      auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+      *content_repo_ptr = cr_ptr;
+      user_callback(ffr);
+    }
+    session->remove(ff);
+  }
+
+  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    session->rollback();
+    failureStrategyAsIs(session, user_callback, cr_ptr);
+  }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+    { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
+
+class ExecutionPlan {
+ public:
+
+  explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+  std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+                                                core::Relationship relationship = core::Relationship("success", "description"),
+                                                bool linkToPrevious = false);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+  bool linkToPrevious = false);
+
+  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+  void reset();
+
+  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
+  bool setFailureCallback(failure_callback_type onerror_callback);
+
+  bool setFailureStrategy(FailureStrategy start);
+
+  std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+  std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+  std::shared_ptr<core::ProcessSession> getCurrentSession();
+
+  std::shared_ptr<core::Repository> getFlowRepo() {
+    return flow_repo_;
+  }
+
+  std::shared_ptr<core::Repository> getProvenanceRepo() {
+    return prov_repo_;
+  }
+
+  std::shared_ptr<core::ContentRepository> getContentRepo() {
+    return content_repo_;
+  }
+
+  std::shared_ptr<core::FlowFile> getNextFlowFile(){
+    return next_ff_;
+  }
+
+  void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
+    next_ff_ = ptr;
+  }
+
+  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
+ protected:
+  class FailureHandler {
+   public:
+    FailureHandler(content_repo_sptr cr_ptr) {
+      callback_ = nullptr;
+      strategy_ = FailureStrategy::AS_IS;
+      content_repo_ = cr_ptr;
+    }
+    void setCallback(failure_callback_type onerror_callback) {
+      callback_=onerror_callback;
+    }
+    void setStrategy(FailureStrategy strat) {
+      strategy_ = strat;
+    }
+    void operator()(const processor_session* ps) {
+      auto ses = static_cast<core::ProcessSession*>(ps->session);
+      FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
+    }
+   private:
+    failure_callback_type callback_;
+    FailureStrategy strategy_;
+    content_repo_sptr content_repo_;
+  };
+
+  void finalize();
+
+  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+  std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+                                                        core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
+
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+  content_repo_sptr content_repo_;
+
+  std::shared_ptr<core::Repository> flow_repo_;
+  std::shared_ptr<core::Repository> prov_repo_;
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+  std::atomic<bool> finalized;
+
+  uint32_t location;
+
+  std::shared_ptr<core::ProcessSession> current_session_;
+  std::shared_ptr<core::FlowFile> current_flowfile_;
+
+  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+  core::Relationship termination_;
+
+  std::shared_ptr<core::FlowFile> next_ff_;
+
+ private:
+
+  static std::shared_ptr<utils::IdGenerator> id_generator_;
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<FailureHandler> failure_handler_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/nanofi/include/cxx/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ReflexiveSession Class
+class ReflexiveSession : public ProcessSession{
+ public:
+  // Constructor
+  /*!
+   * Create a new process session
+   */
+  ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+      : ProcessSession(processContext){
+  }
+
+// Destructor
+  virtual ~ReflexiveSession() {
+  }
+
+   virtual std::shared_ptr<core::FlowFile> get(){
+     auto prevff = ff;
+     ff = nullptr;
+     return prevff;
+   }
+
+   virtual void add(const std::shared_ptr<core::FlowFile> &flow){
+     ff = flow;
+   }
+   virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+     // no op
+   }
+ protected:
+  //
+  // Get the FlowFile from the highest priority queue
+  std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif


[3/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into nanofi

Posted by al...@apache.org.
MINIFICPP-659: Break out CAPI into nanofi

Break out structure -- facilitating Python and new nanofi structure

MINIFICPP-659: Add http-curl to examples and comment for why rocksdb is disabling shared libs

MINIFICPP-659: limit python on apple

MINIFICPP-663: Ensure bison is 3.0.4

This closes #433.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/556794b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/556794b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/556794b1

Branch: refs/heads/master
Commit: 556794b15f1f342bc4b51998c55811c805be91b5
Parents: fc1074a
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Oct 29 14:53:37 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Oct 31 14:27:11 2018 -0400

----------------------------------------------------------------------
 .travis.yml                                     |   3 -
 CMakeLists.txt                                  |  10 +-
 LibExample/CMakeLists.txt                       |  78 ---
 LibExample/generate_flow.c                      |  64 ---
 LibExample/monitor_directory.c                  |  95 ----
 LibExample/terminate_handler.c                  |  58 ---
 LibExample/transmit_flow.c                      |  92 ----
 README.md                                       |   6 +-
 blocks/comms.h                                  |  47 --
 blocks/file_blocks.h                            |  38 --
 cmake/BuildTests.cmake                          |  36 +-
 darwin.sh                                       |  27 +-
 libminifi/CMakeLists.txt                        |   6 -
 libminifi/include/agent/build_description.h     |   2 +-
 libminifi/include/capi/C2CallbackAgent.h        |  82 ---
 libminifi/include/capi/Instance.h               | 180 -------
 libminifi/include/capi/Plan.h                   | 203 --------
 libminifi/include/capi/ReflexiveSession.h       |  77 ---
 libminifi/include/capi/api.h                    | 157 ------
 libminifi/include/capi/cstructs.h               | 127 -----
 libminifi/include/capi/expect.h                 |  32 --
 libminifi/include/capi/processors.h             |  37 --
 libminifi/include/core/expect.h                 |  32 ++
 libminifi/include/io/tls/TLSSocket.h            |   2 +-
 .../include/processors/CallbackProcessor.h      | 100 ----
 libminifi/include/utils/ThreadPool.h            |   2 +-
 libminifi/src/capi/C2CallbackAgent.cpp          |  80 ---
 libminifi/src/capi/Plan.cpp                     | 294 -----------
 libminifi/src/capi/api.cpp                      | 517 ------------------
 .../repository/VolatileContentRepository.cpp    |   2 +-
 libminifi/src/processors/CallbackProcessor.cpp  |  37 --
 libminifi/test/capi/CAPITests.cpp               | 279 ----------
 nanofi/CMakeLists.txt                           | 100 ++++
 nanofi/examples/CMakeLists.txt                  |  78 +++
 nanofi/examples/generate_flow.c                 |  65 +++
 nanofi/examples/monitor_directory.c             |  95 ++++
 nanofi/examples/terminate_handler.c             |  59 +++
 nanofi/examples/transmit_flow.c                 |  93 ++++
 nanofi/include/api/nanofi.h                     | 159 ++++++
 nanofi/include/blocks/comms.h                   |  48 ++
 nanofi/include/blocks/file_blocks.h             |  38 ++
 nanofi/include/core/cstructs.h                  | 118 +++++
 nanofi/include/core/processors.h                |  37 ++
 nanofi/include/cxx/C2CallbackAgent.h            |  81 +++
 nanofi/include/cxx/CallbackProcessor.h          | 100 ++++
 nanofi/include/cxx/Instance.h                   | 180 +++++++
 nanofi/include/cxx/Plan.h                       | 204 ++++++++
 nanofi/include/cxx/ReflexiveSession.h           |  77 +++
 nanofi/src/api/nanofi.cpp                       | 518 +++++++++++++++++++
 nanofi/src/cxx/C2CallbackAgent.cpp              |  79 +++
 nanofi/src/cxx/CallbackProcessor.cpp            |  37 ++
 nanofi/src/cxx/Plan.cpp                         | 294 +++++++++++
 nanofi/tests/CAPITests.cpp                      | 278 ++++++++++
 python/library/CMakeLists.txt                   |   6 +-
 python/library/python_lib.cpp                   |   9 +-
 thirdparty/rocksdb/CMakeLists.txt               |  22 +-
 .../yaml-cpp-yaml-cpp-20171024/CMakeLists.txt   |   2 +-
 57 files changed, 2854 insertions(+), 2725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 2138acd..45ea54e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -70,7 +70,6 @@ matrix:
         - package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
-        - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
         - package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
         - package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
@@ -92,7 +91,6 @@ matrix:
         - package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
-        - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
         - package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
         - package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
@@ -114,7 +112,6 @@ matrix:
         - package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
-        - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
         - package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
         - package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
         - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1c92446..7c81edb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,6 +30,7 @@ option(OPENSSL_OFF "Disables OpenSSL" OFF)
 option(ENABLE_OPS "Enable Operations Tools" ON)
 option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF)
 option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON)
+option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
 if (WIN32)
 option(USE_SYSTEM_ZLIB "Instructs the build system to search for and use a zlib library available in the host system" OFF)
 else()
@@ -324,8 +325,12 @@ set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
 set(CIVETWEB_ENABLE_SSL OFF CACHE BOOL "DISABLE SSL")
 
 SET(WITH_TOOLS OFF CACHE BOOL "Do not build RocksDB tools")
+if ( NOT APPLE)
 if (ENABLE_PYTHON)
-SET(BUILD_SHARED_LIBS ON CACHE BOOL "build yaml cpp shared lib")
+  SET(BUILD_SHARED_LIBS ON CACHE BOOL "Build yaml cpp shared lib" FORCE)
+else()
+  SET(BUILD_SHARED_LIBS OFF CACHE BOOL "Build yaml cpp shared lib" FORCE)
+endif()
 endif()
 SET(WITH_TESTS OFF CACHE BOOL "Build RocksDB library (not repo) tests")
 set(CIVET_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb root")
@@ -471,8 +476,9 @@ endif()
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 
 add_subdirectory(main)
+add_subdirectory(nanofi)
+
 if (NOT DISABLE_CURL)
-  add_subdirectory(LibExample)
   if (ENABLE_PYTHON)
   	if (NOT WIN32)
   		add_subdirectory(python/library)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt
deleted file mode 100644
index 88901cc..0000000
--- a/LibExample/CMakeLists.txt
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
-
-include_directories(../blocks/ ../libminifi/include ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/)
-
-include(CheckCXXCompilerFlag)
-if (WIN32)
-  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
-	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
-	    if (_cpp_latest_flag_supported)
-	        add_compile_options("/std:c++14")
-	    endif()
-	endif()
-else()
-CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
-CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
-if(COMPILER_SUPPORTS_CXX11)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
-elseif(COMPILER_SUPPORTS_CXX0X)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
-else()
- message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
-endif()
-
-endif()
-
-if (WIN32)
-    set(LINK_FLAGS "/WHOLEARCHIVE")
-    set(LINK_END_FLAGS "")
-elseif (APPLE)
-    set(LINK_FLAGS "-Wl,-all_load")
-    set(LINK_END_FLAGS "")
-else ()
-    set(LINK_FLAGS "-Wl,--whole-archive")
-    set(LINK_END_FLAGS "-Wl,--no-whole-archive")
-endif ()
-
-add_executable(generate_flow generate_flow.c)
-
-add_executable(terminate_handler terminate_handler.c)
-
-target_link_libraries(generate_flow capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-target_link_libraries(terminate_handler capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-if (NOT WIN32)
-
-add_executable(transmit_flow transmit_flow.c)
-
-target_link_libraries(transmit_flow capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-add_executable(monitor_directory monitor_directory.c)
-
-target_link_libraries(monitor_directory capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-endif()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/generate_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/generate_flow.c b/LibExample/generate_flow.c
deleted file mode 100644
index eb5e7a3..0000000
--- a/LibExample/generate_flow.c
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include "capi/api.h"
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
-
-  if (argc < 3) {
-    printf("Error: must run ./generate_flow <instance> <remote port> \n");
-    exit(1);
-  }
-
-  char *instance_str = argv[1];
-  char *portStr = argv[2];
-
-  nifi_port port;
-
-  port.port_id = portStr;
-
-  nifi_instance *instance = create_instance(instance_str, &port);
-
-  flow *new_flow = create_flow(instance, "GenerateFlowFile");
-
-  flow_file_record *record = get_next_flow_file(instance, new_flow );
-
-  if (record == 0){
-    printf("Could not create flow file");
-    exit(1);
-  }
-
-  transmit_flowfile(record,instance);
-
-  free_flowfile(record);
-
-  // initializing will make the transmission slightly more efficient.
-  //initialize_instance(instance);
-  //transfer_file_or_directory(instance,file);
-
-  free_flow(new_flow);
-
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/monitor_directory.c
----------------------------------------------------------------------
diff --git a/LibExample/monitor_directory.c b/LibExample/monitor_directory.c
deleted file mode 100644
index b456fcd..0000000
--- a/LibExample/monitor_directory.c
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <dirent.h>
-#include <pthread.h>
-
-#include "file_blocks.h"
-#include "comms.h"
-#include "capi/api.h"
-#include "capi/processors.h"
-int is_dir(const char *path) {
-  struct stat stat_struct;
-  if (stat(path, &stat_struct) != 0)
-    return 0;
-  return S_ISDIR(stat_struct.st_mode);
-}
-
-pthread_mutex_t mutex;
-pthread_cond_t condition;
-
-int stopped;
-
-int stop_callback(char *c) {
-  pthread_mutex_lock(&mutex);
-  stopped = 1;
-  pthread_cond_signal(&condition);
-  pthread_mutex_unlock(&mutex);
-  return 0;
-}
-
-int is_stopped(void *ptr) {
-  int is_stop = 0;
-  pthread_mutex_lock(&mutex);
-  is_stop = stopped;
-  pthread_mutex_unlock(&mutex);
-  return is_stop;
-}
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
-  if (argc < 5) {
-    printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n");
-    exit(1);
-  }
-
-  stopped = 0x00;
-
-  char *instance_str = argv[1];
-  char *portStr = argv[2];
-  char *directory = argv[3];
-
-  nifi_port port;
-
-  port.port_id = portStr;
-
-  C2_Server server;
-  server.url = argv[4];
-  server.ack_url = argv[5];
-  server.identifier = "monitor_directory";
-  server.type = REST;
-
-  nifi_instance *instance = create_instance(instance_str, &port);
-
-  // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
-
-  flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
-
-  transmit_to_nifi(instance, new_flow, is_stopped);
-
-  free_flow(new_flow);
-
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/terminate_handler.c
----------------------------------------------------------------------
diff --git a/LibExample/terminate_handler.c b/LibExample/terminate_handler.c
deleted file mode 100644
index daa9f3b..0000000
--- a/LibExample/terminate_handler.c
+++ /dev/null
@@ -1,58 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-    *
-    *     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include "capi/api.h"
-
-/**
- * This is an example of the C API that registers terminate handler and generates an exception.
- */
-
-void example_terminate_handler() {
-  fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!");
-  exit(0);
-}
-
-int main(int argc, char **argv) {
-
-  nifi_port port;
-
-  port.port_id = "12345";
-
-  set_terminate_callback(example_terminate_handler);
-
-  nifi_instance *instance = create_instance("random instance", &port);
-
-  flow *new_flow = create_flow(instance, "GenerateFlowFile");
-
-  processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
-
-  // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship
-  // As it doesn't exist, an exception is thrown
-  set_property(put_proc, "Directory", "/tmp/never_existed");
-  set_property(put_proc, "Create Missing Directories", "false");
-
-  flow_file_record *record = get_next_flow_file(instance, new_flow );
-
-  // Here be dragons - nothing below this line gets executed
-  fprintf(stderr, "Dragons!!!");
-  free_flowfile(record);
-  free_flow(new_flow);
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/transmit_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c
deleted file mode 100644
index 7c6f6e7..0000000
--- a/LibExample/transmit_flow.c
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <dirent.h>
-#include "capi/api.h"
-
-int is_dir(const char *path) {
-  struct stat stat_struct;
-  if (stat(path, &stat_struct) != 0)
-    return 0;
-  return S_ISDIR(stat_struct.st_mode);
-}
-
-void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
-  int size = 1;
-
-  if (is_dir(file_or_dir)) {
-    DIR *d;
-
-    struct dirent *dir;
-    d = opendir(file_or_dir);
-    if (d) {
-      while ((dir = readdir(d)) != NULL) {
-        if (!memcmp(dir->d_name,".",1) )
-          continue;
-        char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);
-        sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
-        transfer_file_or_directory(instance,file_path);
-        free(file_path);
-      }
-      closedir(d);
-    }
-    printf("%s is a directory", file_or_dir);
-  } else {
-    printf("Transferring %s\n",file_or_dir);
-
-    flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
-
-    add_attribute(record, "addedattribute", "1", 2);
-
-    transmit_flowfile(record, instance);
-
-    free_flowfile(record);
-  }
-}
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
-
-  if (argc < 4) {
-    printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
-    exit(1);
-  }
-
-  char *instance_str = argv[1];
-  char *portStr = argv[2];
-  char *file = argv[3];
-
-  nifi_port port;
-
-  port.port_id = portStr;
-
-  nifi_instance *instance = create_instance(instance_str, &port);
-
-  // initializing will make the transmission slightly more efficient.
-  //initialize_instance(instance);
-  transfer_file_or_directory(instance,file);
-
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 542dd6c..ae37b62 100644
--- a/README.md
+++ b/README.md
@@ -102,7 +102,7 @@ MiNiFi - C++ supports the following processors:
 * g++
   * 4.8.4 or greater
 * bison
-  * 3.0 or greater
+  * 3.0.x (3.2 has been shown to fail builds)
 * flex
   * 2.5 or greater
 
@@ -160,7 +160,7 @@ $ scl enable devtoolset-6 bash
 ```
 
 Additionally, for expression language support, it is recommended to install GNU
-Bison 3.x:
+Bison 3.0.4:
 
 ```
 $ wget https://ftp.gnu.org/gnu/bison/bison-3.0.4.tar.xz
@@ -279,7 +279,6 @@ $ apt-get install libpcap-dev
 ```
 # ~/Development/code/apache/nifi-minifi-cpp on git:master
 $ brew install cmake \
-  bison \
   flex \
   patch \
   autoconf \
@@ -305,6 +304,7 @@ $ sudo pip install virtualenv
 $ brew install gpsd
 $ # (Optional) for PacketCapture Processor
 $ sudo brew install libpcap
+$ # It is recommended that you install bison from source as HomeBrew now uses an incompatible version of Bison
 ```
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/blocks/comms.h
----------------------------------------------------------------------
diff --git a/blocks/comms.h b/blocks/comms.h
deleted file mode 100644
index 3f5fda1..0000000
--- a/blocks/comms.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef BLOCKS_COMMS_H_
-#define BLOCKS_COMMS_H_
-
-#include <stdio.h>
-#include "capi/api.h"
-#include "capi/processors.h"
-
-#define SUCCESS 0x00
-#define FINISHED_EARLY 0x01
-#define FAIL 0x02
-
-typedef int transmission_stop(void *);
-
-uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
-
-  flow_file_record *record = 0x00;
-  do {
-    record = get_next_flow_file(instance, flow);
-
-    if (record == 0) {
-      return FINISHED_EARLY;
-    }
-    transmit_flowfile(record, instance);
-
-    free_flowfile(record);
-  } while (record != 0x00 && !( stop_callback != 0x00 && stop_callback(0x00)));
-  return SUCCESS;
-}
-
-#endif /* BLOCKS_COMMS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/blocks/file_blocks.h b/blocks/file_blocks.h
deleted file mode 100644
index 3f1d6f7..0000000
--- a/blocks/file_blocks.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef BLOCKS_FILE_BLOCKS_H_
-#define BLOCKS_FILE_BLOCKS_H_
-
-#include "capi/api.h"
-#include "capi/processors.h"
-
-#define KEEP_SOURCE 0x01
-#define RECURSE 0x02
-
-/**
- * Monitor directory can be combined into a current flow. to create an execution plan
- */
-flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
-  GetFileConfig config;
-  config.directory = directory;
-  config.keep_source = flags & KEEP_SOURCE;
-  config.recurse = flags & RECURSE;
-  return create_getfile(instance, parent_flow, &config);
-}
-
-#endif /* BLOCKS_FILE_BLOCKS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index d14c19b..c7ec1d6 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -29,12 +29,13 @@ MACRO(GETSOURCEFILES result curdir)
   SET(${result} ${dirlist})
 ENDMACRO()
 
+set(NANOFI_TEST_DIR "${CMAKE_SOURCE_DIR}/nanofi/tests/")
+
 if(NOT EXCLUDE_BOOST)
 	find_package(Boost COMPONENTS system filesystem)
 endif()
 
-function(createTests testName)
-   message ("-- Adding test: ${testName}")
+function(appendIncludes testName)
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
@@ -60,6 +61,11 @@ function(createTests testName)
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/utils")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/processors")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/provenance")
+endfunction()
+
+function(createTests testName)
+    message ("-- Adding test: ${testName}")
+    appendIncludes("${testName}")
 
 	if (ENABLE_BINARY_DIFF)
     	target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/bsdiff/")
@@ -70,11 +76,13 @@ function(createTests testName)
     endif()
     target_link_libraries(${testName} ${CMAKE_DL_LIBS} ${SPD_LIB} ${TEST_BASE_LIB})
     target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp)
-    if (APPLE)
-		target_link_libraries (${testName} -Wl,-all_load minifi)
-	else ()
-		target_link_libraries (${testName} -Wl,--whole-archive minifi -Wl,--no-whole-archive)
-	endif ()
+    if (NOT excludeBase)
+      if (APPLE)
+  		target_link_libraries (${testName} -Wl,-all_load minifi)
+	  else ()
+  		target_link_libraries (${testName} -Wl,--whole-archive minifi -Wl,--no-whole-archive)
+	  endif ()
+	endif()
     if (Boost_FOUND)
         target_link_libraries(${testName} ${Boost_SYSTEM_LIBRARY})
         target_link_libraries(${testName} ${Boost_FILESYSTEM_LIBRARY})
@@ -105,7 +113,7 @@ target_include_directories(${CATCH_MAIN_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}
 SET(TEST_RESOURCES ${TEST_DIR}/resources)
 
 GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/")
-GETSOURCEFILES(CAPI_UNIT_TESTS "${TEST_DIR}/capi/")
+GETSOURCEFILES(NANOFI_UNIT_TESTS "${NANOFI_TEST_DIR}")
 GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
 
 SET(UNIT_TEST_COUNT 0)
@@ -120,15 +128,17 @@ ENDFOREACH()
 message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...")
 
 SET(UNIT_TEST_COUNT 0)
-FOREACH(testfile ${CAPI_UNIT_TESTS})
+FOREACH(testfile ${NANOFI_UNIT_TESTS})
     get_filename_component(testfilename "${testfile}" NAME_WE)
-    add_executable("${testfilename}" "${TEST_DIR}/capi/${testfile}")
-    createTests("${testfilename}")
-    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} capi)
+    add_executable("${testfilename}" "${NANOFI_TEST_DIR}/${testfile}")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/nanofi/include")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
+    appendIncludes("${testfilename}")
+    target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT} ${CATCH_MAIN_LIB} nanofi)
     MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1")
     add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
 ENDFOREACH()
-message("-- Finished building ${UNIT_TEST_COUNT} capi unit test file(s)...")
+message("-- Finished building ${UNIT_TEST_COUNT} NanoFi unit test file(s)...")
 
 SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/darwin.sh
----------------------------------------------------------------------
diff --git a/darwin.sh b/darwin.sh
index 7a4ef90..e02e1eb 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -40,6 +40,30 @@ add_os_flags(){
   :
 }
 
+install_bison() {
+  BISON_INSTALLED="false"
+  if [ -x "$(command -v bison)" ]; then
+    BISON_VERSION=`bison --version | head -n 1 | awk '{print $4}'`
+    BISON_MAJOR=`echo $BISON_VERSION | cut -d. -f1`
+    if (( BISON_MAJOR >= 3 )); then
+      BISON_INSTALLED="true"
+    fi
+  fi
+  if [ "$BISON_INSTALLED" = "false" ]; then
+    ## ensure that the toolchain is installed
+    INSTALL_BASE="sudo zypper in -y gcc gcc-c++"
+    ${INSTALL_BASE}
+    wget https://ftp.gnu.org/gnu/bison/bison-3.0.4.tar.xz
+    tar xvf bison-3.0.4.tar.xz
+    pushd bison-3.0.4
+    ./configure
+    make
+    sudo make install
+    popd
+  fi
+
+}
+
 bootstrap_cmake(){
   brew install cmake
 }
@@ -69,7 +93,7 @@ build_deps(){
           elif [ "$FOUND_VALUE" = "libpng" ]; then
             INSTALLED+=("libpng")
           elif [ "$FOUND_VALUE" = "bison" ]; then
-            INSTALLED+=("bison")
+            install_bison
           elif [ "$FOUND_VALUE" = "flex" ]; then
             INSTALLED+=("flex")
           elif [ "$FOUND_VALUE" = "python" ]; then
@@ -101,5 +125,4 @@ build_deps(){
       brew link curl --force > /dev/null 2>&1
     fi
   done
-
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 9afcd6b..f6cc302 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -101,7 +101,6 @@ file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/s
 
 file(GLOB PROCESSOR_SOURCES  "src/processors/*.cpp" )
 
-file(GLOB CAPI_SOURCES  "src/capi/*.cpp" )
 
 file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
 
@@ -130,7 +129,6 @@ if (OPENSSL_FOUND)
 endif (OPENSSL_FOUND)
 
 add_library(minifi STATIC ${PROCESSOR_SOURCES})
-add_library(capi STATIC ${CAPI_SOURCES})
 
 
 target_link_libraries(minifi core-minifi)
@@ -167,8 +165,6 @@ if (OPENSSL_FOUND)
 endif (OPENSSL_FOUND)
 
 add_library(minifi-shared SHARED ${PROCESSOR_SOURCES})
-add_library(capi-shared SHARED ${CAPI_SOURCES})
-
 
 target_link_libraries(minifi-shared core-minifi-shared)
 if (WIN32)
@@ -177,9 +173,7 @@ set_target_properties(minifi-shared PROPERTIES LINK_FLAGS "${LINK_FLAGS} /WHOLEA
 endif()
 
 
-
 set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
 set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
-set_property(TARGET capi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
 endif()
 endif(ENABLE_PYTHON)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/agent/build_description.h
----------------------------------------------------------------------
diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h
index 5359595..4d6a1e6 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -19,7 +19,7 @@
 #define BUILD_DESCRPTION_H
 
 #include <vector>
-#include "capi/expect.h"
+#include "core/expect.h"
 
 namespace org {
 namespace apache {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/C2CallbackAgent.h b/libminifi/include/capi/C2CallbackAgent.h
deleted file mode 100644
index c7ee5a9..0000000
--- a/libminifi/include/capi/C2CallbackAgent.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
-#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
-
-#include <utility>
-#include <functional>
-#include <future>
-#include <memory>
-#include <mutex>
-#include <thread>
-
-#include "c2/C2Agent.h"
-#include "core/state/UpdateController.h"
-#include "core/state/Value.h"
-#include "c2/C2Payload.h"
-#include "c2/C2Protocol.h"
-#include "io/validation.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-typedef int c2_ag_update_callback(char *);
-
-typedef int c2_ag_stop_callback(char *);
-
-typedef int c2_ag_start_callback(char *);
-
-class C2CallbackAgent : public c2::C2Agent {
-
- public:
-
-  explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
-
-  virtual ~C2CallbackAgent() {
-  }
-
-  void setStopCallback(c2_ag_stop_callback *st){
-    stop = st;
-  }
-
-
- protected:
-  /**
-     * Handles a C2 event requested by the server.
-     * @param resp c2 server response.
-     */
-    virtual void handle_c2_server_response(const C2ContentResponse &resp);
-
-    c2_ag_stop_callback *stop;
-
- private:
-    std::shared_ptr<logging::Logger> logger_;
-
-};
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-
-#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h
deleted file mode 100644
index 982f6d4..0000000
--- a/libminifi/include/capi/Instance.h
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
-#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
-
-#include <memory>
-#include <type_traits>
-#include <string>
-#include "core/Property.h"
-#include "properties/Configure.h"
-#include "io/StreamFactory.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ContentRepository.h"
-#include "core/repository/VolatileContentRepository.h"
-#include "core/Repository.h"
-
-#include "C2CallbackAgent.h"
-#include "core/Connectable.h"
-#include "core/ProcessorNode.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/controller/ControllerServiceProvider.h"
-#include "core/FlowConfiguration.h"
-#include "ReflexiveSession.h"
-#include "utils/ThreadPool.h"
-#include "core/state/UpdateController.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-class ProcessorLink {
- public:
-  explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
-      : processor_(processor) {
-
-  }
-
-  const std::shared_ptr<core::Processor> &getProcessor() {
-    return processor_;
-  }
-
- protected:
-  std::shared_ptr<core::Processor> processor_;
-};
-
-class Instance {
- public:
-
-  explicit Instance(const std::string &url, const std::string &port)
-      : configure_(std::make_shared<Configure>()),
-        url_(url),
-        agent_(nullptr),
-        rpgInitialized_(false),
-        listener_thread_pool_(1),
-        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
-        no_op_repo_(std::make_shared<minifi::core::Repository>()) {
-    running_ = false;
-    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
-    utils::Identifier uuid;
-    uuid = port;
-    rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
-    proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
-    core::FlowConfiguration::initialize_static_functions();
-    content_repo_->initialize(configure_);
-  }
-
-  ~Instance() {
-    running_ = false;
-    listener_thread_pool_.shutdown();
-  }
-
-  bool isRPGConfigured() {
-    return rpgInitialized_;
-  }
-
-  void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
-    running_ = true;
-    if (server->type != C2_Server_Type::MQTT){
-      configure_->set("c2.rest.url",server->url);
-      configure_->set("c2.rest.url.ack",server->ack_url);
-    }
-    agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
-    listener_thread_pool_.start();
-    registerUpdateListener(agent_, 1000);
-    agent_->setStopCallback(c1);
-  }
-
-  void setRemotePort(std::string remote_port) {
-    rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
-    rpg_->initialize();
-    rpg_->setTransmitting(true);
-    rpgInitialized_ = true;
-  }
-
-  std::shared_ptr<Configure> getConfiguration() {
-    return configure_;
-  }
-
-  std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
-    return no_op_repo_;
-  }
-
-  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
-    return content_repo_;
-  }
-
-  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
-    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
-    auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
-
-    rpg_->onSchedule(processContext, sessionFactory);
-
-    auto session = std::make_shared<core::ReflexiveSession>(processContext);
-
-    session->add(ff);
-
-    rpg_->onTrigger(processContext, session);
-  }
-
- protected:
-
-  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
-    auto functions = updateController->getFunctions();
-    // run all functions independently
-
-    for (auto function : functions) {
-      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
-      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
-      std::future<state::Update> future;
-      if (!listener_thread_pool_.execute(std::move(functor), future)) {
-        // denote failure
-        return false;
-      }
-    }
-    return true;
-  }
-
-  std::shared_ptr<c2::C2CallbackAgent> agent_;
-
-  std::atomic<bool> running_;
-
-  bool rpgInitialized_;
-
-  std::shared_ptr<minifi::core::Repository> no_op_repo_;
-
-  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
-
-  std::shared_ptr<core::ProcessorNode> proc_node_;
-  std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
-  std::shared_ptr<io::StreamFactory> stream_factory_;
-  std::string url_;
-  std::shared_ptr<Configure> configure_;
-
-  utils::ThreadPool<state::Update> listener_thread_pool_;
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
deleted file mode 100644
index 75330a0..0000000
--- a/libminifi/include/capi/Plan.h
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_CAPI_PLAN_H_
-#define LIBMINIFI_CAPI_PLAN_H_
-#ifndef WIN32
-	#include <dirent.h>
-#endif
-#include <cstdio>
-#include <cstdlib>
-#include <sstream>
-#include "ResourceClaim.h"
-#include <vector>
-#include <set>
-#include <map>
-#include "core/logging/Logger.h"
-#include "core/Core.h"
-#include "properties/Configure.h"
-#include "properties/Properties.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/Id.h"
-#include "spdlog/sinks/ostream_sink.h"
-#include "spdlog/sinks/dist_sink.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-#include "capi/cstructs.h"
-#include "capi/api.h"
-
-using failure_callback_type = std::function<void(flow_file_record*)>;
-using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
-
-namespace {
-
-  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
-    auto ff = session->get();
-    if (ff == nullptr) {
-      return;
-    }
-
-    auto claim = ff->getResourceClaim();
-
-    if (claim != nullptr && user_callback != nullptr) {
-      claim->increaseFlowFileRecordOwnedCount();
-      // create a flow file.
-      auto path = claim->getContentFullPath();
-      auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-      ffr->attributes = ff->getAttributesPtr();
-      ffr->ffp = ff.get();
-      auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-      *content_repo_ptr = cr_ptr;
-      user_callback(ffr);
-    }
-    session->remove(ff);
-  }
-
-  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
-    session->rollback();
-    failureStrategyAsIs(session, user_callback, cr_ptr);
-  }
-}
-
-static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
-    { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
-
-class ExecutionPlan {
- public:
-
-  explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
-
-  std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
-
-  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
-                                                core::Relationship relationship = core::Relationship("success", "description"),
-                                                bool linkToPrevious = false);
-
-  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
-  bool linkToPrevious = false);
-
-  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
-
-  void reset();
-
-  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
-
-  bool setFailureCallback(failure_callback_type onerror_callback);
-
-  bool setFailureStrategy(FailureStrategy start);
-
-  std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
-
-  std::shared_ptr<core::FlowFile> getCurrentFlowFile();
-
-  std::shared_ptr<core::ProcessSession> getCurrentSession();
-
-  std::shared_ptr<core::Repository> getFlowRepo() {
-    return flow_repo_;
-  }
-
-  std::shared_ptr<core::Repository> getProvenanceRepo() {
-    return prov_repo_;
-  }
-
-  std::shared_ptr<core::ContentRepository> getContentRepo() {
-    return content_repo_;
-  }
-
-  std::shared_ptr<core::FlowFile> getNextFlowFile(){
-    return next_ff_;
-  }
-
-  void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
-    next_ff_ = ptr;
-  }
-
-  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
-
- protected:
-  class FailureHandler {
-   public:
-    FailureHandler(content_repo_sptr cr_ptr) {
-      callback_ = nullptr;
-      strategy_ = FailureStrategy::AS_IS;
-      content_repo_ = cr_ptr;
-    }
-    void setCallback(failure_callback_type onerror_callback) {
-      callback_=onerror_callback;
-    }
-    void setStrategy(FailureStrategy strat) {
-      strategy_ = strat;
-    }
-    void operator()(const processor_session* ps) {
-      auto ses = static_cast<core::ProcessSession*>(ps->session);
-      FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
-    }
-   private:
-    failure_callback_type callback_;
-    FailureStrategy strategy_;
-    content_repo_sptr content_repo_;
-  };
-
-  void finalize();
-
-  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
-
-  std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
-                                                        core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
-
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
-
-  content_repo_sptr content_repo_;
-
-  std::shared_ptr<core::Repository> flow_repo_;
-  std::shared_ptr<core::Repository> prov_repo_;
-
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
-
-  std::atomic<bool> finalized;
-
-  uint32_t location;
-
-  std::shared_ptr<core::ProcessSession> current_session_;
-  std::shared_ptr<core::FlowFile> current_flowfile_;
-
-  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
-  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
-  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
-  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
-  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
-  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
-  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
-  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
-  core::Relationship termination_;
-
-  std::shared_ptr<core::FlowFile> next_ff_;
-
- private:
-
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
-  std::shared_ptr<logging::Logger> logger_;
-  std::shared_ptr<FailureHandler> failure_handler_;
-};
-
-#endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/ReflexiveSession.h b/libminifi/include/capi/ReflexiveSession.h
deleted file mode 100644
index ebf6cbe..0000000
--- a/libminifi/include/capi/ReflexiveSession.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REFLEXIVE_SESSION_H__
-#define __REFLEXIVE_SESSION_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <set>
-
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-// ReflexiveSession Class
-class ReflexiveSession : public ProcessSession{
- public:
-  // Constructor
-  /*!
-   * Create a new process session
-   */
-  ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
-      : ProcessSession(processContext){
-  }
-
-// Destructor
-  virtual ~ReflexiveSession() {
-  }
-
-   virtual std::shared_ptr<core::FlowFile> get(){
-     auto prevff = ff;
-     ff = nullptr;
-     return prevff;
-   }
-
-   virtual void add(const std::shared_ptr<core::FlowFile> &flow){
-     ff = flow;
-   }
-   virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
-     // no op
-   }
- protected:
-  //
-  // Get the FlowFile from the highest priority queue
-  std::shared_ptr<core::FlowFile> ff;
-
-};
-
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
deleted file mode 100644
index e05c04e..0000000
--- a/libminifi/include/capi/api.h
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
-#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include "cstructs.h"
-#include "processors.h"
-
-int initialize_api();
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Updates with every release
- */
-#define API_VERSION "0.02"
-
-void enable_logging();
-
-void set_terminate_callback(void (*terminate_callback)());
-
-/****
- * ##################################################################
- *  BASE NIFI OPERATIONS
- * ##################################################################
- */
-
-nifi_instance *create_instance(const char *url, nifi_port *port);
-
-void initialize_instance(nifi_instance *);
-
-void free_instance(nifi_instance*);
-
-/****
- * ##################################################################
- *  C2 OPERATIONS
- * ##################################################################
- */
-
-
-typedef int c2_update_callback(char *);
-
-typedef int c2_stop_callback(char *);
-
-typedef int c2_start_callback(char *);
-
-void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
-
-
-uint8_t run_processor(const processor *processor);
-
-flow *create_new_flow(nifi_instance *);
-
-flow *create_flow(nifi_instance *, const char *);
-
-flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
-
-processor *add_processor(flow *, const char *);
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name);
-
-processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
-
-/**
-* Register your callback to received flow files that the flow failed to process
-* The flow file ownership is transferred to the caller!
-* The first callback should be registered before the flow is used. Can be changed later during runtime.
-*/
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
-
-
-/**
-* Set failure strategy. Please use the enum defined in cstructs.h
-* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
-* Can be changed runtime.
-* The defailt strategy is AS IS.
-*/
-int set_failure_strategy(flow *flow, FailureStrategy strategy);
-
-int set_property(processor *, const char *, const char *);
-
-int set_instance_property(nifi_instance *instance, const char*, const char *);
-
-int free_flow(flow *);
-
-flow_file_record *get_next_flow_file(nifi_instance *, flow *);
-
-size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
-
-flow_file_record *get(nifi_instance *,flow *, processor_session *);
-
-int transfer(processor_session* session, flow *flow, const char *rel);
-
-/**
- * Creates a flow file object.
- * Will obtain the size of file
- */
-flow_file_record* create_flowfile(const char *file, const size_t len);
-
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
-
-void free_flowfile(flow_file_record*);
-
-uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
-
-void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
-
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
-
-int get_attribute_qty(const flow_file_record* ff);
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target);
-
-uint8_t remove_attribute(flow_file_record*, char *key);
-
-/****
- * ##################################################################
- *  Remote NIFI OPERATIONS
- * ##################################################################
- */
-
-int transmit_flowfile(flow_file_record *, nifi_instance *);
-
-/****
- * ##################################################################
- *  Persistence Operations
- * ##################################################################
- */
-
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/cstructs.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h
deleted file mode 100644
index 55197d9..0000000
--- a/libminifi/include/capi/cstructs.h
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
-#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
-
-#include <stddef.h>
-#include <stdint.h>
-
-/**
- * NiFi Port struct
- */
-typedef struct {
-  char *port_id;
-} nifi_port;
-
-
-/**
- * Nifi instance struct
- */
-typedef struct {
-
-  void *instance_ptr;
-
-  nifi_port port;
-
-} nifi_instance;
-
-
-/****
- * ##################################################################
- *  C2 OPERATIONS
- * ##################################################################
- */
-
-enum C2_Server_Type{
-  REST,
-  MQTT
-};
-
-typedef struct {
-  char *url;
-  char *ack_url;
-  char *identifier;
-  char *topic;
-  enum C2_Server_Type type;
-} C2_Server;
-
-
-/****
- * ##################################################################
- *  Processor OPERATIONS
- * ##################################################################
- */
-
-typedef struct {
-  void *processor_ptr;
-} processor;
-
-
-typedef struct {
-
-  void *session;
-
-} processor_session;
-
-
-
-/****
- * ##################################################################
- *  FLOWFILE OPERATIONS
- * ##################################################################
- */
-
-typedef struct {
-  const char *key;
-  void *value;
-  size_t value_size;
-} attribute;
-
-typedef struct {
-  attribute * attributes;
-  size_t  size;
-} attribute_set;
-
-/**
- * State of a flow file
- *
- */
-typedef struct {
-  uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
-
-  void * in;
-
-  void * crp;
-
-  char * contentLocation; /**< Filesystem location of this object */
-
-  void *attributes; /**< Hash map of attributes */
-
-  void *ffp;
-
-} flow_file_record;
-
-
-typedef struct {
-  void *plan;
-} flow;
-
-typedef  enum FS { AS_IS, ROLLBACK } FailureStrategy;
-
-#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/expect.h b/libminifi/include/capi/expect.h
deleted file mode 100644
index ead182c..0000000
--- a/libminifi/include/capi/expect.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
-
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
-#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
-
-
-// various likely/unlikely pragmas I've carried over the years.
-// you'll see this in many projects
-#if defined(__GNUC__) && __GNUC__ >= 4
-#define LIKELY(x)   (__builtin_expect((x), 1))
-#define UNLIKELY(x) (__builtin_expect((x), 0))
-#else
-#define LIKELY(x)   (x)
-#define UNLIKELY(x) (x)
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/processors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/processors.h b/libminifi/include/capi/processors.h
deleted file mode 100644
index 7fe357d..0000000
--- a/libminifi/include/capi/processors.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
-#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
-
-
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct {
-  char *directory;
-  unsigned keep_source :1;
-  unsigned recurse :1;
-} GetFileConfig;
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/core/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/expect.h b/libminifi/include/core/expect.h
new file mode 100644
index 0000000..ead182c
--- /dev/null
+++ b/libminifi/include/core/expect.h
@@ -0,0 +1,32 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+
+
+// various likely/unlikely pragmas I've carried over the years.
+// you'll see this in many projects
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define LIKELY(x)   (__builtin_expect((x), 1))
+#define UNLIKELY(x) (__builtin_expect((x), 0))
+#else
+#define LIKELY(x)   (x)
+#define UNLIKELY(x) (x)
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index d3257e1..b6f625c 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -24,7 +24,7 @@
 #include <atomic>
 #include <cstdint>
 #include "io/ClientSocket.h"
-#include "capi/expect.h"
+#include "core/expect.h"
 #include "properties/Configure.h"
 
 namespace org {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/processors/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h
deleted file mode 100644
index 801c99c..0000000
--- a/libminifi/include/processors/CallbackProcessor.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * @file CallbackProcessor.h
- * CallbackProcessor class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __CALLBACK_PROCESSOR_H__
-#define __CALLBACK_PROCESSOR_H__
-
-#include <stdio.h>
-#include <string>
-#include <errno.h>
-#include <chrono>
-#include <thread>
-#include <functional>
-#include <iostream>
-#include <sys/types.h>
-#include "capi/cstructs.h"
-#include "io/BaseStream.h"
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// CallbackProcessor Class
-class CallbackProcessor : public core::Processor {
- public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
-      : Processor(name, uuid),
-        callback_(nullptr),
-        objref_(nullptr),
-        logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
-  }
-  // Destructor
-  virtual ~CallbackProcessor() {
-
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "CallbackProcessor";
-
- public:
-
-  void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
-    objref_ = obj;
-    callback_ = ontrigger_callback;
-  }
-
-  // OnTrigger method, implemented by NiFi CallbackProcessor
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  // Initialize, over write by NiFi CallbackProcessor
-  virtual void initialize() {
-    std::set<core::Relationship> relationships;
-    core::Relationship Success("success", "description");
-    relationships.insert(Success);
-    setSupportedRelationships(relationships);
-  }
-
- protected:
-  void *objref_;
-  std::function<void(processor_session*)> callback_;
- private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
-
-};
-
-REGISTER_RESOURCE(CallbackProcessor);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index ffb28fe..fc54a25 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,7 +30,7 @@
 #include <functional>
 
 #include "BackTrace.h"
-#include "capi/expect.h"
+#include "core/expect.h"
 #include "controllers/ThreadManagementService.h"
 #include "concurrentqueue.h"
 #include "core/controller/ControllerService.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp
deleted file mode 100644
index 4b7f5fb..0000000
--- a/libminifi/src/capi/C2CallbackAgent.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "capi/C2CallbackAgent.h"
-#include <csignal>
-#include <utility>
-#include <vector>
-#include <map>
-#include <string>
-#include <memory>
-#include "c2/ControllerSocketProtocol.h"
-#include "core/state/UpdateController.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/file/FileUtils.h"
-#include "utils/file/FileManager.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
-                                 const std::shared_ptr<Configure> &configuration)
-    : C2Agent(controller, updateSink, configuration),
-      stop(nullptr),
-      logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
-}
-
-void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
-  switch (resp.op) {
-    case Operation::CLEAR:
-      break;
-    case Operation::UPDATE:
-      break;
-    case Operation::DESCRIBE:
-      break;
-    case Operation::RESTART:
-      break;
-    case Operation::START:
-    case Operation::STOP: {
-      if (resp.name == "C2" || resp.name == "c2") {
-        raise(SIGTERM);
-      }
-
-      auto str = resp.name.c_str();
-
-      if (nullptr != stop)
-        stop(const_cast<char*>(str));
-
-      break;
-    }
-      //
-      break;
-    default:
-      break;
-      // do nothing
-  }
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
deleted file mode 100644
index 78b864c..0000000
--- a/libminifi/src/capi/Plan.cpp
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "capi/Plan.h"
-#include "processors/CallbackProcessor.h"
-#include <memory>
-#include <vector>
-#include <set>
-#include <string>
-
-bool intToFailureStragey(int in, FailureStrategy *out) {
-  auto tmp = static_cast<FailureStrategy>(in);
-  switch (tmp) {
-    case AS_IS:
-    case ROLLBACK:
-      *out = tmp;
-      return true;
-    default:
-      return false;
-  }
-}
-
-std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
-    : content_repo_(content_repo),
-      flow_repo_(flow_repo),
-      prov_repo_(prov_repo),
-      finalized(false),
-      location(-1),
-      current_flowfile_(nullptr),
-      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
-  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
-}
-
-/**
- * Add a callback to obtain and pass processor session to a generated processor
- *
- */
-std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
-  if (finalized) {
-    return nullptr;
-  }
-
-  auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor");
-  if (!ptr)
-    return nullptr;
-
-  std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr);
-  processor->setCallback(obj, fp);
-
-  return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true);
-}
-
-bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
-  uint32_t i = 0;
-  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
-  for (i = 0; i < processor_queue_.size(); i++) {
-    if (processor_queue_.at(i) == proc) {
-      break;
-    }
-  }
-
-  if (i >= processor_queue_.size() || i >= processor_contexts_.size()) {
-    return false;
-  }
-
-  return processor_contexts_.at(i)->setProperty(prop, value);
-}
-
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
-  if (finalized) {
-    return nullptr;
-  }
-
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
-
-  processor->setStreamFactory(stream_factory);
-  // initialize the processor
-  processor->initialize();
-
-  processor_mapping_[processor->getUUIDStr()] = processor;
-
-  if (!linkToPrevious) {
-    termination_ = relationship;
-  } else {
-    std::shared_ptr<core::Processor> last = processor_queue_.back();
-
-    if (last == nullptr) {
-      last = processor;
-      termination_ = relationship;
-    }
-
-    relationships_.push_back(connectProcessors(last, processor, relationship, true));
-  }
-
-  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
-  processor_nodes_.push_back(node);
-
-  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
-  processor_contexts_.push_back(context);
-
-  processor_queue_.push_back(processor);
-
-  return processor;
-}
-
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
-  if (finalized) {
-    return nullptr;
-  }
-  auto processor = ExecutionPlan::createProcessor(processor_name, name);
-  if (!processor) {
-    return nullptr;
-  }
-  return addProcessor(processor, name, relationship, linkToPrevious);
-}
-
-void ExecutionPlan::reset() {
-  process_sessions_.clear();
-  factories_.clear();
-  location = -1;
-  for (auto proc : processor_queue_) {
-    while (proc->getActiveTasks() > 0) {
-      proc->decrementActiveTask();
-    }
-  }
-}
-
-bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
-  if (!finalized) {
-    finalize();
-  }
-  location++;
-  if (location >= processor_queue_.size()) {
-    return false;
-  }
-  std::shared_ptr<core::Processor> processor = processor_queue_[location];
-  std::shared_ptr<core::ProcessContext> context = processor_contexts_[location];
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-  factories_.push_back(factory);
-  if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
-    processor->onSchedule(context, factory);
-    configured_processors_.push_back(processor);
-  }
-  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
-  process_sessions_.push_back(current_session);
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  if (verify != nullptr) {
-    verify(context, current_session);
-  } else {
-    logger_->log_debug("Running %s", processor->getName());
-    processor->onTrigger(context, current_session);
-  }
-  current_session->commit();
-  current_flowfile_ = current_session->get();
-  auto hasMore = location + 1 < processor_queue_.size();
-  if (!hasMore && !current_flowfile_) {
-    std::set<std::shared_ptr<core::FlowFile>> expired;
-    current_flowfile_ = relationships_.back()->poll(expired);
-  }
-  return hasMore;
-}
-
-std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
-  return process_sessions_.at(location)->getProvenanceReporter()->getEvents();
-}
-
-std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
-  return current_flowfile_;
-}
-
-std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
-  return current_session_;
-}
-
-std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
-  return connectProcessors(processor, processor, termination_, set_dst);
-}
-
-void ExecutionPlan::finalize() {
-  if (failure_handler_) {
-    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
-
-    std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
-    callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
-
-    for (const auto& proc : processor_queue_) {
-      for (const auto& rel : proc->getSupportedRelationships()) {
-        if (rel.getName() == "failure") {
-          relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
-          break;
-        }
-      }
-    }
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
-
-    processor_nodes_.push_back(node);
-
-    std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
-    processor_contexts_.push_back(context);
-
-    processor_queue_.push_back(failure_proc);
-  }
-
-  if (relationships_.size() > 0) {
-    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
-  } else {
-    for (auto processor : processor_queue_) {
-      relationships_.push_back(buildFinalConnection(processor, true));
-    }
-  }
-
-  finalized = true;
-}
-
-std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
-
-  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
-  if (nullptr == ptr) {
-    return nullptr;
-  }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
-
-  processor->setName(name);
-  return processor;
-}
-
-std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
-                                                      core::Relationship relationship, bool set_dst) {
-  std::stringstream connection_name;
-  connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-  connection->setRelationship(relationship);
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(src_proc);
-
-  utils::Identifier uuid_copy, uuid_copy_next;
-  src_proc->getUUID(uuid_copy);
-  connection->setSourceUUID(uuid_copy);
-  if (set_dst) {
-    connection->setDestination(dst_proc);
-    dst_proc->getUUID(uuid_copy_next);
-    connection->setDestinationUUID(uuid_copy_next);
-    if (src_proc != dst_proc) {
-      dst_proc->addConnection(connection);
-    }
-  }
-  src_proc->addConnection(connection);
-
-  return connection;
-}
-
-bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
-  if (finalized && !failure_handler_) {
-    return false;  // Already finalized the flow without failure handler processor
-  }
-  if (!failure_handler_) {
-    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
-  }
-  failure_handler_->setCallback(onerror_callback);
-  return true;
-}
-
-bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
-  if (!failure_handler_) {
-    return false;
-  }
-  failure_handler_->setStrategy(start);
-  return true;
-}
-