You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/31 18:28:38 UTC
[2/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into
nanofi
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
deleted file mode 100644
index e135fe1..0000000
--- a/libminifi/src/capi/api.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <string>
-#include <map>
-#include <memory>
-#include <utility>
-#include <exception>
-#include "core/Core.h"
-#include "capi/api.h"
-#include "capi/expect.h"
-#include "capi/Instance.h"
-#include "capi/Plan.h"
-#include "ResourceClaim.h"
-#include "processors/GetFile.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/StringUtils.h"
-
-using string_map = std::map<std::string, std::string>;
-
-class API_INITIALIZER {
- public:
- static int initialized;
-};
-
-int API_INITIALIZER::initialized = initialize_api();
-
-int initialize_api() {
- logging::LoggerConfiguration::getConfiguration().disableLogging();
- return 1;
-}
-
-void enable_logging() {
- logging::LoggerConfiguration::getConfiguration().enableLogging();
-}
-
-void set_terminate_callback(void (*terminate_callback)()) {
- std::set_terminate(terminate_callback);
-}
-
-class DirectoryConfiguration {
- protected:
- DirectoryConfiguration() {
- minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
- }
- public:
- static void initialize() {
- static DirectoryConfiguration configure;
- }
-};
-
-/**
- * Creates a NiFi Instance from the url and output port.
- * @param url http URL for NiFi instance
- * @param port Remote output port.
- * @Deprecated for API version 0.2 in favor of the following prototype
- * nifi_instance *create_instance(nifi_port const *port) {
- */
-nifi_instance *create_instance(const char *url, nifi_port *port) {
- // make sure that we have a thread safe way of initializing the content directory
- DirectoryConfiguration::initialize();
-
- // need reinterpret cast until we move to C for this module.
- nifi_instance *instance = reinterpret_cast<nifi_instance*>( malloc(sizeof(nifi_instance)) );
- /**
- * This API will gradually move away from C++, hence malloc is used for nifi_instance
- * Since minifi::Instance is currently being used, then we need to use new in that case.
- */
- instance->instance_ptr = new minifi::Instance(url, port->port_id);
- // may have to translate port ID here in the future
- // need reinterpret cast until we move to C for this module.
- instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
- snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id);
- return instance;
-}
-
-/**
- * Initializes the instance
- */
-void initialize_instance(nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->setRemotePort(instance->port.port_id);
-}
-/*
- typedef int c2_update_callback(char *);
-
- typedef int c2_stop_callback(char *);
-
- typedef int c2_start_callback(char *);
-
- */
-void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
-}
-
-/**
- * Sets a property within the nifi instance
- * @param instance nifi instance
- * @param key key in which we will set the valiue
- * @param value
- * @return -1 when instance or key are null
- */
-int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
- if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
- return -1;
- }
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->getConfiguration()->set(key, value);
- return 0;
-}
-
-/**
- * Reclaims memory associated with a nifi instance structure.
- * @param instance nifi instance.
- */
-void free_instance(nifi_instance* instance) {
- if (instance != nullptr) {
- delete ((minifi::Instance*) instance->instance_ptr);
- free(instance->port.port_id);
- free(instance);
- }
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_flowfile(const char *file, const size_t len) {
- flow_file_record *new_ff = new flow_file_record;
- new_ff->attributes = new string_map();
- new_ff->contentLocation = new char[len + 1];
- snprintf(new_ff->contentLocation, len + 1, "%s", file);
- std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
- // set the size of the flow file.
- new_ff->size = in.tellg();
- return new_ff;
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
- if (nullptr == file) {
- return nullptr;
- }
- flow_file_record *new_ff = create_ff_object_na(file, len, size);
- new_ff->attributes = new string_map();
- new_ff->ffp = 0;
- return new_ff;
-}
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
- flow_file_record *new_ff = new flow_file_record;
- new_ff->attributes = nullptr;
- new_ff->contentLocation = new char[len + 1];
- snprintf(new_ff->contentLocation, len + 1, "%s", file);
- // set the size of the flow file.
- new_ff->size = size;
- new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
- return new_ff;
-}
-/**
- * Reclaims memory associated with a flow file object
- * @param ff flow file record.
- */
-void free_flowfile(flow_file_record *ff) {
- if (ff == nullptr) {
- return;
- }
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
- if (content_repo_ptr->get()) {
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
- (*content_repo_ptr)->remove(claim);
- }
- if (ff->ffp == nullptr) {
- auto map = static_cast<string_map*>(ff->attributes);
- delete map;
- }
- delete[] ff->contentLocation;
- delete ff;
- delete content_repo_ptr;
-}
-
-/**
- * Adds an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
- */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
- return ret.second ? 0 : -1;
-}
-
-/**
- * Updates (or adds) an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- */
-void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
-}
-
-/*
- * Obtains the attribute.
- * @param ff flow file record
- * @param key key
- * @param caller_attribute caller supplied object in which we will copy the data ptr
- * @return 0 if successful, -1 if the key does not exist
- */
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) {
- if (ff == nullptr) {
- return -1;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- if (!attribute_map) {
- return -1;
- }
- auto find = attribute_map->find(caller_attribute->key);
- if (find != attribute_map->end()) {
- caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
- caller_attribute->value_size = find->second.size();
- return 0;
- }
- return -1;
-}
-
-int get_attribute_qty(const flow_file_record* ff) {
- if (ff == nullptr) {
- return 0;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- return attribute_map ? attribute_map->size() : 0;
-}
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
- if (ff == nullptr) {
- return 0;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- if (!attribute_map || attribute_map->empty()) {
- return 0;
- }
- int i = 0;
- for (const auto& kv : *attribute_map) {
- if (i >= target->size) {
- break;
- }
- target->attributes[i].key = kv.first.data();
- target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
- target->attributes[i].value_size = kv.second.size();
- ++i;
- }
- return i;
-}
-
-/**
- * Removes a key from the attribute chain
- * @param ff flow file record
- * @param key key to remove
- * @return 0 if removed, -1 otherwise
- */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1)
-}
-
-/**
- * Transmits the flowfile
- * @param ff flow file record
- * @param instance nifi instance structure
- */
-int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
- if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
- minifi_instance_ref->setRemotePort(instance->port.port_id);
- }
-
- auto attribute_map = static_cast<string_map*>(ff->attributes);
-
- auto no_op = minifi_instance_ref->getNoOpRepository();
-
- auto content_repo = minifi_instance_ref->getContentRepository();
-
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
- claim->increaseFlowFileRecordOwnedCount();
- claim->increaseFlowFileRecordOwnedCount();
-
- auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
- ffr->addAttribute("nanofi.version", API_VERSION);
- ffr->setSize(ff->size);
-
- std::string port_uuid = instance->port.port_id;
-
- minifi_instance_ref->transfer(ffr);
-
- return 0;
-}
-
-flow *create_new_flow(nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- flow *new_flow = new flow;
-
- auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
- new_flow->plan = execution_plan;
-
- return new_flow;
-}
-
-flow *create_flow(nifi_instance *instance, const char *first_processor) {
- if (nullptr == instance || nullptr == instance->instance_ptr) {
- return nullptr;
- }
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- flow *new_flow = new flow;
-
- auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
- new_flow->plan = execution_plan;
-
- if (first_processor != nullptr && strlen(first_processor) > 0) {
- // automatically adds it with success
- execution_plan->addProcessor(first_processor, first_processor);
- }
- return new_flow;
-}
-
-processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
- if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
- return nullptr;
- }
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
-}
-
-flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
- static const std::string first_processor = "GetFile";
- flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow;
-
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
- // automatically adds it with success
- auto getFile = plan->addProcessor(first_processor, first_processor);
-
- plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
- plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
- plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
-
- return new_flow;
-}
-
-processor *add_processor(flow *flow, const char *processor_name) {
- if (nullptr == flow || nullptr == processor_name) {
- return nullptr;
- }
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addProcessor(processor_name, processor_name);
- if (proc) {
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
- }
- return nullptr;
-}
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
- if (proc) {
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
- }
- return nullptr;
-}
-
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- return plan->setFailureCallback(onerror_callback) ? 0 : 1;
-}
-
-int set_failure_strategy(flow *flow, FailureStrategy strategy) {
- return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;
-}
-
-int set_property(processor *proc, const char *name, const char *value) {
- if (name != nullptr && value != nullptr && proc != nullptr) {
- core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
- bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
- return success ? 0 : -2;
- }
- return -1;
-}
-
-int free_flow(flow *flow) {
- if (flow == nullptr || nullptr == flow->plan)
- return -1;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- delete execution_plan;
- delete flow;
- return 0;
-}
-
-flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
- if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
- return nullptr;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- execution_plan->reset();
- while (execution_plan->runNextProcessor()) {
- }
- auto ff = execution_plan->getCurrentFlowFile();
- if (ff == nullptr) {
- return nullptr;
- }
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr) {
- // create a flow file.
- claim->increaseFlowFileRecordOwnedCount();
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->ffp = ff.get();
- ffr->attributes = ff->getAttributesPtr();
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
- *content_repo_ptr = execution_plan->getContentRepo();
- return ffr;
- } else {
- return nullptr;
- }
-}
-
-size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
- if (nullptr == instance || nullptr == flow || nullptr == ff_r)
- return 0;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- size_t i = 0;
- for (; i < size; i++) {
- execution_plan->reset();
- auto ffr = get_next_flow_file(instance, flow);
- if (ffr == nullptr) {
- break;
- }
- ff_r[i] = ffr;
- }
- return i;
-}
-
-flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) {
- if (nullptr == instance || nullptr == flow || nullptr == session)
- return nullptr;
- auto sesh = static_cast<core::ProcessSession*>(session->session);
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- auto ff = sesh->get();
- execution_plan->setNextFlowFile(ff);
- if (ff == nullptr) {
- return nullptr;
- }
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr) {
- // create a flow file.
- claim->increaseFlowFileRecordOwnedCount();
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->attributes = ff->getAttributesPtr();
- ffr->ffp = ff.get();
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
- *content_repo_ptr = execution_plan->getContentRepo();
- return ffr;
- } else {
- return nullptr;
- }
-}
-
-int transfer(processor_session* session, flow *flow, const char *rel) {
- if (nullptr == session || nullptr == flow || rel == nullptr) {
- return -1;
- }
- auto sesh = static_cast<core::ProcessSession*>(session->session);
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- if (nullptr == sesh || nullptr == execution_plan) {
- return -1;
- }
- core::Relationship relationship(rel, rel);
- auto ff = execution_plan->getNextFlowFile();
- if (nullptr == ff) {
- return -2;
- }
- sesh->transfer(ff, relationship);
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 47c7ba6..674566b 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -17,7 +17,7 @@
*/
#include "core/repository/VolatileContentRepository.h"
-#include "capi/expect.h"
+#include "core/expect.h"
#include <cstdio>
#include <string>
#include <memory>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/processors/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp
deleted file mode 100644
index 5524d92..0000000
--- a/libminifi/src/processors/CallbackProcessor.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/CallbackProcessor.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- if (callback_ != nullptr) {
- processor_session sesh;
- sesh.session = session;
- callback_(&sesh);
- }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
deleted file mode 100644
index b7bf784..0000000
--- a/libminifi/test/capi/CAPITests.cpp
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <sys/stat.h>
-#include <utility>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <fstream>
-
-#include "utils/file/FileUtils.h"
-#include "../TestBase.h"
-
-#include "capi/api.h"
-
-#include <chrono>
-#include <thread>
-
-static nifi_instance *create_instance_obj(const char *name = "random_instance") {
- nifi_port port;
- char port_str[] = "12345";
- port.port_id = port_str;
- return create_instance("random_instance", &port);
-}
-
-static int failure_count = 0;
-
-void failure_counter(flow_file_record * fr) {
- failure_count++;
- REQUIRE(get_attribute_qty(fr) > 0);
- free_flowfile(fr);
-}
-
-void big_failure_counter(flow_file_record * fr) {
- failure_count += 100;
- free_flowfile(fr);
-}
-
-TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
- REQUIRE(test_proc != nullptr);
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- processor *test_proc = add_processor(test_flow, "NeverExisted");
- REQUIRE(test_proc == nullptr);
- processor *no_proc = add_processor(test_flow, "");
- REQUIRE(no_proc == nullptr);
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
- REQUIRE(test_proc != nullptr);
- REQUIRE(set_property(test_proc, "Data Format", "Text") == 0); // Valid value
- // TODO(aboda): add this two below when property handling is made strictly typed
- // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
- // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
- REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value
- REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute
- REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("get file and put file", "[getAndPutFile]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- char put_format[] = "/tmp/pt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
- const char *putfiledir = testController.createTempDirectory(put_format);
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
- REQUIRE(put_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
- flow_file_record *record = get_next_flow_file(instance, test_flow);
- REQUIRE(record != nullptr);
-
- ss.str("");
-
- ss << putfiledir << "/" << "tstFile.ext";
- std::ifstream t(ss.str());
- std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
-
- REQUIRE(test_file_content == put_data);
-
- // No failure handler can be added after the flow is finalized
- REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
-
- free_flowfile(record);
-
- free_flow(test_flow);
-
- free_instance(instance);
-}
-
-TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
-
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
- std::fstream file;
- std::stringstream ss;
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
-
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
- REQUIRE(extract_test != nullptr);
- REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
- processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
- REQUIRE(update_attr != nullptr);
-
- REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
-
- flow_file_record *record = get_next_flow_file(instance, test_flow);
-
- REQUIRE(record != nullptr);
-
- attribute test_attr;
- test_attr.key = "TestAttr";
- REQUIRE(get_attribute(record, &test_attr) == 0);
-
- REQUIRE(test_attr.value_size != 0);
- REQUIRE(test_attr.value != nullptr);
-
- std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
-
- REQUIRE(attr_value == test_file_content);
-
- const char * new_testattr_value = "S0me t3st t3xt";
-
- // Attribute already exist, should fail
- REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT
-
- // Update overwrites values
- update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT
-
- int attr_size = get_attribute_qty(record);
- REQUIRE(attr_size > 0);
-
- attribute_set attr_set;
- attr_set.size = attr_size;
- attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute)); // NOLINT
-
- REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
-
- bool test_attr_found = false;
- bool updated_attr_found = false;
- for (int i = 0; i < attr_set.size; ++i) {
- if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
- test_attr_found = true;
- REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
- } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
- updated_attr_found = true;
- REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
- }
- }
- REQUIRE(updated_attr_found == true);
- REQUIRE(test_attr_found == true);
-
- free_flowfile(record);
-
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Test error handling callback", "[errorHandling]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
-
- // Failure strategy cannot be set before a valid callback is added
- REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
- REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
-
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
- REQUIRE(put_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
- REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
-
- std::fstream file;
- std::stringstream ss;
-
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
-
- REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-
- REQUIRE(failure_count == 1);
-
- // Failure handler function can be replaced runtime
- REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
- REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
-
- // Create new testfile to trigger failure again
- ss << "2";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
- REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
- REQUIRE(failure_count > 100);
-
- failure_count = 0;
-
- free_flow(test_flow);
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
new file mode 100644
index 0000000..bd2d952
--- /dev/null
+++ b/nanofi/CMakeLists.txt
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+ CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(include)
+include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+
+if(WIN32)
+include_directories(../libminifi/opsys/win)
+else()
+include_directories(../libminifi/opsys/posix)
+endif()
+
+file(GLOB NANOFI_SOURCES "src/api/*.cpp" "src/cxx/*.cpp" )
+
+file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" )
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+ if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+ CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+ if (_cpp_latest_flag_supported)
+ add_compile_options("/std:c++14")
+ endif()
+ endif()
+else()
+
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+add_library(nanofi STATIC ${NANOFI_SOURCES})
+
+if (APPLE)
+ target_link_libraries (nanofi -Wl,-all_load core-minifi minifi)
+elseif(NOT WIN32)
+ target_link_libraries (nanofi -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive)
+else()
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi")
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi")
+endif ()
+
+if(WIN32)
+ set_target_properties(nanofi PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+if (ENABLE_PYTHON)
+
+add_library(nanofi-shared SHARED ${NANOFI_SOURCES})
+
+if (APPLE)
+ target_link_libraries (nanofi-shared -Wl,-all_load core-minifi-shared minifi-shared)
+elseif(NOT WIN32)
+ target_link_libraries (nanofi-shared -Wl,--whole-archive core-minifi-shared minifi-shared -Wl,--no-whole-archive)
+else()
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi-shared")
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi-shared")
+endif ()
+
+
+if(WIN32)
+ set_target_properties(nanofi-shared PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+set_property(TARGET nanofi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+endif(ENABLE_PYTHON)
+
+if (NOT DISABLE_CURL)
+add_subdirectory(examples)
+endif()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
new file mode 100644
index 0000000..f5f660f
--- /dev/null
+++ b/nanofi/examples/CMakeLists.txt
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+ CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(/include)
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+ if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+ CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+ if (_cpp_latest_flag_supported)
+ add_compile_options("/std:c++14")
+ endif()
+ endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+if (WIN32)
+ set(LINK_FLAGS "/WHOLEARCHIVE")
+ set(LINK_END_FLAGS "")
+elseif (APPLE)
+ set(LINK_FLAGS "-Wl,-all_load")
+ set(LINK_END_FLAGS "")
+else ()
+ set(LINK_FLAGS "-Wl,--whole-archive")
+ set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
+add_executable(generate_flow generate_flow.c)
+
+add_executable(terminate_handler terminate_handler.c)
+
+target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
+if (NOT WIN32)
+
+add_executable(transmit_flow transmit_flow.c)
+
+target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(monitor_directory monitor_directory.c)
+
+target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/generate_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/generate_flow.c b/nanofi/examples/generate_flow.c
new file mode 100644
index 0000000..707de11
--- /dev/null
+++ b/nanofi/examples/generate_flow.c
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+ if (argc < 3) {
+ printf("Error: must run ./generate_flow <instance> <remote port> \n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+ flow_file_record *record = get_next_flow_file(instance, new_flow);
+
+ if (record == 0) {
+ printf("Could not create flow file");
+ exit(1);
+ }
+
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+ //transfer_file_or_directory(instance,file);
+
+ free_flow(new_flow);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/monitor_directory.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/monitor_directory.c b/nanofi/examples/monitor_directory.c
new file mode 100644
index 0000000..2283b35
--- /dev/null
+++ b/nanofi/examples/monitor_directory.c
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <pthread.h>
+
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
+int is_dir(const char *path) {
+ struct stat stat_struct;
+ if (stat(path, &stat_struct) != 0)
+ return 0;
+ return S_ISDIR(stat_struct.st_mode);
+}
+
+pthread_mutex_t mutex;
+pthread_cond_t condition;
+
+int stopped;
+
+int stop_callback(char *c) {
+ pthread_mutex_lock(&mutex);
+ stopped = 1;
+ pthread_cond_signal(&condition);
+ pthread_mutex_unlock(&mutex);
+ return 0;
+}
+
+int is_stopped(void *ptr) {
+ int is_stop = 0;
+ pthread_mutex_lock(&mutex);
+ is_stop = stopped;
+ pthread_mutex_unlock(&mutex);
+ return is_stop;
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+ if (argc < 5) {
+ printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n");
+ exit(1);
+ }
+
+ stopped = 0x00;
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+ char *directory = argv[3];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ C2_Server server;
+ server.url = argv[4];
+ server.ack_url = argv[5];
+ server.identifier = "monitor_directory";
+ server.type = REST;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
+
+ flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
+
+ transmit_to_nifi(instance, new_flow, is_stopped);
+
+ free_flow(new_flow);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/terminate_handler.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c
new file mode 100644
index 0000000..d5443f0
--- /dev/null
+++ b/nanofi/examples/terminate_handler.c
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that registers terminate handler and generates an exception.
+ */
+
+void example_terminate_handler() {
+ fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!");
+ exit(0);
+}
+
+int main(int argc, char **argv) {
+
+ nifi_port port;
+
+ port.port_id = "12345";
+
+ set_terminate_callback(example_terminate_handler);
+
+ nifi_instance *instance = create_instance("random instance", &port);
+
+ flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+ processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
+
+ // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship
+ // As it doesn't exist, an exception is thrown
+ set_property(put_proc, "Directory", "/tmp/never_existed");
+ set_property(put_proc, "Create Missing Directories", "false");
+
+ flow_file_record *record = get_next_flow_file(instance, new_flow );
+
+ // Here be dragons - nothing below this line gets executed
+ fprintf(stderr, "Dragons!!!");
+ free_flowfile(record);
+ free_flow(new_flow);
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/transmit_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
new file mode 100644
index 0000000..e132c8b
--- /dev/null
+++ b/nanofi/examples/transmit_flow.c
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#include "api/nanofi.h"
+
+int is_dir(const char *path) {
+ struct stat stat_struct;
+ if (stat(path, &stat_struct) != 0)
+ return 0;
+ return S_ISDIR(stat_struct.st_mode);
+}
+
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
+ int size = 1;
+
+ if (is_dir(file_or_dir)) {
+ DIR *d;
+
+ struct dirent *dir;
+ d = opendir(file_or_dir);
+ if (d) {
+ while ((dir = readdir(d)) != NULL) {
+ if (!memcmp(dir->d_name,".",1) )
+ continue;
+ char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);
+ sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
+ transfer_file_or_directory(instance,file_path);
+ free(file_path);
+ }
+ closedir(d);
+ }
+ printf("%s is a directory", file_or_dir);
+ } else {
+ printf("Transferring %s\n",file_or_dir);
+
+ flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
+
+ add_attribute(record, "addedattribute", "1", 2);
+
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+ }
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+ if (argc < 4) {
+ printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+ char *file = argv[3];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+ transfer_file_or_directory(instance,file);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/api/nanofi.h
----------------------------------------------------------------------
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
new file mode 100644
index 0000000..31a4829
--- /dev/null
+++ b/nanofi/include/api/nanofi.h
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include "core/cstructs.h"
+#include "core/processors.h"
+
+int initialize_api();
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Updates with every release. Functions used here constitute the public API of NanoFi.
+ *
+ * Changes here will follow semver
+ */
+#define API_VERSION "0.02"
+
+void enable_logging();
+
+void set_terminate_callback(void (*terminate_callback)());
+
+/****
+ * ##################################################################
+ * BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+nifi_instance *create_instance(const char *url, nifi_port *port);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+/****
+ * ##################################################################
+ * C2 OPERATIONS
+ * ##################################################################
+ */
+
+
+typedef int c2_update_callback(char *);
+
+typedef int c2_stop_callback(char *);
+
+typedef int c2_start_callback(char *);
+
+void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
+
+
+uint8_t run_processor(const processor *processor);
+
+flow *create_new_flow(nifi_instance *);
+
+flow *create_flow(nifi_instance *, const char *);
+
+flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+
+processor *add_processor(flow *, const char *);
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name);
+
+processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+
+/**
+* Register your callback to received flow files that the flow failed to process
+* The flow file ownership is transferred to the caller!
+* The first callback should be registered before the flow is used. Can be changed later during runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set failure strategy. Please use the enum defined in cstructs.h
+* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
+* Can be changed runtime.
+* The defailt strategy is AS IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
+
+int set_property(processor *, const char *, const char *);
+
+int set_instance_property(nifi_instance *instance, const char*, const char *);
+
+int free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
+
+flow_file_record *get(nifi_instance *,flow *, processor_session *);
+
+int transfer(processor_session* session, flow *flow, const char *rel);
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len);
+
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
+
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
+
+int get_attribute_qty(const flow_file_record* ff);
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ * Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+int transmit_flowfile(flow_file_record *, nifi_instance *);
+
+/****
+ * ##################################################################
+ * Persistence Operations
+ * ##################################################################
+ */
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/comms.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/comms.h b/nanofi/include/blocks/comms.h
new file mode 100644
index 0000000..bfacc3d
--- /dev/null
+++ b/nanofi/include/blocks/comms.h
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_COMMS_H_
+#define BLOCKS_COMMS_H_
+
+#include <stdio.h>
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define SUCCESS 0x00
+#define FINISHED_EARLY 0x01
+#define FAIL 0x02
+
+typedef int transmission_stop(void *);
+
+uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
+
+ flow_file_record *record = 0x00;
+ do {
+ record = get_next_flow_file(instance, flow);
+
+ if (record == 0) {
+ return FINISHED_EARLY;
+ }
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+ } while (record != 0x00 && !(stop_callback != 0x00 && stop_callback(0x00)));
+ return SUCCESS;
+}
+
+#endif /* BLOCKS_COMMS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/file_blocks.h b/nanofi/include/blocks/file_blocks.h
new file mode 100644
index 0000000..d84ccbe
--- /dev/null
+++ b/nanofi/include/blocks/file_blocks.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_FILE_BLOCKS_H_
+#define BLOCKS_FILE_BLOCKS_H_
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define KEEP_SOURCE 0x01
+#define RECURSE 0x02
+
+/**
+ * Monitor directory can be combined into a current flow. to create an execution plan
+ */
+flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
+ GetFileConfig config;
+ config.directory = directory;
+ config.keep_source = flags & KEEP_SOURCE;
+ config.recurse = flags & RECURSE;
+ return create_getfile(instance, parent_flow, &config);
+}
+
+#endif /* BLOCKS_FILE_BLOCKS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/cstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
new file mode 100644
index 0000000..bc9700e
--- /dev/null
+++ b/nanofi/include/core/cstructs.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+
+/**
+ * NiFi Port struct
+ */
+typedef struct {
+ char *port_id;
+} nifi_port;
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+ void *instance_ptr;
+
+ nifi_port port;
+
+} nifi_instance;
+
+/****
+ * ##################################################################
+ * C2 OPERATIONS
+ * ##################################################################
+ */
+
+enum C2_Server_Type {
+ REST,
+ MQTT
+};
+
+typedef struct {
+ char *url;
+ char *ack_url;
+ char *identifier;
+ char *topic;
+ enum C2_Server_Type type;
+} C2_Server;
+
+/****
+ * ##################################################################
+ * Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+ void *processor_ptr;
+} processor;
+
+typedef struct {
+ void *session;
+} processor_session;
+
+/****
+ * ##################################################################
+ * FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+ const char *key;
+ void *value;
+ size_t value_size;
+} attribute;
+
+typedef struct {
+ attribute * attributes;
+ size_t size;
+} attribute_set;
+
+/**
+ * State of a flow file
+ *
+ */
+typedef struct {
+ uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+ void * in;
+
+ void * crp;
+
+ char * contentLocation; /**< Filesystem location of this object */
+
+ void *attributes; /**< Hash map of attributes */
+
+ void *ffp;
+
+} flow_file_record;
+
+typedef struct {
+ void *plan;
+} flow;
+
+typedef enum FS {
+ AS_IS,
+ ROLLBACK
+} FailureStrategy;
+
+#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/processors.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/processors.h b/nanofi/include/core/processors.h
new file mode 100644
index 0000000..7fe357d
--- /dev/null
+++ b/nanofi/include/core/processors.h
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ char *directory;
+ unsigned keep_source :1;
+ unsigned recurse :1;
+} GetFileConfig;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h
new file mode 100644
index 0000000..f620baa
--- /dev/null
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "c2/C2Agent.h"
+#include "core/state/Value.h"
+#include "c2/C2Payload.h"
+#include "c2/C2Protocol.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+typedef int c2_ag_update_callback(char *);
+
+typedef int c2_ag_stop_callback(char *);
+
+typedef int c2_ag_start_callback(char *);
+
+class C2CallbackAgent : public c2::C2Agent {
+
+ public:
+
+ explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+ virtual ~C2CallbackAgent() {
+ }
+
+ void setStopCallback(c2_ag_stop_callback *st){
+ stop = st;
+ }
+
+
+ protected:
+ /**
+ * Handles a C2 event requested by the server.
+ * @param resp c2 server response.
+ */
+ virtual void handle_c2_server_response(const C2ContentResponse &resp);
+
+ c2_ag_stop_callback *stop;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h
new file mode 100644
index 0000000..61e824f
--- /dev/null
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -0,0 +1,100 @@
+/**
+ * @file CallbackProcessor.h
+ * CallbackProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CALLBACK_PROCESSOR_H__
+#define __CALLBACK_PROCESSOR_H__
+
+#include <stdio.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <iostream>
+#include <sys/types.h>
+#include "core/cstructs.h"
+#include "io/BaseStream.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// CallbackProcessor Class
+class CallbackProcessor : public core::Processor {
+ public:
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+ : Processor(name, uuid),
+ callback_(nullptr),
+ objref_(nullptr),
+ logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
+ }
+ // Destructor
+ virtual ~CallbackProcessor() {
+
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "CallbackProcessor";
+
+ public:
+
+ void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
+ objref_ = obj;
+ callback_ = ontrigger_callback;
+ }
+
+ // OnTrigger method, implemented by NiFi CallbackProcessor
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ // Initialize, over write by NiFi CallbackProcessor
+ virtual void initialize() {
+ std::set<core::Relationship> relationships;
+ core::Relationship Success("success", "description");
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+ }
+
+ protected:
+ void *objref_;
+ std::function<void(processor_session*)> callback_;
+ private:
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+
+};
+
+REGISTER_RESOURCE(CallbackProcessor);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Instance.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
new file mode 100644
index 0000000..982f6d4
--- /dev/null
+++ b/nanofi/include/cxx/Instance.h
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "C2CallbackAgent.h"
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+#include "utils/ThreadPool.h"
+#include "core/state/UpdateController.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class ProcessorLink {
+ public:
+ explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+ : processor_(processor) {
+
+ }
+
+ const std::shared_ptr<core::Processor> &getProcessor() {
+ return processor_;
+ }
+
+ protected:
+ std::shared_ptr<core::Processor> processor_;
+};
+
+class Instance {
+ public:
+
+ explicit Instance(const std::string &url, const std::string &port)
+ : configure_(std::make_shared<Configure>()),
+ url_(url),
+ agent_(nullptr),
+ rpgInitialized_(false),
+ listener_thread_pool_(1),
+ content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+ no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+ running_ = false;
+ stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
+ utils::Identifier uuid;
+ uuid = port;
+ rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+ proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+ core::FlowConfiguration::initialize_static_functions();
+ content_repo_->initialize(configure_);
+ }
+
+ ~Instance() {
+ running_ = false;
+ listener_thread_pool_.shutdown();
+ }
+
+ bool isRPGConfigured() {
+ return rpgInitialized_;
+ }
+
+ void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+ running_ = true;
+ if (server->type != C2_Server_Type::MQTT){
+ configure_->set("c2.rest.url",server->url);
+ configure_->set("c2.rest.url.ack",server->ack_url);
+ }
+ agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+ listener_thread_pool_.start();
+ registerUpdateListener(agent_, 1000);
+ agent_->setStopCallback(c1);
+ }
+
+ void setRemotePort(std::string remote_port) {
+ rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+ rpg_->initialize();
+ rpg_->setTransmitting(true);
+ rpgInitialized_ = true;
+ }
+
+ std::shared_ptr<Configure> getConfiguration() {
+ return configure_;
+ }
+
+ std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+ return no_op_repo_;
+ }
+
+ std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+ return content_repo_;
+ }
+
+ void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+ auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+ auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+ rpg_->onSchedule(processContext, sessionFactory);
+
+ auto session = std::make_shared<core::ReflexiveSession>(processContext);
+
+ session->add(ff);
+
+ rpg_->onTrigger(processContext, session);
+ }
+
+ protected:
+
+ bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
+ auto functions = updateController->getFunctions();
+ // run all functions independently
+
+ for (auto function : functions) {
+ std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
+ utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
+ std::future<state::Update> future;
+ if (!listener_thread_pool_.execute(std::move(functor), future)) {
+ // denote failure
+ return false;
+ }
+ }
+ return true;
+ }
+
+ std::shared_ptr<c2::C2CallbackAgent> agent_;
+
+ std::atomic<bool> running_;
+
+ bool rpgInitialized_;
+
+ std::shared_ptr<minifi::core::Repository> no_op_repo_;
+
+ std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+ std::shared_ptr<core::ProcessorNode> proc_node_;
+ std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+ std::shared_ptr<io::StreamFactory> stream_factory_;
+ std::string url_;
+ std::shared_ptr<Configure> configure_;
+
+ utils::ThreadPool<state::Update> listener_thread_pool_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Plan.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
new file mode 100644
index 0000000..6171242
--- /dev/null
+++ b/nanofi/include/cxx/Plan.h
@@ -0,0 +1,204 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+
+#ifndef WIN32
+ #include <dirent.h>
+#endif
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "core/cstructs.h"
+#include "api/nanofi.h"
+
+using failure_callback_type = std::function<void(flow_file_record*)>;
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
+
+namespace {
+
+ void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+ auto ff = session->get();
+ if (ff == nullptr) {
+ return;
+ }
+
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr && user_callback != nullptr) {
+ claim->increaseFlowFileRecordOwnedCount();
+ // create a flow file.
+ auto path = claim->getContentFullPath();
+ auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+ ffr->attributes = ff->getAttributesPtr();
+ ffr->ffp = ff.get();
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+ *content_repo_ptr = cr_ptr;
+ user_callback(ffr);
+ }
+ session->remove(ff);
+ }
+
+ void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+ session->rollback();
+ failureStrategyAsIs(session, user_callback, cr_ptr);
+ }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+ { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
+
+class ExecutionPlan {
+ public:
+
+ explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+ std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+ core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+ void reset();
+
+ bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
+ bool setFailureCallback(failure_callback_type onerror_callback);
+
+ bool setFailureStrategy(FailureStrategy start);
+
+ std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+ std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+ std::shared_ptr<core::ProcessSession> getCurrentSession();
+
+ std::shared_ptr<core::Repository> getFlowRepo() {
+ return flow_repo_;
+ }
+
+ std::shared_ptr<core::Repository> getProvenanceRepo() {
+ return prov_repo_;
+ }
+
+ std::shared_ptr<core::ContentRepository> getContentRepo() {
+ return content_repo_;
+ }
+
+ std::shared_ptr<core::FlowFile> getNextFlowFile(){
+ return next_ff_;
+ }
+
+ void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
+ next_ff_ = ptr;
+ }
+
+ static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
+ protected:
+ class FailureHandler {
+ public:
+ FailureHandler(content_repo_sptr cr_ptr) {
+ callback_ = nullptr;
+ strategy_ = FailureStrategy::AS_IS;
+ content_repo_ = cr_ptr;
+ }
+ void setCallback(failure_callback_type onerror_callback) {
+ callback_=onerror_callback;
+ }
+ void setStrategy(FailureStrategy strat) {
+ strategy_ = strat;
+ }
+ void operator()(const processor_session* ps) {
+ auto ses = static_cast<core::ProcessSession*>(ps->session);
+ FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
+ }
+ private:
+ failure_callback_type callback_;
+ FailureStrategy strategy_;
+ content_repo_sptr content_repo_;
+ };
+
+ void finalize();
+
+ std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+ std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+ core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
+
+ std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+ content_repo_sptr content_repo_;
+
+ std::shared_ptr<core::Repository> flow_repo_;
+ std::shared_ptr<core::Repository> prov_repo_;
+
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+ std::atomic<bool> finalized;
+
+ uint32_t location;
+
+ std::shared_ptr<core::ProcessSession> current_session_;
+ std::shared_ptr<core::FlowFile> current_flowfile_;
+
+ std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+ std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+ std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+ std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+ std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+ std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+ std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+ core::Relationship termination_;
+
+ std::shared_ptr<core::FlowFile> next_ff_;
+
+ private:
+
+ static std::shared_ptr<utils::IdGenerator> id_generator_;
+ std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<FailureHandler> failure_handler_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/nanofi/include/cxx/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ReflexiveSession Class
+class ReflexiveSession : public ProcessSession{
+ public:
+ // Constructor
+ /*!
+ * Create a new process session
+ */
+ ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+ : ProcessSession(processContext){
+ }
+
+// Destructor
+ virtual ~ReflexiveSession() {
+ }
+
+ virtual std::shared_ptr<core::FlowFile> get(){
+ auto prevff = ff;
+ ff = nullptr;
+ return prevff;
+ }
+
+ virtual void add(const std::shared_ptr<core::FlowFile> &flow){
+ ff = flow;
+ }
+ virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+ // no op
+ }
+ protected:
+ //
+ // Get the FlowFile from the highest priority queue
+ std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif