You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/31 18:28:38 UTC

[2/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into nanofi

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
deleted file mode 100644
index e135fe1..0000000
--- a/libminifi/src/capi/api.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <string>
-#include <map>
-#include <memory>
-#include <utility>
-#include <exception>
-#include "core/Core.h"
-#include "capi/api.h"
-#include "capi/expect.h"
-#include "capi/Instance.h"
-#include "capi/Plan.h"
-#include "ResourceClaim.h"
-#include "processors/GetFile.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/StringUtils.h"
-
-using string_map = std::map<std::string, std::string>;
-
-class API_INITIALIZER {
- public:
-  static int initialized;
-};
-
-int API_INITIALIZER::initialized = initialize_api();
-
-int initialize_api() {
-  logging::LoggerConfiguration::getConfiguration().disableLogging();
-  return 1;
-}
-
-void enable_logging() {
-  logging::LoggerConfiguration::getConfiguration().enableLogging();
-}
-
-void set_terminate_callback(void (*terminate_callback)()) {
-  std::set_terminate(terminate_callback);
-}
-
-class DirectoryConfiguration {
- protected:
-  DirectoryConfiguration() {
-    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
-  }
- public:
-  static void initialize() {
-    static DirectoryConfiguration configure;
-  }
-};
-
-/**
- * Creates a NiFi Instance from the url and output port.
- * @param url http URL for NiFi instance
- * @param port Remote output port.
- * @Deprecated for API version 0.2 in favor of the following prototype
- * nifi_instance *create_instance(nifi_port const *port) {
- */
-nifi_instance *create_instance(const char *url, nifi_port *port) {
-  // make sure that we have a thread safe way of initializing the content directory
-  DirectoryConfiguration::initialize();
-
-  // need reinterpret cast until we move to C for this module.
-  nifi_instance *instance = reinterpret_cast<nifi_instance*>( malloc(sizeof(nifi_instance)) );
-  /**
-   * This API will gradually move away from C++, hence malloc is used for nifi_instance
-   * Since minifi::Instance is currently being used, then we need to use new in that case.
-   */
-  instance->instance_ptr = new minifi::Instance(url, port->port_id);
-  // may have to translate port ID here in the future
-  // need reinterpret cast until we move to C for this module.
-  instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
-  snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id);
-  return instance;
-}
-
-/**
- * Initializes the instance
- */
-void initialize_instance(nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->setRemotePort(instance->port.port_id);
-}
-/*
- typedef int c2_update_callback(char *);
-
- typedef int c2_stop_callback(char *);
-
- typedef int c2_start_callback(char *);
-
- */
-void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
-}
-
-/**
- * Sets a property within the nifi instance
- * @param instance nifi instance
- * @param key key in which we will set the valiue
- * @param value
- * @return -1 when instance or key are null
- */
-int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
-  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
-    return -1;
-  }
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  minifi_instance_ref->getConfiguration()->set(key, value);
-  return 0;
-}
-
-/**
- * Reclaims memory associated with a nifi instance structure.
- * @param instance nifi instance.
- */
-void free_instance(nifi_instance* instance) {
-  if (instance != nullptr) {
-    delete ((minifi::Instance*) instance->instance_ptr);
-    free(instance->port.port_id);
-    free(instance);
-  }
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_flowfile(const char *file, const size_t len) {
-  flow_file_record *new_ff = new flow_file_record;
-  new_ff->attributes = new string_map();
-  new_ff->contentLocation = new char[len + 1];
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = in.tellg();
-  return new_ff;
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
-  if (nullptr == file) {
-    return nullptr;
-  }
-  flow_file_record *new_ff = create_ff_object_na(file, len, size);
-  new_ff->attributes = new string_map();
-  new_ff->ffp = 0;
-  return new_ff;
-}
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
-  flow_file_record *new_ff = new flow_file_record;
-  new_ff->attributes = nullptr;
-  new_ff->contentLocation = new char[len + 1];
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  // set the size of the flow file.
-  new_ff->size = size;
-  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
-  return new_ff;
-}
-/**
- * Reclaims memory associated with a flow file object
- * @param ff flow file record.
- */
-void free_flowfile(flow_file_record *ff) {
-  if (ff == nullptr) {
-    return;
-  }
-  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
-  if (content_repo_ptr->get()) {
-    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
-    (*content_repo_ptr)->remove(claim);
-  }
-  if (ff->ffp == nullptr) {
-    auto map = static_cast<string_map*>(ff->attributes);
-    delete map;
-  }
-  delete[] ff->contentLocation;
-  delete ff;
-  delete content_repo_ptr;
-}
-
-/**
- * Adds an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
- */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
-  return ret.second ? 0 : -1;
-}
-
-/**
- * Updates (or adds) an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- */
-void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
-}
-
-/*
- * Obtains the attribute.
- * @param ff flow file record
- * @param key key
- * @param caller_attribute caller supplied object in which we will copy the data ptr
- * @return 0 if successful, -1 if the key does not exist
- */
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) {
-  if (ff == nullptr) {
-    return -1;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map) {
-    return -1;
-  }
-  auto find = attribute_map->find(caller_attribute->key);
-  if (find != attribute_map->end()) {
-    caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
-    caller_attribute->value_size = find->second.size();
-    return 0;
-  }
-  return -1;
-}
-
-int get_attribute_qty(const flow_file_record* ff) {
-  if (ff == nullptr) {
-    return 0;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  return attribute_map ? attribute_map->size() : 0;
-}
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
-  if (ff == nullptr) {
-    return 0;
-  }
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map || attribute_map->empty()) {
-    return 0;
-  }
-  int i = 0;
-  for (const auto& kv : *attribute_map) {
-    if (i >= target->size) {
-      break;
-    }
-    target->attributes[i].key = kv.first.data();
-    target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
-    target->attributes[i].value_size = kv.second.size();
-    ++i;
-  }
-  return i;
-}
-
-/**
- * Removes a key from the attribute chain
- * @param ff flow file record
- * @param key key to remove
- * @return 0 if removed, -1 otherwise
- */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-  return attribute_map->erase(key) - 1;  // erase by key returns the number of elements removed (0 or 1)
-}
-
-/**
- * Transmits the flowfile
- * @param ff flow file record
- * @param instance nifi instance structure
- */
-int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
-  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
-    minifi_instance_ref->setRemotePort(instance->port.port_id);
-  }
-
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
-
-  auto no_op = minifi_instance_ref->getNoOpRepository();
-
-  auto content_repo = minifi_instance_ref->getContentRepository();
-
-  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
-  claim->increaseFlowFileRecordOwnedCount();
-  claim->increaseFlowFileRecordOwnedCount();
-
-  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
-  ffr->addAttribute("nanofi.version", API_VERSION);
-  ffr->setSize(ff->size);
-
-  std::string port_uuid = instance->port.port_id;
-
-  minifi_instance_ref->transfer(ffr);
-
-  return 0;
-}
-
-flow *create_new_flow(nifi_instance *instance) {
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = new flow;
-
-  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
-
-  return new_flow;
-}
-
-flow *create_flow(nifi_instance *instance, const char *first_processor) {
-  if (nullptr == instance || nullptr == instance->instance_ptr) {
-    return nullptr;
-  }
-  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = new flow;
-
-  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
-
-  if (first_processor != nullptr && strlen(first_processor) > 0) {
-    // automatically adds it with success
-    execution_plan->addProcessor(first_processor, first_processor);
-  }
-  return new_flow;
-}
-
-processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
-  if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
-    return nullptr;
-  }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
-  processor *new_processor = new processor();
-  new_processor->processor_ptr = proc.get();
-  return new_processor;
-}
-
-flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
-  static const std::string first_processor = "GetFile";
-  flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow;
-
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
-  // automatically adds it with success
-  auto getFile = plan->addProcessor(first_processor, first_processor);
-
-  plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
-  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
-  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
-
-  return new_flow;
-}
-
-processor *add_processor(flow *flow, const char *processor_name) {
-  if (nullptr == flow || nullptr == processor_name) {
-    return nullptr;
-  }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name);
-  if (proc) {
-    processor *new_processor = new processor();
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
-}
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
-  if (proc) {
-    processor *new_processor = new processor();
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
-}
-
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  return plan->setFailureCallback(onerror_callback) ? 0 : 1;
-}
-
-int set_failure_strategy(flow *flow, FailureStrategy strategy) {
-  return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;
-}
-
-int set_property(processor *proc, const char *name, const char *value) {
-  if (name != nullptr && value != nullptr && proc != nullptr) {
-    core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
-    bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
-    return success ? 0 : -2;
-  }
-  return -1;
-}
-
-int free_flow(flow *flow) {
-  if (flow == nullptr || nullptr == flow->plan)
-    return -1;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  delete execution_plan;
-  delete flow;
-  return 0;
-}
-
-flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
-  if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
-    return nullptr;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  execution_plan->reset();
-  while (execution_plan->runNextProcessor()) {
-  }
-  auto ff = execution_plan->getCurrentFlowFile();
-  if (ff == nullptr) {
-    return nullptr;
-  }
-  auto claim = ff->getResourceClaim();
-
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->ffp = ff.get();
-    ffr->attributes = ff->getAttributesPtr();
-    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
-    return nullptr;
-  }
-}
-
-size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
-  if (nullptr == instance || nullptr == flow || nullptr == ff_r)
-    return 0;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  size_t i = 0;
-  for (; i < size; i++) {
-    execution_plan->reset();
-    auto ffr = get_next_flow_file(instance, flow);
-    if (ffr == nullptr) {
-      break;
-    }
-    ff_r[i] = ffr;
-  }
-  return i;
-}
-
-flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) {
-  if (nullptr == instance || nullptr == flow || nullptr == session)
-    return nullptr;
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto ff = sesh->get();
-  execution_plan->setNextFlowFile(ff);
-  if (ff == nullptr) {
-    return nullptr;
-  }
-  auto claim = ff->getResourceClaim();
-
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->attributes = ff->getAttributesPtr();
-    ffr->ffp = ff.get();
-    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
-    return nullptr;
-  }
-}
-
-int transfer(processor_session* session, flow *flow, const char *rel) {
-  if (nullptr == session || nullptr == flow || rel == nullptr) {
-    return -1;
-  }
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  if (nullptr == sesh || nullptr == execution_plan) {
-    return -1;
-  }
-  core::Relationship relationship(rel, rel);
-  auto ff = execution_plan->getNextFlowFile();
-  if (nullptr == ff) {
-    return -2;
-  }
-  sesh->transfer(ff, relationship);
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 47c7ba6..674566b 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -17,7 +17,7 @@
  */
 
 #include "core/repository/VolatileContentRepository.h"
-#include "capi/expect.h"
+#include "core/expect.h"
 #include <cstdio>
 #include <string>
 #include <memory>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/processors/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp
deleted file mode 100644
index 5524d92..0000000
--- a/libminifi/src/processors/CallbackProcessor.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/CallbackProcessor.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  if (callback_ != nullptr) {
-    processor_session sesh;
-    sesh.session = session;
-    callback_(&sesh);
-  }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
deleted file mode 100644
index b7bf784..0000000
--- a/libminifi/test/capi/CAPITests.cpp
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <sys/stat.h>
-#include <utility>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <fstream>
-
-#include "utils/file/FileUtils.h"
-#include "../TestBase.h"
-
-#include "capi/api.h"
-
-#include <chrono>
-#include <thread>
-
-static nifi_instance *create_instance_obj(const char *name = "random_instance") {
-  nifi_port port;
-  char port_str[] = "12345";
-  port.port_id = port_str;
-  return create_instance("random_instance", &port);
-}
-
-static int failure_count = 0;
-
-void failure_counter(flow_file_record * fr) {
-  failure_count++;
-  REQUIRE(get_attribute_qty(fr) > 0);
-  free_flowfile(fr);
-}
-
-void big_failure_counter(flow_file_record * fr) {
-  failure_count += 100;
-  free_flowfile(fr);
-}
-
-TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
-  REQUIRE(test_proc != nullptr);
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  processor *test_proc = add_processor(test_flow, "NeverExisted");
-  REQUIRE(test_proc == nullptr);
-  processor *no_proc = add_processor(test_flow, "");
-  REQUIRE(no_proc == nullptr);
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
-  REQUIRE(test_proc != nullptr);
-  REQUIRE(set_property(test_proc, "Data Format", "Text") == 0);  // Valid value
-  // TODO(aboda): add this two below when property handling is made strictly typed
-  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
-  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
-  REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0);  // Empty value
-  REQUIRE(set_property(test_proc, nullptr, "Blah") != 0);  // Empty attribute
-  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);  // Invalid processor
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("get file and put file", "[getAndPutFile]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  char put_format[] = "/tmp/pt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-  const char *putfiledir = testController.createTempDirectory(put_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
-  REQUIRE(put_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-  flow_file_record *record = get_next_flow_file(instance, test_flow);
-  REQUIRE(record != nullptr);
-
-  ss.str("");
-
-  ss << putfiledir << "/" << "tstFile.ext";
-  std::ifstream t(ss.str());
-  std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
-
-  REQUIRE(test_file_content == put_data);
-
-  // No failure handler can be added after the flow is finalized
-  REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
-
-  free_flowfile(record);
-
-  free_flow(test_flow);
-
-  free_instance(instance);
-}
-
-TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
-  REQUIRE(extract_test != nullptr);
-  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
-  processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
-  REQUIRE(update_attr != nullptr);
-
-  REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
-
-  flow_file_record *record = get_next_flow_file(instance, test_flow);
-
-  REQUIRE(record != nullptr);
-
-  attribute test_attr;
-  test_attr.key = "TestAttr";
-  REQUIRE(get_attribute(record, &test_attr) == 0);
-
-  REQUIRE(test_attr.value_size != 0);
-  REQUIRE(test_attr.value != nullptr);
-
-  std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
-
-  REQUIRE(attr_value == test_file_content);
-
-  const char * new_testattr_value = "S0me t3st t3xt";
-
-  // Attribute already exist, should fail
-  REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0);  // NOLINT
-
-  // Update overwrites values
-  update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value));  // NOLINT
-
-  int attr_size = get_attribute_qty(record);
-  REQUIRE(attr_size > 0);
-
-  attribute_set attr_set;
-  attr_set.size = attr_size;
-  attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute));  // NOLINT
-
-  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
-
-  bool test_attr_found = false;
-  bool updated_attr_found = false;
-  for (int i = 0; i < attr_set.size; ++i) {
-    if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
-      test_attr_found = true;
-      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
-    } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
-      updated_attr_found = true;
-      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
-    }
-  }
-  REQUIRE(updated_attr_found == true);
-  REQUIRE(test_attr_found == true);
-
-  free_flowfile(record);
-
-  free_flow(test_flow);
-  free_instance(instance);
-}
-
-TEST_CASE("Test error handling callback", "[errorHandling]") {
-  TestController testController;
-
-  char src_format[] = "/tmp/gt.XXXXXX";
-  const char *sourcedir = testController.createTempDirectory(src_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
-  auto instance = create_instance_obj();
-  REQUIRE(instance != nullptr);
-  flow *test_flow = create_flow(instance, nullptr);
-  REQUIRE(test_flow != nullptr);
-
-  // Failure strategy cannot be set before a valid callback is added
-  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
-  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
-
-  processor *get_proc = add_processor(test_flow, "GetFile");
-  REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
-  REQUIRE(put_proc != nullptr);
-  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
-  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-
-  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-
-  REQUIRE(failure_count == 1);
-
-  // Failure handler function can be replaced runtime
-  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
-  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
-
-  // Create new testfile to trigger failure again
-  ss << "2";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
-
-  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-  REQUIRE(failure_count > 100);
-
-  failure_count = 0;
-
-  free_flow(test_flow);
-  free_instance(instance);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
new file mode 100644
index 0000000..bd2d952
--- /dev/null
+++ b/nanofi/CMakeLists.txt
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(include)
+include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+
+if(WIN32)
+include_directories(../libminifi/opsys/win)
+else()
+include_directories(../libminifi/opsys/posix)
+endif()
+
+file(GLOB NANOFI_SOURCES  "src/api/*.cpp" "src/cxx/*.cpp" )
+
+file(GLOB NANOFI_EXAMPLES_SOURCES  "examples/*.c" )
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+	    if (_cpp_latest_flag_supported)
+	        add_compile_options("/std:c++14")
+	    endif()
+	endif()
+else()
+
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+add_library(nanofi STATIC ${NANOFI_SOURCES})
+
+if (APPLE)
+	target_link_libraries (nanofi -Wl,-all_load core-minifi minifi)
+elseif(NOT WIN32)
+	target_link_libraries (nanofi -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive)
+else()
+    set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi")
+	set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi")
+endif ()
+
+if(WIN32)
+	set_target_properties(nanofi PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+if (ENABLE_PYTHON)
+
+add_library(nanofi-shared SHARED ${NANOFI_SOURCES})
+
+if (APPLE)
+	target_link_libraries (nanofi-shared -Wl,-all_load core-minifi-shared minifi-shared)
+elseif(NOT WIN32)
+	target_link_libraries (nanofi-shared -Wl,--whole-archive core-minifi-shared minifi-shared -Wl,--no-whole-archive)
+else()
+    set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi-shared")
+	set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi-shared")
+endif ()
+
+
+if(WIN32)
+	set_target_properties(nanofi-shared PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+set_property(TARGET nanofi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+endif(ENABLE_PYTHON)
+
+if (NOT DISABLE_CURL)
+add_subdirectory(examples)
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
new file mode 100644
index 0000000..f5f660f
--- /dev/null
+++ b/nanofi/examples/CMakeLists.txt
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../include) # nanofi public headers; "/include" pointed at the filesystem root
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+	    if (_cpp_latest_flag_supported)
+	        add_compile_options("/std:c++14")
+	    endif()
+	endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+if (WIN32)
+    set(LINK_FLAGS "/WHOLEARCHIVE")
+    set(LINK_END_FLAGS "")
+elseif (APPLE)
+    set(LINK_FLAGS "-Wl,-all_load")
+    set(LINK_END_FLAGS "")
+else ()
+    set(LINK_FLAGS "-Wl,--whole-archive")
+    set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
+add_executable(generate_flow generate_flow.c)
+
+add_executable(terminate_handler terminate_handler.c)
+
+target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
+if (NOT WIN32)
+
+add_executable(transmit_flow transmit_flow.c)
+
+target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(monitor_directory monitor_directory.c)
+
+target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/generate_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/generate_flow.c b/nanofi/examples/generate_flow.c
new file mode 100644
index 0000000..707de11
--- /dev/null
+++ b/nanofi/examples/generate_flow.c
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+  /* Both the instance string and the remote port id are mandatory. */
+  if (argc < 3) {
+    printf("Error: must run ./generate_flow <instance> <remote port> \n");
+    exit(1);
+  }
+
+  /* argv[1]: instance string handed to create_instance(). */
+  /* argv[2]: id stored in the nifi_port passed alongside it. */
+  nifi_port remote_port;
+  remote_port.port_id = argv[2];
+
+  nifi_instance *instance = create_instance(argv[1], &remote_port);
+
+  /* Create a flow from the "GenerateFlowFile" processor name. */
+  flow *generator = create_flow(instance, "GenerateFlowFile");
+
+  /* Ask that flow for one flow file. */
+  flow_file_record *generated = get_next_flow_file(instance, generator);
+
+  if (!generated) {
+    printf("Could not create flow file");
+    exit(1);
+  }
+
+  /* Send the flow file to the instance, then release the record. */
+  transmit_flowfile(generated, instance);
+  free_flowfile(generated);
+
+  /*
+   * Left disabled in this example: initializing the instance up front
+   * would make the transmission slightly more efficient.
+   * initialize_instance(instance);
+   */
+  free_flow(generator);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/monitor_directory.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/monitor_directory.c b/nanofi/examples/monitor_directory.c
new file mode 100644
index 0000000..2283b35
--- /dev/null
+++ b/nanofi/examples/monitor_directory.c
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <pthread.h>
+
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
+/* Returns 0 when stat() fails on `path`, otherwise the S_ISDIR() test of its mode. */
+int is_dir(const char *path) {
+  struct stat info;
+  const int stat_failed = stat(path, &info) != 0;
+  return stat_failed ? 0 : S_ISDIR(info.st_mode);
+}
+
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* held around every access to `stopped` */
+pthread_cond_t condition = PTHREAD_COND_INITIALIZER; /* signalled by stop_callback() */
+
+int stopped; /* set to 1 by stop_callback(), read by is_stopped() */
+
+int stop_callback(char *c) { /* `c` is not used */
+  pthread_mutex_lock(&mutex);
+  stopped = 1; /* the flag that is_stopped() reports */
+  pthread_cond_signal(&condition); /* no waiter on `condition` is visible in this file */
+  pthread_mutex_unlock(&mutex);
+  return 0; /* unconditional */
+}
+
+/* Returns the current value of `stopped`, read while holding `mutex`; `ptr` is unused. */
+int is_stopped(void *ptr) {
+  pthread_mutex_lock(&mutex);
+  const int snapshot = stopped;
+  pthread_mutex_unlock(&mutex);
+  return snapshot;
+}
+
+/**
+ * Example of the C API: monitors a directory and transmits the flow files it yields to a remote instance.
+ */
+int main(int argc, char **argv) {
+  /* Only the three arguments named in the usage text are required; argv[4]/argv[5] (C2 URLs) are optional. */
+  if (argc < 4) {
+    printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n");
+    exit(1);
+  }
+
+  stopped = 0x00;
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+  char *directory = argv[3];
+
+  nifi_port port;
+  port.port_id = portStr;
+
+  /* Zero-initialised so `topic` and any URL that was not supplied are NULL instead of indeterminate. */
+  C2_Server server = { 0 };
+  server.url = argc > 4 ? argv[4] : NULL;
+  server.ack_url = argc > 5 ? argv[5] : NULL;
+  server.identifier = "monitor_directory";
+  server.type = REST;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+  // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
+
+  /* No parent flow (0x00); KEEP_SOURCE sets the keep_source flag of the GetFile configuration. */
+  flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
+
+  /* Returns once the flow yields no flow file or is_stopped() reports non-zero. */
+  transmit_to_nifi(instance, new_flow, is_stopped);
+
+  free_flow(new_flow);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/terminate_handler.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c
new file mode 100644
index 0000000..d5443f0
--- /dev/null
+++ b/nanofi/examples/terminate_handler.c
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that registers terminate handler and generates an exception.
+ */
+
+void example_terminate_handler() {  /* registered through set_terminate_callback() in main() */
+  fputs("Unhandled exception! Let's pretend that this is normal!", stderr);
+  exit(0);  /* exit status 0 even though an exception was unhandled */
+}
+
+int main(int argc, char **argv) {
+  /* Register the terminate handler before anything else is set up. */
+  set_terminate_callback(example_terminate_handler);
+
+  nifi_port remote_port = { "12345" };
+  nifi_instance *instance = create_instance("random instance", &remote_port);
+
+  /* Flow created from "GenerateFlowFile", with a linked "PutFile" processor added. */
+  flow *failing_flow = create_flow(instance, "GenerateFlowFile");
+  processor *put_file = add_processor_with_linkage(failing_flow, "PutFile");
+
+  /*
+   * PutFile targets a directory that is missing and may not create it, so it
+   * tries to transmit to the failure relationship. That relationship does not
+   * exist either, hence an exception is thrown.
+   */
+  set_property(put_file, "Directory", "/tmp/never_existed");
+  set_property(put_file, "Create Missing Directories", "false");
+
+  flow_file_record *record = get_next_flow_file(instance, failing_flow);
+
+  /* Here be dragons - nothing below this line gets executed. */
+  fprintf(stderr, "Dragons!!!");
+  free_flowfile(record);
+  free_flow(failing_flow);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/transmit_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
new file mode 100644
index 0000000..e132c8b
--- /dev/null
+++ b/nanofi/examples/transmit_flow.c
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#include "api/nanofi.h"
+
+int is_dir(const char *path) {  /* 0 when stat() fails, else the S_ISDIR() test of the mode */
+  struct stat st;
+  if (stat(path, &st) == 0)
+    return S_ISDIR(st.st_mode);
+  return 0;
+}
+
+/* Transmits `file_or_dir` to `instance`; a directory is walked recursively, skipping entries whose name starts with '.'. */
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
+  if (is_dir(file_or_dir)) {
+    DIR *d = opendir(file_or_dir);
+    if (d) {
+      struct dirent *dir;
+      while ((dir = readdir(d)) != NULL) {
+        /* Skips ".", ".." and every other dot-prefixed entry. */
+        if (dir->d_name[0] == '.')
+          continue;
+        /* +2: one byte for the '/' separator, one for the terminating NUL. */
+        size_t path_len = strlen(file_or_dir) + strlen(dir->d_name) + 2;
+        char *file_path = malloc(path_len);
+        if (file_path == NULL)
+          break;  /* out of memory: stop walking instead of formatting into NULL */
+        snprintf(file_path, path_len, "%s/%s", file_or_dir, dir->d_name);
+        transfer_file_or_directory(instance, file_path);
+        free(file_path);
+      }
+      closedir(d);
+    }
+    printf("%s is a directory", file_or_dir);
+  } else {
+    printf("Transferring %s\n",file_or_dir);
+
+    flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
+    /* Size 2 covers the character '1' plus the terminating NUL of the literal. */
+    add_attribute(record, "addedattribute", "1", 2);
+    transmit_flowfile(record, instance);
+    free_flowfile(record);
+  }
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+  /* Instance, remote port and the path to send are all mandatory. */
+  if (argc < 4) {
+    printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+    exit(1);
+  }
+
+  /* argv[2] is the remote port id, argv[1] the instance string. */
+  nifi_port remote_port;
+  remote_port.port_id = argv[2];
+
+  nifi_instance *instance = create_instance(argv[1], &remote_port);
+
+  /*
+   * Calling initialize_instance(instance) here is optional; it is left out,
+   * although it would make the transmission slightly more efficient.
+   */
+
+  /* argv[3] may name a single file or a directory to walk. */
+  transfer_file_or_directory(instance, argv[3]);
+
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/api/nanofi.h
----------------------------------------------------------------------
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
new file mode 100644
index 0000000..31a4829
--- /dev/null
+++ b/nanofi/include/api/nanofi.h
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include "core/cstructs.h"
+#include "core/processors.h"
+
+int initialize_api();
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Updates with every release. Functions used here constitute the public API of NanoFi.
+ *
+ * Changes here will follow semver
+ */
+#define API_VERSION "0.02"
+
+void enable_logging();
+
+void set_terminate_callback(void (*terminate_callback)());
+
+/****
+ * ##################################################################
+ *  BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+nifi_instance *create_instance(const char *url, nifi_port *port);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+/****
+ * ##################################################################
+ *  C2 OPERATIONS
+ * ##################################################################
+ */
+
+
+typedef int c2_update_callback(char *);
+
+typedef int c2_stop_callback(char *);
+
+typedef int c2_start_callback(char *);
+
+void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
+
+
+uint8_t run_processor(const processor *processor);
+
+flow *create_new_flow(nifi_instance *);
+
+flow *create_flow(nifi_instance *, const char *);
+
+flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+
+processor *add_processor(flow *, const char *);
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name);
+
+processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+
+/**
+* Register your callback to receive flow files that the flow failed to process.
+* The flow file ownership is transferred to the caller!
+* The first callback should be registered before the flow is used. Can be changed later during runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set failure strategy. Please use the enum defined in cstructs.h
+* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
+* Can be changed at runtime.
+* The default strategy is AS_IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
+
+int set_property(processor *, const char *, const char *);
+
+int set_instance_property(nifi_instance *instance, const char*, const char *);
+
+int free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
+
+flow_file_record *get(nifi_instance *,flow *, processor_session *);
+
+int transfer(processor_session* session, flow *flow, const char *rel);
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len);
+
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
+
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
+
+int get_attribute_qty(const flow_file_record* ff);
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ *  Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+int transmit_flowfile(flow_file_record *, nifi_instance *);
+
+/****
+ * ##################################################################
+ *  Persistence Operations
+ * ##################################################################
+ */
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/comms.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/comms.h b/nanofi/include/blocks/comms.h
new file mode 100644
index 0000000..bfacc3d
--- /dev/null
+++ b/nanofi/include/blocks/comms.h
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_COMMS_H_
+#define BLOCKS_COMMS_H_
+
+#include <stdio.h>
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define SUCCESS 0x00
+#define FINISHED_EARLY 0x01
+#define FAIL 0x02
+
+typedef int transmission_stop(void *);
+
+/* Transmits flow files pulled from `flow` until it yields none (FINISHED_EARLY) or stop_callback returns non-zero (SUCCESS). */
+static inline uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
+  for (;;) {
+    flow_file_record *record = get_next_flow_file(instance, flow);
+    if (record == 0) {
+      return FINISHED_EARLY;
+    }
+    transmit_flowfile(record, instance);
+    free_flowfile(record);
+    /* `record` is not inspected once freed; only the optional stop hook ends the loop from here. */
+    if (stop_callback != 0x00 && stop_callback(0x00)) {
+      return SUCCESS;
+    }
+  }
+}
+
+#endif /* BLOCKS_COMMS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/file_blocks.h b/nanofi/include/blocks/file_blocks.h
new file mode 100644
index 0000000..d84ccbe
--- /dev/null
+++ b/nanofi/include/blocks/file_blocks.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_FILE_BLOCKS_H_
+#define BLOCKS_FILE_BLOCKS_H_
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define KEEP_SOURCE 0x01
+#define RECURSE 0x02
+
+/**
+ * Creates a GetFile flow for `directory` via create_getfile(), passing `parent_flow` through; `flags` is a mask of KEEP_SOURCE / RECURSE.
+ */
+static inline flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
+  GetFileConfig config;
+  config.directory = directory;
+  config.keep_source = (flags & KEEP_SOURCE) != 0;
+  config.recurse = (flags & RECURSE) != 0; /* 1-bit field: a raw 0x02 would be truncated to 0 */
+  return create_getfile(instance, parent_flow, &config);
+}
+
+#endif /* BLOCKS_FILE_BLOCKS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/cstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
new file mode 100644
index 0000000..bc9700e
--- /dev/null
+++ b/nanofi/include/core/cstructs.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+
+/**
+ * NiFi Port struct
+ */
+typedef struct {
+  char *port_id;
+} nifi_port;
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+  void *instance_ptr;
+
+  nifi_port port;
+
+} nifi_instance;
+
+/****
+ * ##################################################################
+ *  C2 OPERATIONS
+ * ##################################################################
+ */
+
+enum C2_Server_Type {
+  REST,
+  MQTT
+};
+
+typedef struct {
+  char *url;
+  char *ack_url;
+  char *identifier;
+  char *topic;
+  enum C2_Server_Type type;
+} C2_Server;
+
+/****
+ * ##################################################################
+ *  Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+  void *processor_ptr;
+} processor;
+
+typedef struct {
+  void *session;
+} processor_session;
+
+/****
+ * ##################################################################
+ *  FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+  const char *key;
+  void *value;
+  size_t value_size;
+} attribute;
+
+typedef struct {
+  attribute * attributes;
+  size_t size;
+} attribute_set;
+
+/**
+ * State of a flow file
+ *
+ */
+typedef struct {
+  uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+  void * in;
+
+  void * crp;
+
+  char * contentLocation; /**< Filesystem location of this object */
+
+  void *attributes; /**< Hash map of attributes */
+
+  void *ffp;
+
+} flow_file_record;
+
+typedef struct {
+  void *plan;
+} flow;
+
+typedef enum FS { /* values accepted by set_failure_strategy() */
+  AS_IS, /* the default strategy according to the note on set_failure_strategy() in api/nanofi.h */
+  ROLLBACK
+} FailureStrategy;
+
+#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/processors.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/processors.h b/nanofi/include/core/processors.h
new file mode 100644
index 0000000..7fe357d
--- /dev/null
+++ b/nanofi/include/core/processors.h
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct { /* configuration passed by pointer to create_getfile() */
+  char *directory; /* directory string; the struct stores only the pointer */
+  unsigned keep_source :1; /* 1-bit flag: only 0 or 1 survive assignment */
+  unsigned recurse :1; /* 1-bit flag: only 0 or 1 survive assignment */
+} GetFileConfig;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h
new file mode 100644
index 0000000..f620baa
--- /dev/null
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "c2/C2Agent.h"
+#include "core/state/Value.h"
+#include "c2/C2Payload.h"
+#include "c2/C2Protocol.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+typedef int c2_ag_update_callback(char *);  // function type (char* in, int out); not referenced again in this header
+// Function type accepted by C2CallbackAgent::setStopCallback(); the meaning of the argument and result is not visible here.
+typedef int c2_ag_stop_callback(char *);
+// Same signature as the two types above; not referenced again in this header.
+typedef int c2_ag_start_callback(char *);
+
+class C2CallbackAgent : public c2::C2Agent {
+  // C2Agent subclass that additionally holds a caller-supplied stop callback (see setStopCallback).
+ public:
+  // Constructor is defined out of line; what it does with its arguments is not visible in this header.
+  explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+  virtual ~C2CallbackAgent() {
+  }
+  // Stores the function pointer as given; it is not checked for null here.
+  void setStopCallback(c2_ag_stop_callback *st){
+    stop = st;
+  }
+
+
+ protected:
+  /**
+     * Handles a C2 event requested by the server.
+     * @param resp c2 server response.
+     */
+    virtual void handle_c2_server_response(const C2ContentResponse &resp);
+  // Written by setStopCallback(); NOTE(review): confirm the constructor gives it an initial value.
+    c2_ag_stop_callback *stop;
+
+ private:
+    std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h
new file mode 100644
index 0000000..61e824f
--- /dev/null
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -0,0 +1,100 @@
+/**
+ * @file CallbackProcessor.h
+ * CallbackProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CALLBACK_PROCESSOR_H__
+#define __CALLBACK_PROCESSOR_H__
+
+#include <stdio.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <iostream>
+#include <sys/types.h>
+#include "core/cstructs.h"
+#include "io/BaseStream.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// CallbackProcessor Class
+class CallbackProcessor : public core::Processor {
+ public:
+  /*!
+   * Create a new processor with no callback installed yet.
+   * @param name processor name, forwarded to core::Processor
+   * @param uuid processor identifier, forwarded to core::Processor
+   */
+  CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        objref_(nullptr),  // initialisers are listed in member declaration order (objref_, callback_, logger_)
+        callback_(nullptr),
+        logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
+  }
+  // Destructor
+  virtual ~CallbackProcessor() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "CallbackProcessor";
+
+  /*!
+   * Install the trigger callback together with an opaque object pointer.
+   * Both are stored as given; obj is not dereferenced in this header.
+   */
+  void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
+    objref_ = obj;
+    callback_ = ontrigger_callback;
+  }
+
+  // OnTrigger method; defined out of line, so how callback_ is invoked is not visible in this header.
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  // Initialize: registers "success" as the only supported relationship.
+  virtual void initialize() {
+    std::set<core::Relationship> relationships;
+    core::Relationship Success("success", "description");
+    relationships.insert(Success);
+    setSupportedRelationships(relationships);
+  }
+
+ protected:
+  void *objref_;  // opaque pointer supplied through setCallback()
+  std::function<void(processor_session*)> callback_;  // empty until setCallback() is called
+ private:
+  std::shared_ptr<logging::Logger> logger_;  // Logger
+};
+
+REGISTER_RESOURCE(CallbackProcessor);  // macro defined outside this header; presumably makes the class creatable by name -- TODO confirm
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Instance.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
new file mode 100644
index 0000000..982f6d4
--- /dev/null
+++ b/nanofi/include/cxx/Instance.h
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "C2CallbackAgent.h"
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+#include "utils/ThreadPool.h"
+#include "core/state/UpdateController.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class ProcessorLink {  // Thin holder that shares ownership of one core::Processor.
+ public:
+  // Copies the shared_ptr, so the link keeps the processor alive for as long as the link exists.
+  explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+      : processor_(processor) {
+  }
+  // Returns a reference to the stored pointer; the reference is only valid while this object lives.
+  const std::shared_ptr<core::Processor> &getProcessor() {
+    return processor_;
+  }
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+};
+
+class Instance {
+ public:
+  // Builds the configuration, a volatile content repository, a core::Repository held as no_op_repo_,
+  // and a RemoteProcessorGroupPort for `url` whose identifier is assigned from `port`.
+  explicit Instance(const std::string &url, const std::string &port)
+      : agent_(nullptr),  // initialisers follow member declaration order, which is the order C++ runs them in
+        running_(false),
+        rpgInitialized_(false),
+        no_op_repo_(std::make_shared<minifi::core::Repository>()),
+        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+        url_(url),
+        configure_(std::make_shared<Configure>()),
+        listener_thread_pool_(1) {
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
+    utils::Identifier uuid;
+    uuid = port;
+    rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+    proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+    core::FlowConfiguration::initialize_static_functions();
+    content_repo_->initialize(configure_);
+  }
+  // Clears the running flag that registerUpdateListener() passes to each UpdateRunner, then shuts the listener pool down.
+  ~Instance() {
+    running_ = false;
+    listener_thread_pool_.shutdown();
+  }
+  // True once setRemotePort() has run.
+  bool isRPGConfigured() {
+    return rpgInitialized_;
+  }
+  // Creates the C2 agent and starts it on the listener pool. `server` must be non-null: it is dereferenced unchecked.
+  // Only c1 is installed (as the stop callback); c2 and c3 are accepted but not used by this function.
+  void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    running_ = true;
+    if (server->type != C2_Server_Type::MQTT){
+      configure_->set("c2.rest.url",server->url);
+      configure_->set("c2.rest.url.ack",server->ack_url);
+    }
+    agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+    listener_thread_pool_.start();
+    registerUpdateListener(agent_, 1000);  // 1000 is the delay handed to each UpdateRunner (unit not visible here)
+    agent_->setStopCallback(c1);
+  }
+  // Sets the port UUID property on the RPG port, initialises it, enables transmitting and marks the RPG as configured.
+  void setRemotePort(std::string remote_port) {
+    rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+    rpg_->initialize();
+    rpg_->setTransmitting(true);
+    rpgInitialized_ = true;
+  }
+
+  std::shared_ptr<Configure> getConfiguration() {
+    return configure_;
+  }
+
+  std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+    return no_op_repo_;
+  }
+
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+    return content_repo_;
+  }
+  // Pushes one flow file through the RPG port: schedules the port with a fresh context, places `ff` in a
+  // ReflexiveSession and triggers the port once. Note that onSchedule() runs on every call.
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+    auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+    rpg_->onSchedule(processContext, sessionFactory);
+
+    auto session = std::make_shared<core::ReflexiveSession>(processContext);
+    session->add(ff);
+    rpg_->onTrigger(processContext, session);
+  }
+
+ protected:
+  // Wraps each function from updateController->getFunctions() in a Worker and submits it to the listener pool; false at the first rejected submission.
+  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
+    auto functions = updateController->getFunctions();
+    // run all functions independently
+    for (auto function : functions) {
+      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
+      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
+      std::future<state::Update> future;
+      if (!listener_thread_pool_.execute(std::move(functor), future)) {
+        // denote failure
+        return false;
+      }
+    }
+    return true;
+  }
+
+  std::shared_ptr<c2::C2CallbackAgent> agent_;
+
+  std::atomic<bool> running_;
+
+  bool rpgInitialized_;
+
+  std::shared_ptr<minifi::core::Repository> no_op_repo_;
+
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::ProcessorNode> proc_node_;
+  std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+  std::shared_ptr<io::StreamFactory> stream_factory_;
+  std::string url_;
+  std::shared_ptr<Configure> configure_;
+
+  utils::ThreadPool<state::Update> listener_thread_pool_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Plan.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
new file mode 100644
index 0000000..6171242
--- /dev/null
+++ b/nanofi/include/cxx/Plan.h
@@ -0,0 +1,204 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+
+#ifndef WIN32
+	#include <dirent.h>
+#endif
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "core/cstructs.h"
+#include "api/nanofi.h"
+
+using failure_callback_type = std::function<void(flow_file_record*)>;  // user callback that is handed a flow_file_record by the failure strategies below
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;  // shared handle to a content repository
+
+namespace {  // NOTE(review): anonymous namespace in a header -- every including translation unit gets its own copy of these helpers.
+  // AS_IS strategy: takes a flow file from the session, offers it to user_callback, then removes it from the session.
+  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    auto ff = session->get();
+    if (ff == nullptr) {
+      return;
+    }
+    // The callback is only invoked when the flow file has a resource claim and a callback was registered.
+    auto claim = ff->getResourceClaim();
+
+    if (claim != nullptr && user_callback != nullptr) {
+      claim->increaseFlowFileRecordOwnedCount();
+      // Build the record handed to the callback from the claim's content path and the flow file size.
+      auto path = claim->getContentFullPath();
+      auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+      ffr->attributes = ff->getAttributesPtr();
+      ffr->ffp = ff.get();
+      auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);  // assumes create_ff_object_na() pointed crp at a shared_ptr<ContentRepository> -- TODO confirm
+      *content_repo_ptr = cr_ptr;
+      user_callback(ffr);
+    }
+    session->remove(ff);
+  }
+  // ROLLBACK strategy: rolls the session back first, then applies the AS_IS handling.
+  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    session->rollback();
+    failureStrategyAsIs(session, user_callback, cr_ptr);
+  }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+    { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };  // strategy -> handler table; FailureHandler::operator() looks entries up with at()
+
+class ExecutionPlan {  // Most members are only declared here; their behaviour is defined outside this header.
+ public:
+  // Takes the content, flow-file and provenance repositories (the getters below return members of the same types).
+  explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+  std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+                                                core::Relationship relationship = core::Relationship("success", "description"),
+                                                bool linkToPrevious = false);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+  bool linkToPrevious = false);
+
+  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+  void reset();
+
+  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
+  bool setFailureCallback(failure_callback_type onerror_callback);
+
+  bool setFailureStrategy(FailureStrategy start);
+
+  std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+  std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+  std::shared_ptr<core::ProcessSession> getCurrentSession();
+  // The inline accessors below return copies of the stored shared_ptr members.
+  std::shared_ptr<core::Repository> getFlowRepo() {
+    return flow_repo_;
+  }
+
+  std::shared_ptr<core::Repository> getProvenanceRepo() {
+    return prov_repo_;
+  }
+
+  std::shared_ptr<core::ContentRepository> getContentRepo() {
+    return content_repo_;
+  }
+  // Within this header next_ff_ is read and written only through this getter/setter pair.
+  std::shared_ptr<core::FlowFile> getNextFlowFile(){
+    return next_ff_;
+  }
+
+  void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
+    next_ff_ = ptr;
+  }
+
+  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
+ protected:
+  class FailureHandler {  // Callable that forwards a processor_session to the handler of the currently selected FailureStrategy.
+   public:
+    FailureHandler(content_repo_sptr cr_ptr) {  // starts with no callback and the AS_IS strategy
+      callback_ = nullptr;
+      strategy_ = FailureStrategy::AS_IS;
+      content_repo_ = cr_ptr;
+    }
+    void setCallback(failure_callback_type onerror_callback) {
+      callback_=onerror_callback;
+    }
+    void setStrategy(FailureStrategy strat) {
+      strategy_ = strat;
+    }
+    void operator()(const processor_session* ps) {  // ps is dereferenced unchecked; map::at() throws std::out_of_range for a strategy missing from the table
+      auto ses = static_cast<core::ProcessSession*>(ps->session);
+      FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
+    }
+   private:
+    failure_callback_type callback_;
+    FailureStrategy strategy_;
+    content_repo_sptr content_repo_;
+  };
+
+  void finalize();
+
+  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+  std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+                                                        core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
+
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+  content_repo_sptr content_repo_;
+
+  std::shared_ptr<core::Repository> flow_repo_;
+  std::shared_ptr<core::Repository> prov_repo_;
+
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+  std::atomic<bool> finalized;
+
+  uint32_t location;
+
+  std::shared_ptr<core::ProcessSession> current_session_;
+  std::shared_ptr<core::FlowFile> current_flowfile_;
+
+  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+  core::Relationship termination_;
+
+  std::shared_ptr<core::FlowFile> next_ff_;
+
+ private:
+
+  static std::shared_ptr<utils::IdGenerator> id_generator_;
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<FailureHandler> failure_handler_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/nanofi/include/cxx/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ReflexiveSession Class
+class ReflexiveSession : public ProcessSession{
+ public:
+  // Constructor
+  /*!
+   * Create a session that holds at most one flow file: add() stores it and get() hands it back.
+   */
+  ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+      : ProcessSession(processContext){
+  }
+
+// Destructor
+  virtual ~ReflexiveSession() {
+  }
+  // Returns the stored flow file (nullptr if none) and clears the slot, so a second call returns nullptr.
+   virtual std::shared_ptr<core::FlowFile> get(){
+     auto prevff = ff;
+     ff = nullptr;
+     return prevff;
+   }
+  // Stores `flow`, replacing any flow file that was stored before.
+   virtual void add(const std::shared_ptr<core::FlowFile> &flow){
+     ff = flow;
+   }
+   virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+     // no op
+   }
+ protected:
+  // The single flow file held by this session; set by add(), handed out and
+  // cleared by get().
+  std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif