You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/31 18:28:37 UTC
[1/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into
nanofi
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master fc1074a04 -> 556794b15
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/api/nanofi.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
new file mode 100644
index 0000000..ee33c6b
--- /dev/null
+++ b/nanofi/src/api/nanofi.cpp
@@ -0,0 +1,518 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+#include <map>
+#include <memory>
+#include <utility>
+#include <exception>
+
+#include "api/nanofi.h"
+#include "core/Core.h"
+#include "core/expect.h"
+#include "cxx/Instance.h"
+#include "cxx/Plan.h"
+#include "ResourceClaim.h"
+#include "processors/GetFile.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/StringUtils.h"
+
+using string_map = std::map<std::string, std::string>;
+
+// Holder for a static flag; the initializer of that flag (below) is what
+// triggers the call to initialize_api().
+class API_INITIALIZER {
+ public:
+ static int initialized;
+};
+
+// Evaluating this initializer invokes initialize_api() and stores its result.
+int API_INITIALIZER::initialized = initialize_api();
+
+/**
+ * Performs API start-up work: logging is turned off until the caller
+ * explicitly enables it.
+ * @return always 1
+ */
+int initialize_api() {
+  auto &&logger_config = logging::LoggerConfiguration::getConfiguration();
+  logger_config.disableLogging();
+  return 1;
+}
+
+/**
+ * Turns logging on through the logger configuration.
+ */
+void enable_logging() {
+  auto &&logger_config = logging::LoggerConfiguration::getConfiguration();
+  logger_config.enableLogging();
+}
+
+/**
+ * Installs the supplied function as the std::terminate handler.
+ * @param terminate_callback handler to install
+ */
+void set_terminate_callback(void (*terminate_callback)()) {
+  std::terminate_handler handler = terminate_callback;
+  std::set_terminate(handler);
+}
+
+/**
+ * Applies the default content directory. Callers go through initialize();
+ * the constructor is not publicly accessible.
+ */
+class DirectoryConfiguration {
+ public:
+  // The function-local static is constructed on the first call only, so the
+  // constructor below runs a single time.
+  static void initialize() {
+    static DirectoryConfiguration once;
+  }
+
+ protected:
+  DirectoryConfiguration() {
+    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+  }
+};
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port.
+ * @return newly allocated instance, or nullptr when url, port or the port id
+ * is null or when memory could not be allocated. Release with free_instance.
+ * @Deprecated for API version 0.2 in favor of the following prototype
+ * nifi_instance *create_instance(nifi_port const *port) {
+ */
+nifi_instance *create_instance(const char *url, nifi_port *port) {
+  if (nullptr == url || nullptr == port || nullptr == port->port_id) {
+    return nullptr;
+  }
+  // make sure that we have a thread safe way of initializing the content directory
+  DirectoryConfiguration::initialize();
+
+  /**
+   * This API will gradually move away from C++, hence malloc is used for nifi_instance
+   * Since minifi::Instance is currently being used, then we need to use new in that case.
+   */
+  const size_t port_len = strlen(port->port_id) + 1;
+  // need reinterpret cast until we move to C for this module.
+  nifi_instance *instance = reinterpret_cast<nifi_instance*>(malloc(sizeof(nifi_instance)));
+  char *port_copy = reinterpret_cast<char*>(malloc(port_len));
+  if (nullptr == instance || nullptr == port_copy) {
+    // free(nullptr) is a no-op, so both can be released unconditionally.
+    free(port_copy);
+    free(instance);
+    return nullptr;
+  }
+  // may have to translate port ID here in the future
+  snprintf(port_copy, port_len, "%s", port->port_id);
+  instance->port.port_id = port_copy;
+  instance->instance_ptr = new minifi::Instance(url, port->port_id);
+  return instance;
+}
+
+/**
+ * Initializes the instance by handing its stored port id to the underlying
+ * minifi::Instance. Does nothing when the instance or its instance_ptr is null.
+ * @param instance nifi instance
+ */
+void initialize_instance(nifi_instance *instance) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->setRemotePort(instance->port.port_id);
+}
+/*
+ typedef int c2_update_callback(char *);
+
+ typedef int c2_stop_callback(char *);
+
+ typedef int c2_start_callback(char *);
+
+ */
+/**
+ * Forwards the C2 server description and the three callbacks to the
+ * underlying minifi::Instance.
+ * @param instance nifi instance
+ * @param server C2 server passed through unchanged
+ * @param c1 stop callback
+ * @param c2 start callback
+ * @param c3 update callback
+ */
+void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+  minifi::Instance *target = static_cast<minifi::Instance*>(instance->instance_ptr);
+  target->enableAsyncC2(server, c1, c2, c3);
+}
+
+/**
+ * Sets a property within the nifi instance
+ * @param instance nifi instance
+ * @param key key for which we will set the value
+ * @param value value to store
+ * @return 0 on success, -1 when instance, key or value are null
+ */
+int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
+  // A null value must be rejected too: it would otherwise be converted to a
+  // std::string from a null pointer.
+  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key || nullptr == value) {
+    return -1;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->getConfiguration()->set(key, value);
+  return 0;
+}
+
+/**
+ * Releases everything owned by a nifi instance structure: the C++ instance,
+ * the copied port id and the structure itself. A null argument is ignored.
+ * @param instance nifi instance.
+ */
+void free_instance(nifi_instance* instance) {
+  if (nullptr == instance) {
+    return;
+  }
+  auto *cxx_instance = static_cast<minifi::Instance*>(instance->instance_ptr);
+  delete cxx_instance;
+  free(instance->port.port_id);
+  free(instance);
+}
+
+/**
+ * Creates a flow file record whose content location is the given file and
+ * whose size is that file's size on disk (0 if it cannot be opened).
+ * @param file file to place into the flow file.
+ * @param len length of file, excluding the terminating null
+ * @return new record to release with free_flowfile, or nullptr when file is
+ * null or memory could not be allocated
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len) {
+  if (nullptr == file) {
+    return nullptr;
+  }
+  flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record));
+  if (nullptr == new_ff) {
+    return nullptr;
+  }
+  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
+  if (nullptr == new_ff->contentLocation) {
+    free(new_ff);
+    return nullptr;
+  }
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  new_ff->attributes = new string_map();
+  // free_flowfile reads both of these, so they must not be left uninitialized.
+  new_ff->ffp = nullptr;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
+  // set the size of the flow file; tellg() reports -1 when the open failed.
+  const std::streamoff end_offset = in.tellg();
+  new_ff->size = end_offset < 0 ? 0 : end_offset;
+  return new_ff;
+}
+
+/**
+ * Creates a flow file record with an empty attribute map of its own.
+ * @param file file to place into the flow file.
+ * @param len length of file, excluding the terminating null
+ * @param size size recorded for the flow file
+ * @return new record, or nullptr when file is null or the record could not
+ * be created
+ */
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
+  if (nullptr == file) {
+    return nullptr;
+  }
+  flow_file_record *new_ff = create_ff_object_na(file, len, size);
+  if (nullptr == new_ff) {
+    return nullptr;
+  }
+  new_ff->attributes = new string_map();
+  // A null ffp tells free_flowfile that the attribute map belongs to this record.
+  new_ff->ffp = nullptr;
+  return new_ff;
+}
+
+/**
+ * Creates a flow file record without an attribute map ("na"); callers assign
+ * attributes and ffp themselves.
+ * @param file content location to copy into the record
+ * @param len length of file, excluding the terminating null
+ * @param size size recorded for the flow file
+ * @return new record, or nullptr when file is null or memory could not be
+ * allocated
+ */
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
+  if (nullptr == file) {
+    return nullptr;
+  }
+  // free_flowfile releases the record with free(), so it has to come from
+  // malloc rather than new.
+  flow_file_record *new_ff = static_cast<flow_file_record*>(malloc(sizeof(flow_file_record)));
+  if (nullptr == new_ff) {
+    return nullptr;
+  }
+  new_ff->contentLocation = static_cast<char*>(malloc(sizeof(char) * (len + 1)));
+  if (nullptr == new_ff->contentLocation) {
+    free(new_ff);
+    return nullptr;
+  }
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  new_ff->attributes = nullptr;
+  new_ff->ffp = nullptr;
+  // set the size of the flow file.
+  new_ff->size = size;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  return new_ff;
+}
+/**
+ * Reclaims memory associated with a flow file object. When the record holds a
+ * content repository, the content at contentLocation is removed from it. The
+ * attribute map is deleted only when ffp is null. A null argument is ignored.
+ * @param ff flow file record.
+ */
+void free_flowfile(flow_file_record *ff) {
+  if (ff == nullptr) {
+    return;
+  }
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+  // Guard the holder itself as well as the repository it points at.
+  if (content_repo_ptr != nullptr && content_repo_ptr->get()) {
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+    (*content_repo_ptr)->remove(claim);
+  }
+  if (ff->ffp == nullptr) {
+    auto map = static_cast<string_map*>(ff->attributes);
+    delete map;
+  }
+  free(ff->contentLocation);
+  free(ff);
+  delete content_repo_ptr;
+}
+
+/**
+ * Adds an attribute
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ * @return 0 when the attribute was added; -1 (255, because the return type is
+ * uint8_t) when the key already existed or when ff, its attribute map, key,
+ * or a non-empty value is null
+ */
+uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key || (nullptr == value && size > 0)) {
+    return static_cast<uint8_t>(-1);
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  // Avoid building a std::string from a null pointer for an empty value.
+  const std::string attribute_value = size > 0 ? std::string(static_cast<char*>(value), size) : std::string();
+  const auto ret = attribute_map->insert(std::pair<std::string, std::string>(key, attribute_value));
+  return ret.second ? 0 : static_cast<uint8_t>(-1);
+}
+
+/**
+ * Updates (or adds) an attribute. The call is ignored when ff, its attribute
+ * map, key, or a non-empty value is null.
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ */
+void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key || (nullptr == value && size > 0)) {
+    return;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  // Avoid building a std::string from a null pointer for an empty value.
+  (*attribute_map)[key] = size > 0 ? std::string(static_cast<char*>(value), size) : std::string();
+}
+
+/*
+ * Obtains the attribute named by caller_attribute->key.
+ * @param ff flow file record
+ * @param caller_attribute caller supplied object; its key selects the attribute
+ * and its value / value_size receive a pointer into the stored data (no copy
+ * is made)
+ * @return 0 if successful; -1 (255, because the return type is uint8_t) if the
+ * key does not exist or any required pointer is null
+ */
+uint8_t get_attribute(flow_file_record * ff, attribute * caller_attribute) {
+  if (ff == nullptr || caller_attribute == nullptr || caller_attribute->key == nullptr) {
+    return static_cast<uint8_t>(-1);
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map) {
+    return static_cast<uint8_t>(-1);
+  }
+  auto find = attribute_map->find(caller_attribute->key);
+  if (find != attribute_map->end()) {
+    caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
+    caller_attribute->value_size = find->second.size();
+    return 0;
+  }
+  return static_cast<uint8_t>(-1);
+}
+
+/**
+ * Reports how many attributes a flow file record carries.
+ * @param ff flow file record
+ * @return number of attributes, or 0 when ff or its attribute map is null
+ */
+int get_attribute_qty(const flow_file_record* ff) {
+  if (nullptr == ff) {
+    return 0;
+  }
+  const auto *attributes = static_cast<string_map*>(ff->attributes);
+  if (nullptr == attributes) {
+    return 0;
+  }
+  return attributes->size();
+}
+
+/**
+ * Fills the caller supplied set with pointers to the record's attributes.
+ * At most target->size entries are written; keys and values point into the
+ * record's own storage (no copies are made).
+ * @param ff flow file record
+ * @param target attribute set to fill
+ * @return number of entries written; 0 when ff, target, target->attributes or
+ * the attribute map is null or the map is empty
+ */
+int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
+  if (ff == nullptr || target == nullptr || target->attributes == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map || attribute_map->empty()) {
+    return 0;
+  }
+  int i = 0;
+  for (const auto& kv : *attribute_map) {
+    if (i >= target->size) {
+      break;
+    }
+    target->attributes[i].key = kv.first.data();
+    target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
+    target->attributes[i].value_size = kv.second.size();
+    ++i;
+  }
+  return i;
+}
+
+/**
+ * Removes a key from the attribute chain
+ * @param ff flow file record
+ * @param key key to remove
+ * @return 0 if removed; -1 (255, because the return type is uint8_t) when the
+ * key was absent or ff, its attribute map or key is null
+ */
+uint8_t remove_attribute(flow_file_record *ff, const char *key) {
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key) {
+    return static_cast<uint8_t>(-1);
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  // erase by key returns the number of elements removed (0 or 1)
+  return attribute_map->erase(key) - 1;
+}
+
+/**
+ * Transmits the flowfile
+ * @param ff flow file record
+ * @param instance nifi instance structure
+ * @return 0 once the record has been handed to the instance, -1 when ff,
+ * instance or the underlying instance pointer is null
+ */
+int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  if (nullptr == ff || nullptr == instance || nullptr == instance->instance_ptr) {
+    return -1;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
+  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
+    minifi_instance_ref->setRemotePort(instance->port.port_id);
+  }
+
+  // Records built without an attribute map carry a null pointer here; send
+  // them with an empty map instead of dereferencing it.
+  string_map no_attributes;
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  string_map &attributes = attribute_map != nullptr ? *attribute_map : no_attributes;
+
+  auto no_op = minifi_instance_ref->getNoOpRepository();
+
+  auto content_repo = minifi_instance_ref->getContentRepository();
+
+  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+  // NOTE(review): the owned count is raised twice; presumably this keeps the
+  // content alive after the transfer - confirm against ResourceClaim.
+  claim->increaseFlowFileRecordOwnedCount();
+  claim->increaseFlowFileRecordOwnedCount();
+
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, attributes, claim);
+  ffr->addAttribute("nanofi.version", API_VERSION);
+  ffr->setSize(ff->size);
+
+  minifi_instance_ref->transfer(ffr);
+
+  return 0;
+}
+
+/**
+ * Creates an empty flow backed by a new ExecutionPlan that uses the
+ * instance's content repository and no-op repositories.
+ * @param instance nifi instance
+ * @return new flow to release with free_flow, or nullptr when instance or its
+ * underlying instance pointer is null or memory could not be allocated
+ */
+flow * create_new_flow(nifi_instance * instance) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return nullptr;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  flow *new_flow = (flow*) malloc(sizeof(flow));
+  if (nullptr == new_flow) {
+    return nullptr;
+  }
+
+  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+
+  new_flow->plan = execution_plan;
+
+  return new_flow;
+}
+
+/**
+ * Creates a flow and, when a non-empty processor name is given, adds that
+ * processor as the first one in the plan.
+ * @param instance nifi instance
+ * @param first_processor optional name of the first processor
+ * @return new flow, or nullptr when instance or its instance pointer is null
+ */
+flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return nullptr;
+  }
+  auto *cxx_instance = static_cast<minifi::Instance*>(instance->instance_ptr);
+  flow *created = (flow*) malloc(sizeof(flow));
+
+  auto *plan = new ExecutionPlan(cxx_instance->getContentRepository(), cxx_instance->getNoOpRepository(), cxx_instance->getNoOpRepository());
+  created->plan = plan;
+
+  const bool has_first_processor = first_processor != nullptr && strlen(first_processor) > 0;
+  if (has_first_processor) {
+    // automatically adds it with success
+    plan->addProcessor(first_processor, first_processor);
+  }
+  return created;
+}
+
+/**
+ * Adds a callback-driven processor to the flow.
+ * @param flow flow to extend
+ * @param ontrigger_callback function invoked with the processor session
+ * @return handle wrapping the new processor, or nullptr when an argument is
+ * null, the plan did not create the processor, or memory could not be
+ * allocated
+ */
+processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
+  // addCallback may return null; do not hand out a handle around a null processor.
+  if (!proc) {
+    return nullptr;
+  }
+  processor *new_processor = (processor*) malloc(sizeof(processor));
+  if (nullptr == new_processor) {
+    return nullptr;
+  }
+  new_processor->processor_ptr = proc.get();
+  return new_processor;
+}
+
+/**
+ * Adds a GetFile processor, configured from c, to parent_flow or - when
+ * parent_flow is null - to a newly created flow.
+ * @param instance nifi instance used when a new flow has to be created
+ * @param parent_flow existing flow to extend, or nullptr
+ * @param c GetFile settings (directory, keep_source, recurse)
+ * @return the flow containing the processor, or nullptr when c or its
+ * directory is null, no flow is available, or the processor could not be added
+ */
+flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) {
+  static const std::string first_processor = "GetFile";
+  if (nullptr == c || nullptr == c->directory) {
+    return nullptr;
+  }
+  const bool owns_flow = (nullptr == parent_flow);
+  flow *new_flow = owns_flow ? create_flow(instance, nullptr) : parent_flow;
+  if (nullptr == new_flow || nullptr == new_flow->plan) {
+    return nullptr;
+  }
+
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
+  // automatically adds it with success
+  auto getFile = plan->addProcessor(first_processor, first_processor);
+  if (!getFile) {
+    // Only release the flow if it was created here; the caller keeps ownership of parent_flow.
+    if (owns_flow) {
+      free_flow(new_flow);
+    }
+    return nullptr;
+  }
+
+  plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
+  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
+  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
+
+  return new_flow;
+}
+
+/**
+ * Adds a processor, named after its type, to the flow.
+ * @param flow flow to extend
+ * @param processor_name processor type, also used as the processor's name
+ * @return handle wrapping the new processor, or nullptr when an argument is
+ * null or the plan did not create the processor
+ */
+processor *add_processor(flow *flow, const char *processor_name) {
+  if (nullptr == flow || nullptr == processor_name) {
+    return nullptr;
+  }
+  auto *plan = static_cast<ExecutionPlan*>(flow->plan);
+  const auto created = plan->addProcessor(processor_name, processor_name);
+  if (!created) {
+    return nullptr;
+  }
+  processor *handle = (processor*) malloc(sizeof(processor));
+  handle->processor_ptr = created.get();
+  return handle;
+}
+
+/**
+ * Adds a processor and links it to the previous processor in the plan through
+ * the "success" relationship.
+ * @param flow flow to extend
+ * @param processor_name processor type, also used as the processor's name
+ * @return handle wrapping the new processor, or nullptr when an argument or
+ * the flow's plan is null, the plan did not create the processor, or memory
+ * could not be allocated
+ */
+processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == processor_name) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
+  if (proc) {
+    processor *new_processor = (processor*) malloc(sizeof(processor));
+    if (nullptr == new_processor) {
+      return nullptr;
+    }
+    new_processor->processor_ptr = proc.get();
+    return new_processor;
+  }
+  return nullptr;
+}
+
+/**
+ * Registers a failure callback on the flow's plan.
+ * @param flow flow whose plan receives the callback
+ * @param onerror_callback function invoked with the failed flow file record
+ * @return 0 when the plan accepted the callback, 1 otherwise
+ */
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
+  auto *plan = static_cast<ExecutionPlan*>(flow->plan);
+  const bool registered = plan->setFailureCallback(onerror_callback);
+  if (registered) {
+    return 0;
+  }
+  return 1;
+}
+
+/**
+ * Sets the failure strategy on the flow's plan.
+ * @param flow flow whose plan is updated
+ * @param strategy strategy to apply
+ * @return 0 when the plan accepted the strategy, -1 otherwise
+ */
+int set_failure_strategy(flow *flow, FailureStrategy strategy) {
+  auto *plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (plan->setFailureStrategy(strategy)) {
+    return 0;
+  }
+  return -1;
+}
+
+/**
+ * Sets a property on a processor, falling back to a dynamic property when the
+ * regular one is rejected and the processor supports dynamic properties.
+ * @param proc processor handle
+ * @param name property name
+ * @param value property value
+ * @return 0 on success, -1 when an argument is null, -2 when the property
+ * could not be set
+ */
+int set_property(processor *proc, const char *name, const char *value) {
+  if (nullptr == name || nullptr == value || nullptr == proc) {
+    return -1;
+  }
+  core::Processor *target = static_cast<core::Processor*>(proc->processor_ptr);
+  if (target->setProperty(name, value)) {
+    return 0;
+  }
+  if (target->supportsDynamicProperties() && target->setDynamicProperty(name, value)) {
+    return 0;
+  }
+  return -2;
+}
+
+/**
+ * Destroys a flow together with its execution plan.
+ * @param flow flow to release
+ * @return 0 on success, -1 when flow or its plan is null
+ */
+int free_flow(flow *flow) {
+  const bool releasable = flow != nullptr && flow->plan != nullptr;
+  if (!releasable) {
+    return -1;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  delete plan;
+  free(flow);
+  return 0;
+}
+
+/**
+ * Resets the flow's plan, runs its processors until runNextProcessor() reports
+ * no more, and wraps the resulting flow file in a flow_file_record.
+ * @param instance nifi instance (only checked for null here)
+ * @param flow flow whose plan is executed
+ * @return new record, or nullptr when an argument or the plan is null, the
+ * plan produced no flow file, or the flow file has no resource claim
+ */
+flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
+ if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
+ return nullptr;
+ auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+ execution_plan->reset();
+ // Drive every processor in the plan; the loop body is intentionally empty.
+ while (execution_plan->runNextProcessor()) {
+ }
+ auto ff = execution_plan->getCurrentFlowFile();
+ if (ff == nullptr) {
+ return nullptr;
+ }
+ auto claim = ff->getResourceClaim();
+
+ if (claim != nullptr) {
+ // create a flow file.
+ claim->increaseFlowFileRecordOwnedCount();
+ auto path = claim->getContentFullPath();
+ auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+ // A non-null ffp makes free_flowfile leave the attribute map alone; the map
+ // pointer below comes from the flow file itself.
+ ffr->ffp = ff.get();
+ ffr->attributes = ff->getAttributesPtr();
+ // Store the plan's content repository in the holder allocated by create_ff_object_na.
+ auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+ *content_repo_ptr = execution_plan->getContentRepo();
+ return ffr;
+ } else {
+ return nullptr;
+ }
+}
+
+/**
+ * Collects up to size flow file records by calling get_next_flow_file
+ * repeatedly, stopping at the first call that yields nothing.
+ * @param instance nifi instance
+ * @param flow flow to execute
+ * @param ff_r caller supplied array with room for size records
+ * @param size capacity of ff_r
+ * @return number of records stored in ff_r; 0 when instance, flow, the flow's
+ * plan or ff_r is null
+ */
+size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
+  if (nullptr == instance || nullptr == flow || nullptr == flow->plan || nullptr == ff_r)
+    return 0;
+  size_t i = 0;
+  for (; i < size; i++) {
+    // get_next_flow_file resets the plan itself before running it.
+    auto ffr = get_next_flow_file(instance, flow);
+    if (ffr == nullptr) {
+      break;
+    }
+    ff_r[i] = ffr;
+  }
+  return i;
+}
+
+/**
+ * Fetches the next flow file from a processor session, registers it as the
+ * plan's next flow file and wraps it in a flow_file_record.
+ * @param instance nifi instance (only checked for null here)
+ * @param flow flow whose plan tracks the flow file
+ * @param session processor session to read from
+ * @return new record, or nullptr when an argument, the wrapped session or the
+ * plan is null, the session has no flow file, the flow file has no resource
+ * claim, or the record could not be created
+ */
+flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) {
+  if (nullptr == instance || nullptr == flow || nullptr == session)
+    return nullptr;
+  auto sesh = static_cast<core::ProcessSession*>(session->session);
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (nullptr == sesh || nullptr == execution_plan) {
+    return nullptr;
+  }
+  auto ff = sesh->get();
+  // Recorded even when null, so the plan's next flow file always reflects this call.
+  execution_plan->setNextFlowFile(ff);
+  if (ff == nullptr) {
+    return nullptr;
+  }
+  auto claim = ff->getResourceClaim();
+  if (claim == nullptr) {
+    return nullptr;
+  }
+
+  // create a flow file.
+  auto path = claim->getContentFullPath();
+  auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+  if (nullptr == ffr) {
+    return nullptr;
+  }
+  claim->increaseFlowFileRecordOwnedCount();
+  ffr->attributes = ff->getAttributesPtr();
+  // A non-null ffp makes free_flowfile leave the attribute map alone.
+  ffr->ffp = ff.get();
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+  *content_repo_ptr = execution_plan->getContentRepo();
+  return ffr;
+}
+
+/**
+ * Routes the plan's next flow file to the named relationship through the
+ * given processor session.
+ * @param session processor session
+ * @param flow flow whose plan supplies the flow file
+ * @param rel relationship name (also used as its description)
+ * @return 0 on success, -1 when an argument, the wrapped session or the plan
+ * is null, -2 when the plan has no next flow file
+ */
+int transfer(processor_session* session, flow *flow, const char *rel) {
+  const bool has_arguments = session != nullptr && flow != nullptr && rel != nullptr;
+  if (!has_arguments) {
+    return -1;
+  }
+  auto *process_session = static_cast<core::ProcessSession*>(session->session);
+  auto *plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (process_session == nullptr || plan == nullptr) {
+    return -1;
+  }
+  core::Relationship destination(rel, rel);
+  auto pending = plan->getNextFlowFile();
+  if (pending == nullptr) {
+    return -2;
+  }
+  process_session->transfer(pending, destination);
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp
new file mode 100644
index 0000000..3a2b0d1
--- /dev/null
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/C2CallbackAgent.h"
+#include <csignal>
+#include <utility>
+#include <vector>
+#include <map>
+#include <string>
+#include <memory>
+#include "c2/ControllerSocketProtocol.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/FileManager.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+// Forwards all three arguments to the C2Agent base and starts with no stop
+// callback registered.
+C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+ const std::shared_ptr<Configure> &configuration)
+ : C2Agent(controller, updateSink, configuration),
+ stop(nullptr),
+ logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
+}
+
+/**
+ * Reacts to a response from the C2 server. Only START and STOP do anything:
+ * a response named "C2" or "c2" raises SIGTERM, and the stop callback, when
+ * set, is invoked with the response name. Every other operation is ignored.
+ */
+void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
+  switch (resp.op) {
+    // NOTE(review): START shares the STOP handling, so it also invokes the
+    // stop callback - confirm this is intended.
+    case Operation::START:
+    case Operation::STOP: {
+      if (resp.name == "C2" || resp.name == "c2") {
+        raise(SIGTERM);
+      }
+
+      const char *component = resp.name.c_str();
+      if (stop != nullptr) {
+        // The callback takes a non-const char*, hence the const_cast.
+        stop(const_cast<char*>(component));
+      }
+      break;
+    }
+    case Operation::CLEAR:
+    case Operation::UPDATE:
+    case Operation::DESCRIBE:
+    case Operation::RESTART:
+    default:
+      // do nothing
+      break;
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/CallbackProcessor.cpp b/nanofi/src/cxx/CallbackProcessor.cpp
new file mode 100644
index 0000000..5294a1b
--- /dev/null
+++ b/nanofi/src/cxx/CallbackProcessor.cpp
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "cxx/CallbackProcessor.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Wraps the process session in a processor_session and passes it to the
+ * registered callback. Does nothing when no callback is set.
+ */
+void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  if (callback_ == nullptr) {
+    return;
+  }
+  // The wrapper lives on the stack and is valid only for the duration of the call.
+  processor_session wrapped;
+  wrapped.session = session;
+  callback_(&wrapped);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/Plan.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
new file mode 100644
index 0000000..f892aa9
--- /dev/null
+++ b/nanofi/src/cxx/Plan.cpp
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/Plan.h"
+#include "cxx/CallbackProcessor.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+/**
+ * Converts an integer to a FailureStrategy when it names AS_IS or ROLLBACK.
+ * @param in integer to convert
+ * @param out receives the strategy on success; untouched otherwise
+ * @return true when in maps to AS_IS or ROLLBACK, false otherwise
+ */
+bool intToFailureStragey(int in, FailureStrategy *out) {
+  const auto candidate = static_cast<FailureStrategy>(in);
+  if (candidate == AS_IS || candidate == ROLLBACK) {
+    *out = candidate;
+    return true;
+  }
+  return false;
+}
+
+// Static member definition: initialized from utils::IdGenerator::getIdGenerator().
+std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+// Stores the three repositories, starts un-finalized with location -1 and no
+// current flow file, and obtains a stream factory built from a fresh,
+// default-constructed minifi::Configure.
+ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
+ : content_repo_(content_repo),
+ flow_repo_(flow_repo),
+ prov_repo_(prov_repo),
+ finalized(false),
+ location(-1),
+ current_flowfile_(nullptr),
+ logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
+ stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
+}
+
+/**
+ * Creates a CallbackProcessor that hands its processor session to fp, and
+ * appends it to the plan linked to the previous processor via "success".
+ * @return the added processor, or nullptr when the plan is already finalized
+ * or the processor could not be created
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  auto created = createProcessor("CallbackProcessor", "CallbackProcessor");
+  if (!created) {
+    return nullptr;
+  }
+
+  auto callback_processor = std::static_pointer_cast<processors::CallbackProcessor>(created);
+  callback_processor->setCallback(obj, fp);
+
+  const core::Relationship success("success", "description");
+  return addProcessor(callback_processor, "CallbackProcessor", success, true);
+}
+
+/**
+ * Sets a property on the process context that belongs to the given processor.
+ * @param proc processor previously added to this plan
+ * @param prop property name
+ * @param value property value
+ * @return result of the context's setProperty; false when proc is null or is
+ * not part of this plan
+ */
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  // The log statement below dereferences proc, so reject null first.
+  if (!proc) {
+    return false;
+  }
+  uint32_t i = 0;
+  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
+  // Contexts are stored at the same index as their processor.
+  for (i = 0; i < processor_queue_.size(); i++) {
+    if (processor_queue_.at(i) == proc) {
+      break;
+    }
+  }
+
+  if (i >= processor_queue_.size() || i >= processor_contexts_.size()) {
+    return false;
+  }
+
+  return processor_contexts_.at(i)->setProperty(prop, value);
+}
+
+/**
+ * Initializes the processor and appends it, with a matching node and process
+ * context, to the plan.
+ * @param processor processor to add
+ * @param name processor name (not used by this overload)
+ * @param relationship relationship used for the link, or as the termination
+ * relationship when nothing is linked
+ * @param linkToPrevious when true, connect the last processor in the queue to
+ * this one; with an empty queue the processor is connected to itself
+ * @return the processor, or nullptr when it is null or the plan is finalized
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized || !processor) {
+    return nullptr;
+  }
+
+  // The identifier is not used further; the call is kept so the generator
+  // advances exactly as before.
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
+
+  processor->setStreamFactory(stream_factory);
+  // initialize the processor
+  processor->initialize();
+
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    // back() on an empty container is undefined, so only read it when there
+    // is a previous processor.
+    std::shared_ptr<core::Processor> last;
+    if (!processor_queue_.empty()) {
+      last = processor_queue_.back();
+    }
+
+    if (last == nullptr) {
+      last = processor;
+      termination_ = relationship;
+    }
+
+    relationships_.push_back(connectProcessors(last, processor, relationship, true));
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+/**
+ * Creates a processor of the given type and adds it to the plan.
+ * @return the added processor, or nullptr when the plan is finalized or the
+ * processor could not be created
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  const auto created = ExecutionPlan::createProcessor(processor_name, name);
+  if (created) {
+    return addProcessor(created, name, relationship, linkToPrevious);
+  }
+  return nullptr;
+}
+
+/**
+ * Returns the plan to its pre-run state: sessions and factories are dropped,
+ * the position goes back to -1 and every processor's active task count is
+ * decremented until it is no longer positive.
+ */
+void ExecutionPlan::reset() {
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (const auto &queued : processor_queue_) {
+    for (; queued->getActiveTasks() > 0;) {
+      queued->decrementActiveTask();
+    }
+  }
+}
+
+/**
+ * Advances to the next processor in the queue and runs it once.
+ * @param verify when set, called with the context and session in place of the
+ * processor's onTrigger
+ * @return true when more processors remain after this one, false when this
+ * was the last one or the queue was already exhausted
+ */
+bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+ // The plan is finalized lazily on the first run.
+ if (!finalized) {
+ finalize();
+ }
+ location++;
+ if (location >= processor_queue_.size()) {
+ return false;
+ }
+ std::shared_ptr<core::Processor> processor = processor_queue_[location];
+ std::shared_ptr<core::ProcessContext> context = processor_contexts_[location];
+ std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+ factories_.push_back(factory);
+ // onSchedule is called only the first time a given processor is run.
+ if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
+ processor->onSchedule(context, factory);
+ configured_processors_.push_back(processor);
+ }
+ std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+ process_sessions_.push_back(current_session);
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ if (verify != nullptr) {
+ verify(context, current_session);
+ } else {
+ logger_->log_debug("Running %s", processor->getName());
+ processor->onTrigger(context, current_session);
+ }
+ current_session->commit();
+ current_flowfile_ = current_session->get();
+ auto hasMore = location + 1 < processor_queue_.size();
+ // After the last processor, fall back to polling the most recently added
+ // connection when the session itself yielded no flow file.
+ if (!hasMore && !current_flowfile_) {
+ std::set<std::shared_ptr<core::FlowFile>> expired;
+ current_flowfile_ = relationships_.back()->poll(expired);
+ }
+ return hasMore;
+}
+
+/**
+ * Returns the provenance events reported by the session at the current
+ * position.
+ */
+std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
+  const auto &session = process_sessions_.at(location);
+  return session->getProvenanceReporter()->getEvents();
+}
+
+/**
+ * Returns the flow file recorded by the most recent processor run; may be null.
+ */
+std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
+  std::shared_ptr<core::FlowFile> current = current_flowfile_;
+  return current;
+}
+
+/**
+ * Returns the stored current session pointer.
+ */
+std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
+  std::shared_ptr<core::ProcessSession> current = current_session_;
+  return current;
+}
+
+/**
+ * Builds the terminating connection for a processor: it is connected to
+ * itself using the plan's termination relationship.
+ */
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
+  auto final_connection = connectProcessors(processor, processor, termination_, set_dst);
+  return final_connection;
+}
+
+/**
+ * Completes the plan before its first run. When a failure handler is set, a
+ * CallbackProcessor is appended and every processor that supports a "failure"
+ * relationship is connected to it. Terminating connections are then added and
+ * the plan is marked finalized.
+ */
+void ExecutionPlan::finalize() {
+  if (failure_handler_) {
+    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
+
+    // createProcessor returns null when the class cannot be instantiated;
+    // skip the failure wiring instead of dereferencing a null processor.
+    if (failure_proc) {
+      std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
+      callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
+
+      for (const auto& proc : processor_queue_) {
+        for (const auto& rel : proc->getSupportedRelationships()) {
+          if (rel.getName() == "failure") {
+            relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+            break;
+          }
+        }
+      }
+
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
+
+      processor_nodes_.push_back(node);
+
+      std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+      processor_contexts_.push_back(context);
+
+      processor_queue_.push_back(failure_proc);
+    } else {
+      logger_->log_error("Could not create the failure CallbackProcessor; failure callback will not be invoked");
+    }
+  }
+
+  if (relationships_.size() > 0) {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  } else {
+    for (auto processor : processor_queue_) {
+      relationships_.push_back(buildFinalConnection(processor, true));
+    }
+  }
+
+  finalized = true;
+}
+
+/**
+ * Instantiates a processor of the given class through the default class
+ * loader, with a freshly generated identifier, and names it.
+ * @return the processor, or nullptr when the class loader returned nothing
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
+  utils::Identifier fresh_id;
+  id_generator_->generate(fresh_id);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, fresh_id);
+  if (instance == nullptr) {
+    return nullptr;
+  }
+
+  auto created = std::static_pointer_cast<core::Processor>(instance);
+  created->setName(name);
+  return created;
+}
+
+/**
+ * Creates a connection named "<src uuid>-to-<dst uuid>" carrying the given
+ * relationship and registers it with the source processor. When set_dst is
+ * true the destination is recorded too, and the connection is also registered
+ * with the destination unless it is the same processor as the source.
+ */
+std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, core::Relationship relationship,
+                                                                     bool set_dst) {
+  std::stringstream label;
+  label << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
+  auto link = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, label.str());
+  link->setRelationship(relationship);
+
+  // link the connections so that we can test results at the end for this
+  link->setSource(src_proc);
+
+  utils::Identifier source_id;
+  src_proc->getUUID(source_id);
+  link->setSourceUUID(source_id);
+
+  if (set_dst) {
+    link->setDestination(dst_proc);
+    utils::Identifier destination_id;
+    dst_proc->getUUID(destination_id);
+    link->setDestinationUUID(destination_id);
+    const bool distinct_processors = src_proc != dst_proc;
+    if (distinct_processors) {
+      dst_proc->addConnection(link);
+    }
+  }
+  src_proc->addConnection(link);
+
+  return link;
+}
+
+/**
+ * Installs the failure callback, creating the failure handler on first use.
+ * @return false when no handler exists and the plan is already finalized
+ * (the failure processor can no longer be wired in), true otherwise
+ */
+bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
+  if (!failure_handler_) {
+    if (finalized) {
+      // Already finalized the flow without failure handler processor
+      return false;
+    }
+    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
+  }
+  failure_handler_->setCallback(onerror_callback);
+  return true;
+}
+
+/**
+ * Passes the strategy to the failure handler.
+ * @return true when a failure handler exists, false otherwise
+ */
+bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
+  const bool has_handler = static_cast<bool>(failure_handler_);
+  if (has_handler) {
+    failure_handler_->setStrategy(start);
+  }
+  return has_handler;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/tests/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
new file mode 100644
index 0000000..65c52e1
--- /dev/null
+++ b/nanofi/tests/CAPITests.cpp
@@ -0,0 +1,278 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <sys/stat.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "utils/file/FileUtils.h"
+#include "TestBase.h"
+
+#include <chrono>
+#include <thread>
+#include "api/nanofi.h"
+
+/**
+ * Test helper: creates a NiFi instance using a dummy remote port id ("12345").
+ *
+ * @param name value forwarded to create_instance() as its first argument;
+ *             defaults to "random_instance"
+ * @return whatever create_instance() returns; the tests release it with free_instance()
+ */
+static nifi_instance *create_instance_obj(const char *name = "random_instance") {
+  char port_str[] = "12345";
+  nifi_port port;
+  port.port_id = port_str;
+  // Forward the caller-supplied name; it used to be ignored in favour of a
+  // hard-coded literal equal to the default.
+  return create_instance(name, &port);
+}
+
+static int failure_count = 0;
+
+// Failure callback: counts the invocation, checks that the failed flow file
+// carries at least one attribute, and releases the record.
+void failure_counter(flow_file_record * fr) {
+  ++failure_count;
+  // Read the attribute count and release the record before asserting, so that a
+  // failed assertion does not leave the flow file record unreleased.
+  const int attribute_qty = get_attribute_qty(fr);
+  free_flowfile(fr);
+  REQUIRE(attribute_qty > 0);
+}
+
+// Failure callback that raises the shared counter by 100 per invocation and
+// then releases the record.
+void big_failure_counter(flow_file_record * fr) {
+  failure_count = failure_count + 100;
+  free_flowfile(fr);
+}
+
+// Smoke test: an instance, an empty flow and one known processor can be created
+// and released again.
+TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *generate_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(generate_proc != nullptr);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// Adding a processor by an unknown or empty name must yield a null handle.
+TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  // Make sure the flow itself exists, as the other test cases do; otherwise a
+  // null result below could come from the missing flow rather than the bad name.
+  REQUIRE(test_flow != nullptr);
+  processor *test_proc = add_processor(test_flow, "NeverExisted");
+  REQUIRE(test_proc == nullptr);
+  processor *no_proc = add_processor(test_flow, "");
+  REQUIRE(no_proc == nullptr);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// Exercises set_property() with a valid value and with null arguments.
+TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *generate_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(generate_proc != nullptr);
+
+  // Valid value
+  REQUIRE(set_property(generate_proc, "Data Format", "Text") == 0);
+
+  // TODO(aboda): add these two checks when property handling is made strictly typed
+  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
+  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
+
+  // Empty value
+  REQUIRE(set_property(generate_proc, "Data Format", nullptr) != 0);
+  // Empty attribute
+  REQUIRE(set_property(generate_proc, nullptr, "Blah") != 0);
+  // Invalid processor
+  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// Runs a GetFile -> PutFile flow over two temporary directories and checks that
+// the file written to the input directory shows up, unchanged, in the output one.
+TEST_CASE("get file and put file", "[getAndPutFile]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  char put_format[] = "/tmp/pt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  const char *putfiledir = testController.createTempDirectory(put_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  // Build the flow and point both processors at their directories.
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
+
+  // Drop the input file into the source directory.
+  const std::string input_path = std::string(sourcedir) + "/" + "tstFile.ext";
+  std::ofstream input_file(input_path);
+  input_file << test_file_content;
+  input_file.close();
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+  REQUIRE(record != nullptr);
+
+  // Read back what ended up in the output directory.
+  const std::string output_path = std::string(putfiledir) + "/" + "tstFile.ext";
+  std::ifstream output_file(output_path);
+  std::string put_data;
+  put_data.assign(std::istreambuf_iterator<char>(output_file), std::istreambuf_iterator<char>());
+
+  REQUIRE(test_file_content == put_data);
+
+  // No failure handler can be added after the flow is finalized
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
+
+  free_flowfile(record);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// Runs GetFile -> ExtractText -> UpdateAttribute and checks that attributes of
+// the resulting flow file can be read, added, updated and enumerated.
+TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
+  processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
+  REQUIRE(update_attr != nullptr);
+
+  REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+
+  REQUIRE(record != nullptr);
+
+  // The attribute configured on ExtractText is expected to hold the file content.
+  attribute test_attr;
+  test_attr.key = "TestAttr";
+  REQUIRE(get_attribute(record, &test_attr) == 0);
+
+  REQUIRE(test_attr.value_size != 0);
+  REQUIRE(test_attr.value != nullptr);
+
+  std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
+
+  REQUIRE(attr_value == test_file_content);
+
+  const char * new_testattr_value = "S0me t3st t3xt";
+  // The attribute API takes a non-const void*; make the const removal explicit
+  // instead of hiding it in a C-style cast.
+  void *new_value_ptr = const_cast<char*>(new_testattr_value);
+
+  // Attribute already exist, should fail
+  REQUIRE(add_attribute(record, test_attr.key, new_value_ptr, strlen(new_testattr_value)) != 0);
+
+  // Update overwrites values
+  update_attribute(record, test_attr.key, new_value_ptr, strlen(new_testattr_value));
+
+  int attr_size = get_attribute_qty(record);
+  REQUIRE(attr_size > 0);
+
+  // The vector owns the output array, so it is released even when one of the
+  // assertions below fails (the previous malloc'd buffer was never freed).
+  std::vector<attribute> attr_storage(static_cast<size_t>(attr_size));
+  attribute_set attr_set;
+  attr_set.size = attr_size;
+  attr_set.attributes = attr_storage.data();
+
+  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
+
+  bool test_attr_found = false;
+  bool updated_attr_found = false;
+  for (int i = 0; i < attr_set.size; ++i) {
+    const attribute &current = attr_set.attributes[i];
+    if (strcmp(current.key, test_attr.key) == 0) {
+      test_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(current.value), current.value_size) == new_testattr_value);
+    } else if (strcmp(current.key, "UpdatedAttribute") == 0) {
+      updated_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(current.value), current.value_size) == "UpdatedValue");
+    }
+  }
+  REQUIRE(updated_attr_found == true);
+  REQUIRE(test_attr_found == true);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// Verifies that a failure callback registered on a flow is invoked when the flow
+// fails, and that the callback and the failure strategy can be changed afterwards.
+TEST_CASE("Test error handling callback", "[errorHandling]") {
+  TestController testController;
+
+  // Start from a known counter value so the assertions below do not depend on
+  // what other test cases left in the shared counter.
+  failure_count = 0;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  // Failure strategy cannot be set before a valid callback is added
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
+
+  // PutFile is pointed at "/tmp/never_existed" and told not to create missing
+  // directories; the test expects the flow to fail as a result.
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
+  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+
+  REQUIRE(failure_count == 1);
+
+  // Failure handler function can be replaced at runtime
+  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
+
+  // Create new testfile to trigger failure again
+  // (appending to the stream yields a second file name, "tstFile.ext2")
+  ss << "2";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+  REQUIRE(failure_count > 100);
+
+  failure_count = 0;
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/library/CMakeLists.txt b/python/library/CMakeLists.txt
index 5d309a1..684cf22 100644
--- a/python/library/CMakeLists.txt
+++ b/python/library/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
CMAKE_POLICY(SET CMP0048 OLD)
ENDIF(POLICY CMP0048)
-include_directories(../../blocks/ ../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
+include_directories(../../nanofi/include/ ../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
if(WIN32)
include_directories(../../libminifi/opsys/win)
else()
@@ -33,9 +33,9 @@ include_directories(../../extensions/http-curl/ ../../extensions/http-curl/clien
add_library(python-lib SHARED python_lib.cpp)
if (APPLE)
- target_link_libraries(python-lib capi core-minifi minifi)
+ target_link_libraries(python-lib nanofi core-minifi minifi)
else()
- target_link_libraries(python-lib capi-shared core-minifi-shared minifi-shared)
+ target_link_libraries(python-lib nanofi-shared core-minifi-shared minifi-shared)
endif(APPLE)
if (WIN32)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/python_lib.cpp
----------------------------------------------------------------------
diff --git a/python/library/python_lib.cpp b/python/library/python_lib.cpp
index eb81f79..79241df 100644
--- a/python/library/python_lib.cpp
+++ b/python/library/python_lib.cpp
@@ -22,11 +22,10 @@
#include <sys/stat.h>
#include <unistd.h>
#include <dirent.h>
-#include "capi/api.h"
-#include "file_blocks.h"
-#include "comms.h"
-#include "capi/api.h"
-#include "capi/processors.h"
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
#include "HTTPCurlLoader.h"
#include "python_lib.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/rocksdb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/CMakeLists.txt b/thirdparty/rocksdb/CMakeLists.txt
index 837f0bc..f20d09a 100644
--- a/thirdparty/rocksdb/CMakeLists.txt
+++ b/thirdparty/rocksdb/CMakeLists.txt
@@ -584,6 +584,7 @@ else()
endif()
set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX})
+# commented out to avoid building the shared lib
#set(ROCKSDB_SHARED_LIB rocksdb-shared${ARTIFACT_SUFFIX})
set(ROCKSDB_IMPORT_LIB ${ROCKSDB_SHARED_LIB})
if(WIN32)
@@ -592,16 +593,19 @@ if(WIN32)
set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
else()
set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT})
- set(LIBS ${ROCKSDB_SHARED_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
- add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
- # target_link_libraries(${ROCKSDB_SHARED_LIB}
+ set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
+# commented out to avoid building the shared lib
+# as there is no reason
+#add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
+
+# target_link_libraries(${ROCKSDB_SHARED_LIB}
# ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
- set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
- LINKER_LANGUAGE CXX
- VERSION ${ROCKSDB_VERSION}
- SOVERSION ${ROCKSDB_VERSION_MAJOR}
- CXX_STANDARD 11
- OUTPUT_NAME "rocksdb")
+# set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
+# LINKER_LANGUAGE CXX
+# VERSION ${ROCKSDB_VERSION}
+# SOVERSION ${ROCKSDB_VERSION_MAJOR}
+# CXX_STANDARD 11
+# OUTPUT_NAME "rocksdb")
endif()
option(WITH_LIBRADOS "Build with librados" OFF)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
index f4239a4..6716638 100644
--- a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
+++ b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
@@ -44,7 +44,7 @@ option(YAML_CPP_BUILD_CONTRIB "Enable contrib stuff in library" ON)
# --> General
# see http://www.cmake.org/cmake/help/cmake2.6docs.html#variable:BUILD_SHARED_LIBS
# http://www.cmake.org/cmake/help/cmake2.6docs.html#command:add_library
-option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
+#option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
[2/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into
nanofi
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
deleted file mode 100644
index e135fe1..0000000
--- a/libminifi/src/capi/api.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <string>
-#include <map>
-#include <memory>
-#include <utility>
-#include <exception>
-#include "core/Core.h"
-#include "capi/api.h"
-#include "capi/expect.h"
-#include "capi/Instance.h"
-#include "capi/Plan.h"
-#include "ResourceClaim.h"
-#include "processors/GetFile.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/StringUtils.h"
-
-using string_map = std::map<std::string, std::string>;
-
-class API_INITIALIZER {
- public:
- static int initialized;
-};
-
-int API_INITIALIZER::initialized = initialize_api();
-
-int initialize_api() {
- logging::LoggerConfiguration::getConfiguration().disableLogging();
- return 1;
-}
-
-void enable_logging() {
- logging::LoggerConfiguration::getConfiguration().enableLogging();
-}
-
-void set_terminate_callback(void (*terminate_callback)()) {
- std::set_terminate(terminate_callback);
-}
-
-class DirectoryConfiguration {
- protected:
- DirectoryConfiguration() {
- minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
- }
- public:
- static void initialize() {
- static DirectoryConfiguration configure;
- }
-};
-
-/**
- * Creates a NiFi Instance from the url and output port.
- * @param url http URL for NiFi instance
- * @param port Remote output port.
- * @Deprecated for API version 0.2 in favor of the following prototype
- * nifi_instance *create_instance(nifi_port const *port) {
- */
-nifi_instance *create_instance(const char *url, nifi_port *port) {
- // make sure that we have a thread safe way of initializing the content directory
- DirectoryConfiguration::initialize();
-
- // need reinterpret cast until we move to C for this module.
- nifi_instance *instance = reinterpret_cast<nifi_instance*>( malloc(sizeof(nifi_instance)) );
- /**
- * This API will gradually move away from C++, hence malloc is used for nifi_instance
- * Since minifi::Instance is currently being used, then we need to use new in that case.
- */
- instance->instance_ptr = new minifi::Instance(url, port->port_id);
- // may have to translate port ID here in the future
- // need reinterpret cast until we move to C for this module.
- instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
- snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id);
- return instance;
-}
-
-/**
- * Initializes the instance
- */
-void initialize_instance(nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->setRemotePort(instance->port.port_id);
-}
-/*
- typedef int c2_update_callback(char *);
-
- typedef int c2_stop_callback(char *);
-
- typedef int c2_start_callback(char *);
-
- */
-void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
-}
-
-/**
- * Sets a property within the nifi instance
- * @param instance nifi instance
- * @param key key in which we will set the valiue
- * @param value
- * @return -1 when instance or key are null
- */
-int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
- if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
- return -1;
- }
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- minifi_instance_ref->getConfiguration()->set(key, value);
- return 0;
-}
-
-/**
- * Reclaims memory associated with a nifi instance structure.
- * @param instance nifi instance.
- */
-void free_instance(nifi_instance* instance) {
- if (instance != nullptr) {
- delete ((minifi::Instance*) instance->instance_ptr);
- free(instance->port.port_id);
- free(instance);
- }
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_flowfile(const char *file, const size_t len) {
- flow_file_record *new_ff = new flow_file_record;
- new_ff->attributes = new string_map();
- new_ff->contentLocation = new char[len + 1];
- snprintf(new_ff->contentLocation, len + 1, "%s", file);
- std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
- // set the size of the flow file.
- new_ff->size = in.tellg();
- return new_ff;
-}
-
-/**
- * Creates a flow file record
- * @param file file to place into the flow file.
- */
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
- if (nullptr == file) {
- return nullptr;
- }
- flow_file_record *new_ff = create_ff_object_na(file, len, size);
- new_ff->attributes = new string_map();
- new_ff->ffp = 0;
- return new_ff;
-}
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
- flow_file_record *new_ff = new flow_file_record;
- new_ff->attributes = nullptr;
- new_ff->contentLocation = new char[len + 1];
- snprintf(new_ff->contentLocation, len + 1, "%s", file);
- // set the size of the flow file.
- new_ff->size = size;
- new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
- return new_ff;
-}
-/**
- * Reclaims memory associated with a flow file object
- * @param ff flow file record.
- */
-void free_flowfile(flow_file_record *ff) {
- if (ff == nullptr) {
- return;
- }
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
- if (content_repo_ptr->get()) {
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
- (*content_repo_ptr)->remove(claim);
- }
- if (ff->ffp == nullptr) {
- auto map = static_cast<string_map*>(ff->attributes);
- delete map;
- }
- delete[] ff->contentLocation;
- delete ff;
- delete content_repo_ptr;
-}
-
-/**
- * Adds an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
- */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
- return ret.second ? 0 : -1;
-}
-
-/**
- * Updates (or adds) an attribute
- * @param ff flow file record
- * @param key key
- * @param value value to add
- * @param size size of value
- */
-void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
-}
-
-/*
- * Obtains the attribute.
- * @param ff flow file record
- * @param key key
- * @param caller_attribute caller supplied object in which we will copy the data ptr
- * @return 0 if successful, -1 if the key does not exist
- */
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) {
- if (ff == nullptr) {
- return -1;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- if (!attribute_map) {
- return -1;
- }
- auto find = attribute_map->find(caller_attribute->key);
- if (find != attribute_map->end()) {
- caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
- caller_attribute->value_size = find->second.size();
- return 0;
- }
- return -1;
-}
-
-int get_attribute_qty(const flow_file_record* ff) {
- if (ff == nullptr) {
- return 0;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- return attribute_map ? attribute_map->size() : 0;
-}
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
- if (ff == nullptr) {
- return 0;
- }
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- if (!attribute_map || attribute_map->empty()) {
- return 0;
- }
- int i = 0;
- for (const auto& kv : *attribute_map) {
- if (i >= target->size) {
- break;
- }
- target->attributes[i].key = kv.first.data();
- target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
- target->attributes[i].value_size = kv.second.size();
- ++i;
- }
- return i;
-}
-
-/**
- * Removes a key from the attribute chain
- * @param ff flow file record
- * @param key key to remove
- * @return 0 if removed, -1 otherwise
- */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
- auto attribute_map = static_cast<string_map*>(ff->attributes);
- return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1)
-}
-
-/**
- * Transmits the flowfile
- * @param ff flow file record
- * @param instance nifi instance structure
- */
-int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
- if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
- minifi_instance_ref->setRemotePort(instance->port.port_id);
- }
-
- auto attribute_map = static_cast<string_map*>(ff->attributes);
-
- auto no_op = minifi_instance_ref->getNoOpRepository();
-
- auto content_repo = minifi_instance_ref->getContentRepository();
-
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
- claim->increaseFlowFileRecordOwnedCount();
- claim->increaseFlowFileRecordOwnedCount();
-
- auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
- ffr->addAttribute("nanofi.version", API_VERSION);
- ffr->setSize(ff->size);
-
- std::string port_uuid = instance->port.port_id;
-
- minifi_instance_ref->transfer(ffr);
-
- return 0;
-}
-
-flow *create_new_flow(nifi_instance *instance) {
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- flow *new_flow = new flow;
-
- auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
- new_flow->plan = execution_plan;
-
- return new_flow;
-}
-
-flow *create_flow(nifi_instance *instance, const char *first_processor) {
- if (nullptr == instance || nullptr == instance->instance_ptr) {
- return nullptr;
- }
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- flow *new_flow = new flow;
-
- auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
-
- new_flow->plan = execution_plan;
-
- if (first_processor != nullptr && strlen(first_processor) > 0) {
- // automatically adds it with success
- execution_plan->addProcessor(first_processor, first_processor);
- }
- return new_flow;
-}
-
-processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
- if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
- return nullptr;
- }
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
-}
-
-flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
- static const std::string first_processor = "GetFile";
- flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow;
-
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
- // automatically adds it with success
- auto getFile = plan->addProcessor(first_processor, first_processor);
-
- plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
- plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
- plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
-
- return new_flow;
-}
-
-processor *add_processor(flow *flow, const char *processor_name) {
- if (nullptr == flow || nullptr == processor_name) {
- return nullptr;
- }
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addProcessor(processor_name, processor_name);
- if (proc) {
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
- }
- return nullptr;
-}
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
- if (proc) {
- processor *new_processor = new processor();
- new_processor->processor_ptr = proc.get();
- return new_processor;
- }
- return nullptr;
-}
-
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
- ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
- return plan->setFailureCallback(onerror_callback) ? 0 : 1;
-}
-
-int set_failure_strategy(flow *flow, FailureStrategy strategy) {
- return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1;
-}
-
-int set_property(processor *proc, const char *name, const char *value) {
- if (name != nullptr && value != nullptr && proc != nullptr) {
- core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
- bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
- return success ? 0 : -2;
- }
- return -1;
-}
-
-int free_flow(flow *flow) {
- if (flow == nullptr || nullptr == flow->plan)
- return -1;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- delete execution_plan;
- delete flow;
- return 0;
-}
-
-flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
- if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
- return nullptr;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- execution_plan->reset();
- while (execution_plan->runNextProcessor()) {
- }
- auto ff = execution_plan->getCurrentFlowFile();
- if (ff == nullptr) {
- return nullptr;
- }
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr) {
- // create a flow file.
- claim->increaseFlowFileRecordOwnedCount();
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->ffp = ff.get();
- ffr->attributes = ff->getAttributesPtr();
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
- *content_repo_ptr = execution_plan->getContentRepo();
- return ffr;
- } else {
- return nullptr;
- }
-}
-
-size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
- if (nullptr == instance || nullptr == flow || nullptr == ff_r)
- return 0;
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- size_t i = 0;
- for (; i < size; i++) {
- execution_plan->reset();
- auto ffr = get_next_flow_file(instance, flow);
- if (ffr == nullptr) {
- break;
- }
- ff_r[i] = ffr;
- }
- return i;
-}
-
-flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) {
- if (nullptr == instance || nullptr == flow || nullptr == session)
- return nullptr;
- auto sesh = static_cast<core::ProcessSession*>(session->session);
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- auto ff = sesh->get();
- execution_plan->setNextFlowFile(ff);
- if (ff == nullptr) {
- return nullptr;
- }
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr) {
- // create a flow file.
- claim->increaseFlowFileRecordOwnedCount();
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->attributes = ff->getAttributesPtr();
- ffr->ffp = ff.get();
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
- *content_repo_ptr = execution_plan->getContentRepo();
- return ffr;
- } else {
- return nullptr;
- }
-}
-
-int transfer(processor_session* session, flow *flow, const char *rel) {
- if (nullptr == session || nullptr == flow || rel == nullptr) {
- return -1;
- }
- auto sesh = static_cast<core::ProcessSession*>(session->session);
- auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
- if (nullptr == sesh || nullptr == execution_plan) {
- return -1;
- }
- core::Relationship relationship(rel, rel);
- auto ff = execution_plan->getNextFlowFile();
- if (nullptr == ff) {
- return -2;
- }
- sesh->transfer(ff, relationship);
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 47c7ba6..674566b 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -17,7 +17,7 @@
*/
#include "core/repository/VolatileContentRepository.h"
-#include "capi/expect.h"
+#include "core/expect.h"
#include <cstdio>
#include <string>
#include <memory>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/processors/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp
deleted file mode 100644
index 5524d92..0000000
--- a/libminifi/src/processors/CallbackProcessor.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/CallbackProcessor.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- if (callback_ != nullptr) {
- processor_session sesh;
- sesh.session = session;
- callback_(&sesh);
- }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp
deleted file mode 100644
index b7bf784..0000000
--- a/libminifi/test/capi/CAPITests.cpp
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <sys/stat.h>
-#include <utility>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <fstream>
-
-#include "utils/file/FileUtils.h"
-#include "../TestBase.h"
-
-#include "capi/api.h"
-
-#include <chrono>
-#include <thread>
-
-static nifi_instance *create_instance_obj(const char *name = "random_instance") {
- nifi_port port;
- char port_str[] = "12345";
- port.port_id = port_str;
- return create_instance("random_instance", &port);
-}
-
-static int failure_count = 0;
-
-void failure_counter(flow_file_record * fr) {
- failure_count++;
- REQUIRE(get_attribute_qty(fr) > 0);
- free_flowfile(fr);
-}
-
-void big_failure_counter(flow_file_record * fr) {
- failure_count += 100;
- free_flowfile(fr);
-}
-
-TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
- REQUIRE(test_proc != nullptr);
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- processor *test_proc = add_processor(test_flow, "NeverExisted");
- REQUIRE(test_proc == nullptr);
- processor *no_proc = add_processor(test_flow, "");
- REQUIRE(no_proc == nullptr);
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
- REQUIRE(test_proc != nullptr);
- REQUIRE(set_property(test_proc, "Data Format", "Text") == 0); // Valid value
- // TODO(aboda): add this two below when property handling is made strictly typed
- // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
- // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
- REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value
- REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute
- REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("get file and put file", "[getAndPutFile]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- char put_format[] = "/tmp/pt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
- const char *putfiledir = testController.createTempDirectory(put_format);
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
- REQUIRE(put_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
- flow_file_record *record = get_next_flow_file(instance, test_flow);
- REQUIRE(record != nullptr);
-
- ss.str("");
-
- ss << putfiledir << "/" << "tstFile.ext";
- std::ifstream t(ss.str());
- std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
-
- REQUIRE(test_file_content == put_data);
-
- // No failure handler can be added after the flow is finalized
- REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
-
- free_flowfile(record);
-
- free_flow(test_flow);
-
- free_instance(instance);
-}
-
-TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
-
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
- std::fstream file;
- std::stringstream ss;
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
-
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
- REQUIRE(extract_test != nullptr);
- REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
- processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
- REQUIRE(update_attr != nullptr);
-
- REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
-
- flow_file_record *record = get_next_flow_file(instance, test_flow);
-
- REQUIRE(record != nullptr);
-
- attribute test_attr;
- test_attr.key = "TestAttr";
- REQUIRE(get_attribute(record, &test_attr) == 0);
-
- REQUIRE(test_attr.value_size != 0);
- REQUIRE(test_attr.value != nullptr);
-
- std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
-
- REQUIRE(attr_value == test_file_content);
-
- const char * new_testattr_value = "S0me t3st t3xt";
-
- // Attribute already exist, should fail
- REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT
-
- // Update overwrites values
- update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT
-
- int attr_size = get_attribute_qty(record);
- REQUIRE(attr_size > 0);
-
- attribute_set attr_set;
- attr_set.size = attr_size;
- attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute)); // NOLINT
-
- REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
-
- bool test_attr_found = false;
- bool updated_attr_found = false;
- for (int i = 0; i < attr_set.size; ++i) {
- if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
- test_attr_found = true;
- REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
- } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
- updated_attr_found = true;
- REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
- }
- }
- REQUIRE(updated_attr_found == true);
- REQUIRE(test_attr_found == true);
-
- free_flowfile(record);
-
- free_flow(test_flow);
- free_instance(instance);
-}
-
-TEST_CASE("Test error handling callback", "[errorHandling]") {
- TestController testController;
-
- char src_format[] = "/tmp/gt.XXXXXX";
- const char *sourcedir = testController.createTempDirectory(src_format);
- std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
-
- auto instance = create_instance_obj();
- REQUIRE(instance != nullptr);
- flow *test_flow = create_flow(instance, nullptr);
- REQUIRE(test_flow != nullptr);
-
- // Failure strategy cannot be set before a valid callback is added
- REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
- REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
-
- processor *get_proc = add_processor(test_flow, "GetFile");
- REQUIRE(get_proc != nullptr);
- processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
- REQUIRE(put_proc != nullptr);
- REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
- REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
- REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
-
- std::fstream file;
- std::stringstream ss;
-
- ss << sourcedir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
-
- REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
-
- REQUIRE(failure_count == 1);
-
- // Failure handler function can be replaced runtime
- REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
- REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
-
- // Create new testfile to trigger failure again
- ss << "2";
- file.open(ss.str(), std::ios::out);
- file << test_file_content;
- file.close();
-
- REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
- REQUIRE(failure_count > 100);
-
- failure_count = 0;
-
- free_flow(test_flow);
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
new file mode 100644
index 0000000..bd2d952
--- /dev/null
+++ b/nanofi/CMakeLists.txt
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+ CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(include)
+include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+
+if(WIN32)
+include_directories(../libminifi/opsys/win)
+else()
+include_directories(../libminifi/opsys/posix)
+endif()
+
+file(GLOB NANOFI_SOURCES "src/api/*.cpp" "src/cxx/*.cpp" )
+
+file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" )
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+ if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+ CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+ if (_cpp_latest_flag_supported)
+ add_compile_options("/std:c++14")
+ endif()
+ endif()
+else()
+
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+add_library(nanofi STATIC ${NANOFI_SOURCES})
+
+if (APPLE)
+ target_link_libraries (nanofi -Wl,-all_load core-minifi minifi)
+elseif(NOT WIN32)
+ target_link_libraries (nanofi -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive)
+else()
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi")
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi")
+endif ()
+
+if(WIN32)
+ set_target_properties(nanofi PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+if (ENABLE_PYTHON)
+
+add_library(nanofi-shared SHARED ${NANOFI_SOURCES})
+
+if (APPLE)
+ target_link_libraries (nanofi-shared -Wl,-all_load core-minifi-shared minifi-shared)
+elseif(NOT WIN32)
+ target_link_libraries (nanofi-shared -Wl,--whole-archive core-minifi-shared minifi-shared -Wl,--no-whole-archive)
+else()
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi-shared")
+ set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi-shared")
+endif ()
+
+
+if(WIN32)
+ set_target_properties(nanofi-shared PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}")
+endif()
+
+set_property(TARGET nanofi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+endif(ENABLE_PYTHON)
+
+if (NOT DISABLE_CURL)
+add_subdirectory(examples)
+endif()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
new file mode 100644
index 0000000..f5f660f
--- /dev/null
+++ b/nanofi/examples/CMakeLists.txt
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+ CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../include)
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+ if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+ CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+ if (_cpp_latest_flag_supported)
+ add_compile_options("/std:c++14")
+ endif()
+ endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+if (WIN32)
+ set(LINK_FLAGS "/WHOLEARCHIVE")
+ set(LINK_END_FLAGS "")
+elseif (APPLE)
+ set(LINK_FLAGS "-Wl,-all_load")
+ set(LINK_END_FLAGS "")
+else ()
+ set(LINK_FLAGS "-Wl,--whole-archive")
+ set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
+add_executable(generate_flow generate_flow.c)
+
+add_executable(terminate_handler terminate_handler.c)
+
+target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
+if (NOT WIN32)
+
+add_executable(transmit_flow transmit_flow.c)
+
+target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(monitor_directory monitor_directory.c)
+
+target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/generate_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/generate_flow.c b/nanofi/examples/generate_flow.c
new file mode 100644
index 0000000..707de11
--- /dev/null
+++ b/nanofi/examples/generate_flow.c
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that generates a flow file and transmits it to a remote instance.
+ */
+int main(int argc, char **argv) {
+ if (argc < 3) {
+ printf("Error: must run ./generate_flow <instance> <remote port> \n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+ flow_file_record *record = get_next_flow_file(instance, new_flow);
+
+ if (record == 0) {
+ printf("Could not create flow file\n");
+ free_flow(new_flow); // release what was created before bailing out
+ free_instance(instance);
+ exit(1);
+ }
+
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+
+ free_flow(new_flow);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/monitor_directory.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/monitor_directory.c b/nanofi/examples/monitor_directory.c
new file mode 100644
index 0000000..2283b35
--- /dev/null
+++ b/nanofi/examples/monitor_directory.c
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <pthread.h>
+
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
+int is_dir(const char *path) { // non-zero if path names an existing directory, 0 otherwise
+ struct stat stat_struct;
+ if (stat(path, &stat_struct) != 0) // stat() failed: report "not a directory"
+ return 0;
+ return S_ISDIR(stat_struct.st_mode);
+}
+
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // guards `stopped`; must be initialized before it is locked
+pthread_cond_t condition = PTHREAD_COND_INITIALIZER; // signalled by stop_callback()
+
+int stopped;
+
+int stop_callback(char *c) { // sets the stop flag and signals `condition`; c is unused; always returns 0
+ pthread_mutex_lock(&mutex);
+ stopped = 1;
+ pthread_cond_signal(&condition);
+ pthread_mutex_unlock(&mutex);
+ return 0;
+}
+
+int is_stopped(void *ptr) { // returns the current stop flag; ptr is unused
+ int is_stop = 0;
+ pthread_mutex_lock(&mutex);
+ is_stop = stopped; // copy under the lock so the read does not race with stop_callback()
+ pthread_mutex_unlock(&mutex);
+ return is_stop;
+}
+
+/**
+ * This is an example of the C API that monitors a directory and transmits the resulting flow to a remote instance.
+ */
+int main(int argc, char **argv) {
+ if (argc < 5) { // four arguments are mandatory: argv[4] is read below
+ printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor> <c2 url> [<c2 ack url>]\n");
+ exit(1);
+ }
+
+ stopped = 0x00;
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+ char *directory = argv[3];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ C2_Server server; // only consumed by the enable_async_c2() call that is commented out below
+ server.url = argv[4];
+ server.ack_url = argv[5]; // NULL when the optional ack URL is omitted (argv[argc] is a null pointer)
+ server.identifier = "monitor_directory";
+ server.type = REST;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
+
+ flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
+
+ transmit_to_nifi(instance, new_flow, is_stopped);
+
+ free_flow(new_flow);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/terminate_handler.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c
new file mode 100644
index 0000000..d5443f0
--- /dev/null
+++ b/nanofi/examples/terminate_handler.c
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "api/nanofi.h"
+
+/**
+ * This is an example of the C API that registers terminate handler and generates an exception.
+ */
+
+void example_terminate_handler() { // handed to set_terminate_callback() in main
+ fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!");
+ exit(0); // ends the process with a success status instead of returning
+}
+
+int main(int argc, char **argv) {
+
+ nifi_port port;
+
+ port.port_id = "12345";
+
+ set_terminate_callback(example_terminate_handler);
+
+ nifi_instance *instance = create_instance("random instance", &port);
+
+ flow *new_flow = create_flow(instance, "GenerateFlowFile");
+
+ processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
+
+ // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship
+ // As it doesn't exist, an exception is thrown
+ set_property(put_proc, "Directory", "/tmp/never_existed");
+ set_property(put_proc, "Create Missing Directories", "false");
+
+ flow_file_record *record = get_next_flow_file(instance, new_flow );
+
+ // Here be dragons - nothing below this line gets executed
+ fprintf(stderr, "Dragons!!!");
+ free_flowfile(record);
+ free_flow(new_flow);
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/transmit_flow.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
new file mode 100644
index 0000000..e132c8b
--- /dev/null
+++ b/nanofi/examples/transmit_flow.c
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#include "api/nanofi.h"
+
+int is_dir(const char *path) { // non-zero if path names an existing directory, 0 otherwise
+ struct stat stat_struct;
+ if (stat(path, &stat_struct) != 0) // stat() failed: report "not a directory"
+ return 0;
+ return S_ISDIR(stat_struct.st_mode);
+}
+
+/* Transmits file_or_dir as a flow file; a directory is walked recursively, entry by entry. */
+void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
+ if (is_dir(file_or_dir)) {
+ DIR *d;
+
+ struct dirent *dir;
+ d = opendir(file_or_dir);
+ if (d) {
+ while ((dir = readdir(d)) != NULL) {
+ if (!memcmp(dir->d_name,".",1) ) // skips ".", ".." and every other dot-prefixed entry
+ continue;
+ char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2); // +2: '/' and NUL
+ if (file_path == NULL) break; // out of memory: stop walking, the directory is still closed below
+ sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
+ transfer_file_or_directory(instance,file_path);
+ free(file_path);
+ }
+ closedir(d);
+ }
+ printf("%s is a directory\n", file_or_dir);
+ } else {
+ printf("Transferring %s\n",file_or_dir);
+
+ flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
+
+ add_attribute(record, "addedattribute", "1", 1); // length excludes the terminating NUL
+
+ transmit_flowfile(record, instance);
+
+ free_flowfile(record);
+ }
+}
+
+/**
+ * This is an example of the C API that transmits a flow file to a remote instance.
+ */
+int main(int argc, char **argv) {
+
+ if (argc < 4) {
+ printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
+ exit(1);
+ }
+
+ char *instance_str = argv[1];
+ char *portStr = argv[2];
+ char *file = argv[3];
+
+ nifi_port port;
+
+ port.port_id = portStr;
+
+ nifi_instance *instance = create_instance(instance_str, &port);
+
+ // initializing will make the transmission slightly more efficient.
+ //initialize_instance(instance);
+ transfer_file_or_directory(instance,file);
+
+ free_instance(instance);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/api/nanofi.h
----------------------------------------------------------------------
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
new file mode 100644
index 0000000..31a4829
--- /dev/null
+++ b/nanofi/include/api/nanofi.h
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include "core/cstructs.h"
+#include "core/processors.h"
+
+int initialize_api();
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Updates with every release. Functions used here constitute the public API of NanoFi.
+ *
+ * Changes here will follow semver
+ */
+#define API_VERSION "0.02"
+
+void enable_logging();
+
+void set_terminate_callback(void (*terminate_callback)());
+
+/****
+ * ##################################################################
+ * BASE NIFI OPERATIONS
+ * ##################################################################
+ */
+
+nifi_instance *create_instance(const char *url, nifi_port *port);
+
+void initialize_instance(nifi_instance *);
+
+void free_instance(nifi_instance*);
+
+/****
+ * ##################################################################
+ * C2 OPERATIONS
+ * ##################################################################
+ */
+
+
+typedef int c2_update_callback(char *);
+
+typedef int c2_stop_callback(char *);
+
+typedef int c2_start_callback(char *);
+
+void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
+
+
+uint8_t run_processor(const processor *processor);
+
+flow *create_new_flow(nifi_instance *);
+
+flow *create_flow(nifi_instance *, const char *);
+
+flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+
+processor *add_processor(flow *, const char *);
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name);
+
+processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+
+/**
+* Register your callback to receive flow files that the flow failed to process
+* The flow file ownership is transferred to the caller!
+* The first callback should be registered before the flow is used. Can be changed later during runtime.
+*/
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
+
+
+/**
+* Set failure strategy. Please use the enum defined in cstructs.h
+* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
+* Can be changed runtime.
+* The default strategy is AS_IS.
+*/
+int set_failure_strategy(flow *flow, FailureStrategy strategy);
+
+int set_property(processor *, const char *, const char *);
+
+int set_instance_property(nifi_instance *instance, const char*, const char *);
+
+int free_flow(flow *);
+
+flow_file_record *get_next_flow_file(nifi_instance *, flow *);
+
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
+
+flow_file_record *get(nifi_instance *,flow *, processor_session *);
+
+int transfer(processor_session* session, flow *flow, const char *rel);
+
+/**
+ * Creates a flow file object.
+ * Will obtain the size of file
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len);
+
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
+
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
+
+void free_flowfile(flow_file_record*);
+
+uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
+
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
+
+int get_attribute_qty(const flow_file_record* ff);
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target);
+
+uint8_t remove_attribute(flow_file_record*, char *key);
+
+/****
+ * ##################################################################
+ * Remote NIFI OPERATIONS
+ * ##################################################################
+ */
+
+int transmit_flowfile(flow_file_record *, nifi_instance *);
+
+/****
+ * ##################################################################
+ * Persistence Operations
+ * ##################################################################
+ */
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/comms.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/comms.h b/nanofi/include/blocks/comms.h
new file mode 100644
index 0000000..bfacc3d
--- /dev/null
+++ b/nanofi/include/blocks/comms.h
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_COMMS_H_
+#define BLOCKS_COMMS_H_
+
+#include <stdio.h>
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define SUCCESS 0x00
+#define FINISHED_EARLY 0x01
+#define FAIL 0x02
+
+typedef int transmission_stop(void *);
+
+/**
+ * Pulls flow files from flow one at a time, transmits each to instance and frees it.
+ *
+ * The loop ends in one of two ways:
+ *  - get_next_flow_file returns no record: FINISHED_EARLY is returned;
+ *  - stop_callback is non-null and returns non-zero after a transmission: SUCCESS is returned.
+ * With a null stop_callback the function therefore always ends with FINISHED_EARLY.
+ * The result of transmit_flowfile is not inspected, so FAIL is never returned.
+ *
+ * NOTE(review): this is a non-static definition in a header; including it from more than
+ * one translation unit would presumably produce duplicate symbols. Confirm intended usage.
+ *
+ * @param instance      instance used to fetch and transmit flow files
+ * @param flow          flow the records are pulled from
+ * @param stop_callback optional predicate, called with a null argument after each record
+ */
+uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
+  for (;;) {
+    flow_file_record *record = get_next_flow_file(instance, flow);
+    if (record == NULL) {
+      return FINISHED_EARLY;
+    }
+
+    transmit_flowfile(record, instance);
+    free_flowfile(record);
+    // record is dangling from here on; it is deliberately not examined again
+
+    if (stop_callback != NULL && stop_callback(NULL)) {
+      return SUCCESS;
+    }
+  }
+}
+
+#endif /* BLOCKS_COMMS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/nanofi/include/blocks/file_blocks.h b/nanofi/include/blocks/file_blocks.h
new file mode 100644
index 0000000..d84ccbe
--- /dev/null
+++ b/nanofi/include/blocks/file_blocks.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_FILE_BLOCKS_H_
+#define BLOCKS_FILE_BLOCKS_H_
+
+#include "../api/nanofi.h"
+#include "core/processors.h"
+
+#define KEEP_SOURCE 0x01
+#define RECURSE 0x02
+
+/**
+ * Builds a GetFile configuration for directory and passes it to create_getfile, so that
+ * directory monitoring can be combined into an existing flow to form an execution plan.
+ *
+ * @param instance    instance forwarded to create_getfile
+ * @param directory   directory to monitor
+ * @param parent_flow flow forwarded to create_getfile as the parent
+ * @param flags       bitwise OR of KEEP_SOURCE and/or RECURSE
+ * @return whatever create_getfile returns
+ */
+flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
+  GetFileConfig config;
+  config.directory = directory;
+  // Both fields are 1-bit unsigned bit-fields. Assigning the raw mask result would store
+  // 0x02 for RECURSE, which truncates to 0, so each flag is normalised to 0 or 1 first.
+  config.keep_source = (flags & KEEP_SOURCE) != 0;
+  config.recurse = (flags & RECURSE) != 0;
+  return create_getfile(instance, parent_flow, &config);
+}
+
+#endif /* BLOCKS_FILE_BLOCKS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/cstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
new file mode 100644
index 0000000..bc9700e
--- /dev/null
+++ b/nanofi/include/core/cstructs.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
+
+/**
+ * NiFi Port struct
+ */
+typedef struct {
+ char *port_id;
+} nifi_port;
+
+/**
+ * Nifi instance struct
+ */
+typedef struct {
+
+ void *instance_ptr;
+
+ nifi_port port;
+
+} nifi_instance;
+
+/****
+ * ##################################################################
+ * C2 OPERATIONS
+ * ##################################################################
+ */
+
+enum C2_Server_Type {
+ REST,
+ MQTT
+};
+
+typedef struct {
+ char *url;
+ char *ack_url;
+ char *identifier;
+ char *topic;
+ enum C2_Server_Type type;
+} C2_Server;
+
+/****
+ * ##################################################################
+ * Processor OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+ void *processor_ptr;
+} processor;
+
+typedef struct {
+ void *session;
+} processor_session;
+
+/****
+ * ##################################################################
+ * FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct {
+ const char *key;
+ void *value;
+ size_t value_size;
+} attribute;
+
+typedef struct {
+ attribute * attributes;
+ size_t size;
+} attribute_set;
+
+/**
+ * State of a flow file
+ *
+ * Most members are untyped pointers whose concrete types are not declared in this header;
+ * the notes below describe only how other code in this change set uses them.
+ */
+typedef struct {
+ uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
+
+ /* NOTE(review): purpose not evident from this header; confirm against the implementation. */
+ void * in;
+
+ /* Plan.h casts this to a std::shared_ptr<ContentRepository>* and assigns a repository through it. */
+ void * crp;
+
+ char * contentLocation; /**< Filesystem location of this object */
+
+ void *attributes; /**< Hash map of attributes */
+
+ /* Plan.h stores the raw pointer obtained from a flow file shared_ptr (ff.get()) here. */
+ void *ffp;
+
+} flow_file_record;
+
+typedef struct {
+ void *plan;
+} flow;
+
+typedef enum FS {
+ AS_IS,
+ ROLLBACK
+} FailureStrategy;
+
+#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/processors.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/processors.h b/nanofi/include/core/processors.h
new file mode 100644
index 0000000..7fe357d
--- /dev/null
+++ b/nanofi/include/core/processors.h
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Settings passed to create_getfile (see monitor_directory in blocks/file_blocks.h).
+ */
+typedef struct {
+ /* directory to read from */
+ char *directory;
+ /* single-bit flag: can only hold 0 or 1, so assign a normalised boolean rather than a raw mask */
+ unsigned keep_source :1;
+ /* single-bit flag: can only hold 0 or 1, so assign a normalised boolean rather than a raw mask */
+ unsigned recurse :1;
+} GetFileConfig;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h
new file mode 100644
index 0000000..f620baa
--- /dev/null
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "c2/C2Agent.h"
+#include "core/state/Value.h"
+#include "c2/C2Payload.h"
+#include "c2/C2Protocol.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+typedef int c2_ag_update_callback(char *);
+
+typedef int c2_ag_stop_callback(char *);
+
+typedef int c2_ag_start_callback(char *);
+
+/**
+ * C2Agent specialisation that can hold a caller-supplied stop callback.
+ *
+ * The constructor and handle_c2_server_response are defined outside this header.
+ */
+class C2CallbackAgent : public c2::C2Agent {
+
+ public:
+
+  explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+
+  virtual ~C2CallbackAgent() {
+  }
+
+  /**
+   * Stores the stop callback.
+   * @param st callback to keep; may be null to clear it.
+   */
+  void setStopCallback(c2_ag_stop_callback *st){
+    stop = st;
+  }
+
+
+ protected:
+  /**
+   * Handles a C2 event requested by the server.
+   * @param resp c2 server response.
+   */
+  virtual void handle_c2_server_response(const C2ContentResponse &resp);
+
+  // Defaulted so the pointer is never indeterminate, whatever the out-of-line
+  // constructor does and whether or not setStopCallback is ever called.
+  c2_ag_stop_callback *stop = nullptr;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h
new file mode 100644
index 0000000..61e824f
--- /dev/null
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -0,0 +1,100 @@
+/**
+ * @file CallbackProcessor.h
+ * CallbackProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CALLBACK_PROCESSOR_H__
+#define __CALLBACK_PROCESSOR_H__
+
+#include <stdio.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <iostream>
+#include <sys/types.h>
+#include "core/cstructs.h"
+#include "io/BaseStream.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// CallbackProcessor Class
+/**
+ * Processor that keeps a caller-supplied std::function together with an opaque object
+ * reference. onTrigger is defined outside this header.
+ */
+class CallbackProcessor : public core::Processor {
+ public:
+  // Processor Name
+  static constexpr char const* ProcessorName = "CallbackProcessor";
+
+  /*!
+   * Create a new processor with no callback and no object reference set.
+   */
+  CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        objref_(nullptr),
+        callback_(nullptr),
+        logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
+  }
+
+  virtual ~CallbackProcessor() {
+  }
+
+  /**
+   * Stores the object reference and the trigger callback.
+   * @param obj                opaque pointer kept alongside the callback
+   * @param ontrigger_callback function receiving a processor_session pointer
+   */
+  void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) {
+    callback_ = ontrigger_callback;
+    objref_ = obj;
+  }
+
+  // OnTrigger method, implemented by NiFi CallbackProcessor
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+
+  // Initialize, overridden by NiFi CallbackProcessor: registers the single "success" relationship
+  virtual void initialize() {
+    std::set<core::Relationship> supported;
+    supported.insert(core::Relationship("success", "description"));
+    setSupportedRelationships(supported);
+  }
+
+ protected:
+  void *objref_;
+  std::function<void(processor_session*)> callback_;
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+REGISTER_RESOURCE(CallbackProcessor);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Instance.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
new file mode 100644
index 0000000..982f6d4
--- /dev/null
+++ b/nanofi/include/cxx/Instance.h
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
+
+#include <memory>
+#include <type_traits>
+#include <string>
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ContentRepository.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/Repository.h"
+
+#include "C2CallbackAgent.h"
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/FlowConfiguration.h"
+#include "ReflexiveSession.h"
+#include "utils/ThreadPool.h"
+#include "core/state/UpdateController.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+/**
+ * Thin holder that keeps a shared reference to a processor.
+ */
+class ProcessorLink {
+ public:
+  /** Keeps a shared reference to the supplied processor. */
+  explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
+      : processor_(processor) {
+  }
+
+  /** @return reference to the stored shared_ptr; no copy is made. */
+  const std::shared_ptr<core::Processor> &getProcessor() {
+    return processor_;
+  }
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+};
+
+/**
+ * Bundles what the C API needs to talk to a remote instance: a configuration object,
+ * a volatile content repository, a no-op repository, a RemoteProcessorGroupPort wrapped
+ * in a ProcessorNode, and a single-threaded pool used for C2 update listeners.
+ */
+class Instance {
+ public:
+
+  /**
+   * @param url  handed to the RemoteProcessorGroupPort (used for both of its string arguments)
+   * @param port assigned to the utils::Identifier given to the RemoteProcessorGroupPort
+   */
+  explicit Instance(const std::string &url, const std::string &port)
+      // listed in member declaration order, which is the order initialization actually runs in
+      : agent_(nullptr),
+        running_(false),
+        rpgInitialized_(false),
+        no_op_repo_(std::make_shared<minifi::core::Repository>()),
+        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+        url_(url),
+        configure_(std::make_shared<Configure>()),
+        listener_thread_pool_(1) {
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
+    utils::Identifier uuid;
+    uuid = port;
+    rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
+    proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
+    core::FlowConfiguration::initialize_static_functions();
+    content_repo_->initialize(configure_);
+  }
+
+  /** Clears the running flag and shuts the listener pool down. */
+  ~Instance() {
+    running_ = false;
+    listener_thread_pool_.shutdown();
+  }
+
+  /** @return true once setRemotePort has been called. */
+  bool isRPGConfigured() {
+    return rpgInitialized_;
+  }
+
+  /**
+   * Creates the C2 callback agent, starts the listener pool and registers the agent's
+   * update functions with a delay value of 1000.
+   *
+   * For non-MQTT servers the REST url and ack url are copied into the configuration;
+   * null strings are skipped because constructing a std::string from a null pointer is
+   * undefined. A null server makes the call a no-op.
+   *
+   * @param server C2 server description; ignored when null
+   * @param c1     stop callback, forwarded to the agent
+   * @param c2     start callback (currently unused)
+   * @param c3     update callback (currently unused)
+   */
+  void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+    if (server == nullptr) {
+      return;
+    }
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    running_ = true;
+    if (server->type != C2_Server_Type::MQTT){
+      if (server->url != nullptr) {
+        configure_->set("c2.rest.url",server->url);
+      }
+      if (server->ack_url != nullptr) {
+        configure_->set("c2.rest.url.ack",server->ack_url);
+      }
+    }
+    agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+    listener_thread_pool_.start();
+    registerUpdateListener(agent_, 1000);
+    agent_->setStopCallback(c1);
+  }
+
+  /**
+   * Sets the port UUID property on the remote port, initializes it, marks it as
+   * transmitting and records that the remote port has been configured.
+   */
+  void setRemotePort(std::string remote_port) {
+    rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
+    rpg_->initialize();
+    rpg_->setTransmitting(true);
+    rpgInitialized_ = true;
+  }
+
+  std::shared_ptr<Configure> getConfiguration() {
+    return configure_;
+  }
+
+  std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
+    return no_op_repo_;
+  }
+
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+    return content_repo_;
+  }
+
+  /**
+   * Sends one flow file through the remote port: builds a fresh process context and
+   * session factory, schedules the port, places ff in a ReflexiveSession and triggers the port.
+   */
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
+    auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
+
+    rpg_->onSchedule(processContext, sessionFactory);
+
+    auto session = std::make_shared<core::ReflexiveSession>(processContext);
+
+    session->add(ff);
+
+    rpg_->onTrigger(processContext, session);
+  }
+
+ protected:
+
+  /**
+   * Submits every function exposed by updateController to the listener pool.
+   * @return false as soon as the pool refuses a task, true when all were accepted.
+   */
+  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
+    auto functions = updateController->getFunctions();
+    // run all functions independently
+
+    for (auto function : functions) {
+      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
+      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
+      std::future<state::Update> future;
+      if (!listener_thread_pool_.execute(std::move(functor), future)) {
+        // denote failure
+        return false;
+      }
+    }
+    return true;
+  }
+
+  std::shared_ptr<c2::C2CallbackAgent> agent_;
+
+  std::atomic<bool> running_;
+
+  bool rpgInitialized_;
+
+  std::shared_ptr<minifi::core::Repository> no_op_repo_;
+
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+
+  std::shared_ptr<core::ProcessorNode> proc_node_;
+  std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
+  std::shared_ptr<io::StreamFactory> stream_factory_;
+  std::string url_;
+  std::shared_ptr<Configure> configure_;
+
+  utils::ThreadPool<state::Update> listener_thread_pool_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Plan.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
new file mode 100644
index 0000000..6171242
--- /dev/null
+++ b/nanofi/include/cxx/Plan.h
@@ -0,0 +1,204 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_CAPI_PLAN_H_
+#define LIBMINIFI_CAPI_PLAN_H_
+
+#ifndef WIN32
+ #include <dirent.h>
+#endif
+#include <cstdio>
+#include <cstdlib>
+#include <sstream>
+#include "ResourceClaim.h"
+#include <vector>
+#include <set>
+#include <map>
+#include "core/logging/Logger.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "properties/Properties.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include "spdlog/sinks/dist_sink.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "core/cstructs.h"
+#include "api/nanofi.h"
+
+using failure_callback_type = std::function<void(flow_file_record*)>;
+using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
+
+namespace {
+
+  /**
+   * Takes the next flow file from the session and, when it has a resource claim and a
+   * user callback is set, wraps it in a flow_file_record and hands that to the callback.
+   * The flow file is removed from the session either way. Does nothing when the session
+   * yields no flow file.
+   */
+  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    auto flow_file = session->get();
+    if (flow_file == nullptr) {
+      return;
+    }
+
+    auto resource_claim = flow_file->getResourceClaim();
+    const bool deliver = resource_claim != nullptr && user_callback != nullptr;
+
+    if (deliver) {
+      resource_claim->increaseFlowFileRecordOwnedCount();
+
+      // build the C-level record that is passed to the caller
+      auto content_path = resource_claim->getContentFullPath();
+      auto record = create_ff_object_na(content_path.c_str(), content_path.length(), flow_file->getSize());
+      record->attributes = flow_file->getAttributesPtr();
+      record->ffp = flow_file.get();
+
+      // crp is treated as a pointer to a content-repository shared_ptr; point it at ours
+      auto repo_slot = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(record->crp);
+      *repo_slot = cr_ptr;
+
+      user_callback(record);
+    }
+
+    session->remove(flow_file);
+  }
+
+  /**
+   * Rolls the session back first, then applies the as-is strategy.
+   */
+  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
+    session->rollback();
+    failureStrategyAsIs(session, user_callback, cr_ptr);
+  }
+}
+
+static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
+ { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
+
+class ExecutionPlan {
+ public:
+
+ explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+
+ std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+ core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+
+ bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+ void reset();
+
+ bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
+ bool setFailureCallback(failure_callback_type onerror_callback);
+
+ bool setFailureStrategy(FailureStrategy start);
+
+ std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+
+ std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+
+ std::shared_ptr<core::ProcessSession> getCurrentSession();
+
+ std::shared_ptr<core::Repository> getFlowRepo() {
+ return flow_repo_;
+ }
+
+ std::shared_ptr<core::Repository> getProvenanceRepo() {
+ return prov_repo_;
+ }
+
+ std::shared_ptr<core::ContentRepository> getContentRepo() {
+ return content_repo_;
+ }
+
+ std::shared_ptr<core::FlowFile> getNextFlowFile(){
+ return next_ff_;
+ }
+
+ void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
+ next_ff_ = ptr;
+ }
+
+ static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
+
+ protected:
+  /**
+   * Callable that applies the configured failure strategy to a processor session.
+   * Starts with no callback and the AS_IS strategy.
+   */
+  class FailureHandler {
+   public:
+    FailureHandler(content_repo_sptr cr_ptr)
+        : callback_(nullptr),
+          strategy_(FailureStrategy::AS_IS),
+          content_repo_(cr_ptr) {
+    }
+
+    /** Replaces the user callback. */
+    void setCallback(failure_callback_type onerror_callback) {
+      callback_ = onerror_callback;
+    }
+
+    /** Replaces the strategy looked up in FailureStrategies on each invocation. */
+    void setStrategy(FailureStrategy strat) {
+      strategy_ = strat;
+    }
+
+    /** Runs the current strategy against the ProcessSession wrapped by ps. */
+    void operator()(const processor_session* ps) {
+      auto *wrapped = static_cast<core::ProcessSession*>(ps->session);
+      const auto &apply = FailureStrategies.at(strategy_);
+      apply(wrapped, callback_, content_repo_);
+    }
+
+   private:
+    failure_callback_type callback_;
+    FailureStrategy strategy_;
+    content_repo_sptr content_repo_;
+  };
+
+ void finalize();
+
+ std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
+
+ std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
+ core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
+
+ std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
+
+ content_repo_sptr content_repo_;
+
+ std::shared_ptr<core::Repository> flow_repo_;
+ std::shared_ptr<core::Repository> prov_repo_;
+
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+
+ std::atomic<bool> finalized;
+
+ uint32_t location;
+
+ std::shared_ptr<core::ProcessSession> current_session_;
+ std::shared_ptr<core::FlowFile> current_flowfile_;
+
+ std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+ std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+ std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+ std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+ std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+ std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+ std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+ core::Relationship termination_;
+
+ std::shared_ptr<core::FlowFile> next_ff_;
+
+ private:
+
+ static std::shared_ptr<utils::IdGenerator> id_generator_;
+ std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<FailureHandler> failure_handler_;
+};
+
+#endif /* LIBMINIFI_CAPI_PLAN_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h
new file mode 100644
index 0000000..ebf6cbe
--- /dev/null
+++ b/nanofi/include/cxx/ReflexiveSession.h
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REFLEXIVE_SESSION_H__
+#define __REFLEXIVE_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ReflexiveSession: a ProcessSession that keeps at most one FlowFile (ff) and returns it from get()
+class ReflexiveSession : public ProcessSession{
+ public:
+ // Constructor
+ /*!
+ * Create a new reflexive session; processContext (nullptr by default) is forwarded to ProcessSession
+ */
+ ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
+ : ProcessSession(processContext){
+ }
+
+// Destructor (empty body)
+ virtual ~ReflexiveSession() {
+ }
+
+ virtual std::shared_ptr<core::FlowFile> get(){ // returns the stored FlowFile (may be null) and empties the slot
+ auto prevff = ff; // keep a reference so the value survives the reset below
+ ff = nullptr; // a second get() without an intervening add() returns null
+ return prevff;
+ }
+
+ virtual void add(const std::shared_ptr<core::FlowFile> &flow){ // stores flow, replacing any FlowFile already held
+ ff = flow;
+ }
+ virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
+ // no op: both arguments are ignored and the stored FlowFile is left untouched
+ }
+ protected:
+ // Single-slot storage for this session:
+ // the FlowFile most recently passed to add(), or null once get() has handed it out
+ std::shared_ptr<core::FlowFile> ff;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
[3/3] nifi-minifi-cpp git commit: MINIFICPP-659: Break out CAPI into
nanofi
Posted by al...@apache.org.
MINIFICPP-659: Break out CAPI into nanofi
Break out structure -- facilitating Python and new nanofi structure
MINIFICPP-659: Add http-curl to examples and comment for why rocksdb is disabling shared libs
MINIFICPP-659: limit python on apple
MINIFICPP-663: Ensure bison is 3.0.4
This closes #433.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/556794b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/556794b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/556794b1
Branch: refs/heads/master
Commit: 556794b15f1f342bc4b51998c55811c805be91b5
Parents: fc1074a
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Oct 29 14:53:37 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Oct 31 14:27:11 2018 -0400
----------------------------------------------------------------------
.travis.yml | 3 -
CMakeLists.txt | 10 +-
LibExample/CMakeLists.txt | 78 ---
LibExample/generate_flow.c | 64 ---
LibExample/monitor_directory.c | 95 ----
LibExample/terminate_handler.c | 58 ---
LibExample/transmit_flow.c | 92 ----
README.md | 6 +-
blocks/comms.h | 47 --
blocks/file_blocks.h | 38 --
cmake/BuildTests.cmake | 36 +-
darwin.sh | 27 +-
libminifi/CMakeLists.txt | 6 -
libminifi/include/agent/build_description.h | 2 +-
libminifi/include/capi/C2CallbackAgent.h | 82 ---
libminifi/include/capi/Instance.h | 180 -------
libminifi/include/capi/Plan.h | 203 --------
libminifi/include/capi/ReflexiveSession.h | 77 ---
libminifi/include/capi/api.h | 157 ------
libminifi/include/capi/cstructs.h | 127 -----
libminifi/include/capi/expect.h | 32 --
libminifi/include/capi/processors.h | 37 --
libminifi/include/core/expect.h | 32 ++
libminifi/include/io/tls/TLSSocket.h | 2 +-
.../include/processors/CallbackProcessor.h | 100 ----
libminifi/include/utils/ThreadPool.h | 2 +-
libminifi/src/capi/C2CallbackAgent.cpp | 80 ---
libminifi/src/capi/Plan.cpp | 294 -----------
libminifi/src/capi/api.cpp | 517 ------------------
.../repository/VolatileContentRepository.cpp | 2 +-
libminifi/src/processors/CallbackProcessor.cpp | 37 --
libminifi/test/capi/CAPITests.cpp | 279 ----------
nanofi/CMakeLists.txt | 100 ++++
nanofi/examples/CMakeLists.txt | 78 +++
nanofi/examples/generate_flow.c | 65 +++
nanofi/examples/monitor_directory.c | 95 ++++
nanofi/examples/terminate_handler.c | 59 +++
nanofi/examples/transmit_flow.c | 93 ++++
nanofi/include/api/nanofi.h | 159 ++++++
nanofi/include/blocks/comms.h | 48 ++
nanofi/include/blocks/file_blocks.h | 38 ++
nanofi/include/core/cstructs.h | 118 +++++
nanofi/include/core/processors.h | 37 ++
nanofi/include/cxx/C2CallbackAgent.h | 81 +++
nanofi/include/cxx/CallbackProcessor.h | 100 ++++
nanofi/include/cxx/Instance.h | 180 +++++++
nanofi/include/cxx/Plan.h | 204 ++++++++
nanofi/include/cxx/ReflexiveSession.h | 77 +++
nanofi/src/api/nanofi.cpp | 518 +++++++++++++++++++
nanofi/src/cxx/C2CallbackAgent.cpp | 79 +++
nanofi/src/cxx/CallbackProcessor.cpp | 37 ++
nanofi/src/cxx/Plan.cpp | 294 +++++++++++
nanofi/tests/CAPITests.cpp | 278 ++++++++++
python/library/CMakeLists.txt | 6 +-
python/library/python_lib.cpp | 9 +-
thirdparty/rocksdb/CMakeLists.txt | 22 +-
.../yaml-cpp-yaml-cpp-20171024/CMakeLists.txt | 2 +-
57 files changed, 2854 insertions(+), 2725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 2138acd..45ea54e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -70,7 +70,6 @@ matrix:
- package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
- package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
- package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
@@ -92,7 +91,6 @@ matrix:
- package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
- package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
- package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
@@ -114,7 +112,6 @@ matrix:
- package='ossp-uuid'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='boost'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='cmake'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- - package='bison'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link bison --force
- package='flex'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}; brew link flex --force
- package='ccache'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
- package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1c92446..7c81edb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,6 +30,7 @@ option(OPENSSL_OFF "Disables OpenSSL" OFF)
option(ENABLE_OPS "Enable Operations Tools" ON)
option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF)
option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON)
+option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
if (WIN32)
option(USE_SYSTEM_ZLIB "Instructs the build system to search for and use a zlib library available in the host system" OFF)
else()
@@ -324,8 +325,12 @@ set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
set(CIVETWEB_ENABLE_SSL OFF CACHE BOOL "DISABLE SSL")
SET(WITH_TOOLS OFF CACHE BOOL "Do not build RocksDB tools")
+if ( NOT APPLE)
if (ENABLE_PYTHON)
-SET(BUILD_SHARED_LIBS ON CACHE BOOL "build yaml cpp shared lib")
+ SET(BUILD_SHARED_LIBS ON CACHE BOOL "Build yaml cpp shared lib" FORCE)
+else()
+ SET(BUILD_SHARED_LIBS OFF CACHE BOOL "Build yaml cpp shared lib" FORCE)
+endif()
endif()
SET(WITH_TESTS OFF CACHE BOOL "Build RocksDB library (not repo) tests")
set(CIVET_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb root")
@@ -471,8 +476,9 @@ endif()
## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
add_subdirectory(main)
+add_subdirectory(nanofi)
+
if (NOT DISABLE_CURL)
- add_subdirectory(LibExample)
if (ENABLE_PYTHON)
if (NOT WIN32)
add_subdirectory(python/library)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt
deleted file mode 100644
index 88901cc..0000000
--- a/LibExample/CMakeLists.txt
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
-
-include_directories(../blocks/ ../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/)
-
-include(CheckCXXCompilerFlag)
-if (WIN32)
- if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
- CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
- if (_cpp_latest_flag_supported)
- add_compile_options("/std:c++14")
- endif()
- endif()
-else()
-CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
-CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
-if(COMPILER_SUPPORTS_CXX11)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
-elseif(COMPILER_SUPPORTS_CXX0X)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
-else()
- message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
-endif()
-
-endif()
-
-if (WIN32)
- set(LINK_FLAGS "/WHOLEARCHIVE")
- set(LINK_END_FLAGS "")
-elseif (APPLE)
- set(LINK_FLAGS "-Wl,-all_load")
- set(LINK_END_FLAGS "")
-else ()
- set(LINK_FLAGS "-Wl,--whole-archive")
- set(LINK_END_FLAGS "-Wl,--no-whole-archive")
-endif ()
-
-add_executable(generate_flow generate_flow.c)
-
-add_executable(terminate_handler terminate_handler.c)
-
-target_link_libraries(generate_flow capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-target_link_libraries(terminate_handler capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-if (NOT WIN32)
-
-add_executable(transmit_flow transmit_flow.c)
-
-target_link_libraries(transmit_flow capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-add_executable(monitor_directory monitor_directory.c)
-
-target_link_libraries(monitor_directory capi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi ${LINK_END_FLAGS})
-
-endif()
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/generate_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/generate_flow.c b/LibExample/generate_flow.c
deleted file mode 100644
index eb5e7a3..0000000
--- a/LibExample/generate_flow.c
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include "capi/api.h"
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
-
- if (argc < 3) {
- printf("Error: must run ./generate_flow <instance> <remote port> \n");
- exit(1);
- }
-
- char *instance_str = argv[1];
- char *portStr = argv[2];
-
- nifi_port port;
-
- port.port_id = portStr;
-
- nifi_instance *instance = create_instance(instance_str, &port);
-
- flow *new_flow = create_flow(instance, "GenerateFlowFile");
-
- flow_file_record *record = get_next_flow_file(instance, new_flow );
-
- if (record == 0){
- printf("Could not create flow file");
- exit(1);
- }
-
- transmit_flowfile(record,instance);
-
- free_flowfile(record);
-
- // initializing will make the transmission slightly more efficient.
- //initialize_instance(instance);
- //transfer_file_or_directory(instance,file);
-
- free_flow(new_flow);
-
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/monitor_directory.c
----------------------------------------------------------------------
diff --git a/LibExample/monitor_directory.c b/LibExample/monitor_directory.c
deleted file mode 100644
index b456fcd..0000000
--- a/LibExample/monitor_directory.c
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <dirent.h>
-#include <pthread.h>
-
-#include "file_blocks.h"
-#include "comms.h"
-#include "capi/api.h"
-#include "capi/processors.h"
-int is_dir(const char *path) {
- struct stat stat_struct;
- if (stat(path, &stat_struct) != 0)
- return 0;
- return S_ISDIR(stat_struct.st_mode);
-}
-
-pthread_mutex_t mutex;
-pthread_cond_t condition;
-
-int stopped;
-
-int stop_callback(char *c) {
- pthread_mutex_lock(&mutex);
- stopped = 1;
- pthread_cond_signal(&condition);
- pthread_mutex_unlock(&mutex);
- return 0;
-}
-
-int is_stopped(void *ptr) {
- int is_stop = 0;
- pthread_mutex_lock(&mutex);
- is_stop = stopped;
- pthread_mutex_unlock(&mutex);
- return is_stop;
-}
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
- if (argc < 5) {
- printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n");
- exit(1);
- }
-
- stopped = 0x00;
-
- char *instance_str = argv[1];
- char *portStr = argv[2];
- char *directory = argv[3];
-
- nifi_port port;
-
- port.port_id = portStr;
-
- C2_Server server;
- server.url = argv[4];
- server.ack_url = argv[5];
- server.identifier = "monitor_directory";
- server.type = REST;
-
- nifi_instance *instance = create_instance(instance_str, &port);
-
- // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
-
- flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
-
- transmit_to_nifi(instance, new_flow, is_stopped);
-
- free_flow(new_flow);
-
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/terminate_handler.c
----------------------------------------------------------------------
diff --git a/LibExample/terminate_handler.c b/LibExample/terminate_handler.c
deleted file mode 100644
index daa9f3b..0000000
--- a/LibExample/terminate_handler.c
+++ /dev/null
@@ -1,58 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include "capi/api.h"
-
-/**
- * This is an example of the C API that registers terminate handler and generates an exception.
- */
-
-void example_terminate_handler() {
- fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!");
- exit(0);
-}
-
-int main(int argc, char **argv) {
-
- nifi_port port;
-
- port.port_id = "12345";
-
- set_terminate_callback(example_terminate_handler);
-
- nifi_instance *instance = create_instance("random instance", &port);
-
- flow *new_flow = create_flow(instance, "GenerateFlowFile");
-
- processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
-
- // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship
- // As it doesn't exist, an exception is thrown
- set_property(put_proc, "Directory", "/tmp/never_existed");
- set_property(put_proc, "Create Missing Directories", "false");
-
- flow_file_record *record = get_next_flow_file(instance, new_flow );
-
- // Here be dragons - nothing below this line gets executed
- fprintf(stderr, "Dragons!!!");
- free_flowfile(record);
- free_flow(new_flow);
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/LibExample/transmit_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c
deleted file mode 100644
index 7c6f6e7..0000000
--- a/LibExample/transmit_flow.c
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <dirent.h>
-#include "capi/api.h"
-
-int is_dir(const char *path) {
- struct stat stat_struct;
- if (stat(path, &stat_struct) != 0)
- return 0;
- return S_ISDIR(stat_struct.st_mode);
-}
-
-void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) {
- int size = 1;
-
- if (is_dir(file_or_dir)) {
- DIR *d;
-
- struct dirent *dir;
- d = opendir(file_or_dir);
- if (d) {
- while ((dir = readdir(d)) != NULL) {
- if (!memcmp(dir->d_name,".",1) )
- continue;
- char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2);
- sprintf(file_path,"%s/%s",file_or_dir,dir->d_name);
- transfer_file_or_directory(instance,file_path);
- free(file_path);
- }
- closedir(d);
- }
- printf("%s is a directory", file_or_dir);
- } else {
- printf("Transferring %s\n",file_or_dir);
-
- flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir));
-
- add_attribute(record, "addedattribute", "1", 2);
-
- transmit_flowfile(record, instance);
-
- free_flowfile(record);
- }
-}
-
-/**
- * This is an example of the C API that transmits a flow file to a remote instance.
- */
-int main(int argc, char **argv) {
-
- if (argc < 4) {
- printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n");
- exit(1);
- }
-
- char *instance_str = argv[1];
- char *portStr = argv[2];
- char *file = argv[3];
-
- nifi_port port;
-
- port.port_id = portStr;
-
- nifi_instance *instance = create_instance(instance_str, &port);
-
- // initializing will make the transmission slightly more efficient.
- //initialize_instance(instance);
- transfer_file_or_directory(instance,file);
-
- free_instance(instance);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 542dd6c..ae37b62 100644
--- a/README.md
+++ b/README.md
@@ -102,7 +102,7 @@ MiNiFi - C++ supports the following processors:
* g++
* 4.8.4 or greater
* bison
- * 3.0 or greater
+ * 3.0.x (3.2 has been shown to fail builds)
* flex
* 2.5 or greater
@@ -160,7 +160,7 @@ $ scl enable devtoolset-6 bash
```
Additionally, for expression language support, it is recommended to install GNU
-Bison 3.x:
+Bison 3.0.4:
```
$ wget https://ftp.gnu.org/gnu/bison/bison-3.0.4.tar.xz
@@ -279,7 +279,6 @@ $ apt-get install libpcap-dev
```
# ~/Development/code/apache/nifi-minifi-cpp on git:master
$ brew install cmake \
- bison \
flex \
patch \
autoconf \
@@ -305,6 +304,7 @@ $ sudo pip install virtualenv
$ brew install gpsd
$ # (Optional) for PacketCapture Processor
$ sudo brew install libpcap
+$ # It is recommended that you install Bison 3.0.x from source, as Homebrew now ships an incompatible version of Bison
```
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/blocks/comms.h
----------------------------------------------------------------------
diff --git a/blocks/comms.h b/blocks/comms.h
deleted file mode 100644
index 3f5fda1..0000000
--- a/blocks/comms.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef BLOCKS_COMMS_H_
-#define BLOCKS_COMMS_H_
-
-#include <stdio.h>
-#include "capi/api.h"
-#include "capi/processors.h"
-
-#define SUCCESS 0x00
-#define FINISHED_EARLY 0x01
-#define FAIL 0x02
-
-typedef int transmission_stop(void *);
-
-uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
-
- flow_file_record *record = 0x00;
- do {
- record = get_next_flow_file(instance, flow);
-
- if (record == 0) {
- return FINISHED_EARLY;
- }
- transmit_flowfile(record, instance);
-
- free_flowfile(record);
- } while (record != 0x00 && !( stop_callback != 0x00 && stop_callback(0x00)));
- return SUCCESS;
-}
-
-#endif /* BLOCKS_COMMS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/blocks/file_blocks.h b/blocks/file_blocks.h
deleted file mode 100644
index 3f1d6f7..0000000
--- a/blocks/file_blocks.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef BLOCKS_FILE_BLOCKS_H_
-#define BLOCKS_FILE_BLOCKS_H_
-
-#include "capi/api.h"
-#include "capi/processors.h"
-
-#define KEEP_SOURCE 0x01
-#define RECURSE 0x02
-
-/**
- * Monitor directory can be combined into a current flow. to create an execution plan
- */
-flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
- GetFileConfig config;
- config.directory = directory;
- config.keep_source = flags & KEEP_SOURCE;
- config.recurse = flags & RECURSE;
- return create_getfile(instance, parent_flow, &config);
-}
-
-#endif /* BLOCKS_FILE_BLOCKS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index d14c19b..c7ec1d6 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -29,12 +29,13 @@ MACRO(GETSOURCEFILES result curdir)
SET(${result} ${dirlist})
ENDMACRO()
+set(NANOFI_TEST_DIR "${CMAKE_SOURCE_DIR}/nanofi/tests/")
+
if(NOT EXCLUDE_BOOST)
find_package(Boost COMPONENTS system filesystem)
endif()
-function(createTests testName)
- message ("-- Adding test: ${testName}")
+function(appendIncludes testName)
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
@@ -60,6 +61,11 @@ function(createTests testName)
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/utils")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/processors")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/provenance")
+endfunction()
+
+function(createTests testName)
+ message ("-- Adding test: ${testName}")
+ appendIncludes("${testName}")
if (ENABLE_BINARY_DIFF)
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/bsdiff/")
@@ -70,11 +76,13 @@ function(createTests testName)
endif()
target_link_libraries(${testName} ${CMAKE_DL_LIBS} ${SPD_LIB} ${TEST_BASE_LIB})
target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp)
- if (APPLE)
- target_link_libraries (${testName} -Wl,-all_load minifi)
- else ()
- target_link_libraries (${testName} -Wl,--whole-archive minifi -Wl,--no-whole-archive)
- endif ()
+ if (NOT excludeBase)
+ if (APPLE)
+ target_link_libraries (${testName} -Wl,-all_load minifi)
+ else ()
+ target_link_libraries (${testName} -Wl,--whole-archive minifi -Wl,--no-whole-archive)
+ endif ()
+ endif()
if (Boost_FOUND)
target_link_libraries(${testName} ${Boost_SYSTEM_LIBRARY})
target_link_libraries(${testName} ${Boost_FILESYSTEM_LIBRARY})
@@ -105,7 +113,7 @@ target_include_directories(${CATCH_MAIN_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}
SET(TEST_RESOURCES ${TEST_DIR}/resources)
GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/")
-GETSOURCEFILES(CAPI_UNIT_TESTS "${TEST_DIR}/capi/")
+GETSOURCEFILES(NANOFI_UNIT_TESTS "${NANOFI_TEST_DIR}")
GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
SET(UNIT_TEST_COUNT 0)
@@ -120,15 +128,17 @@ ENDFOREACH()
message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...")
SET(UNIT_TEST_COUNT 0)
-FOREACH(testfile ${CAPI_UNIT_TESTS})
+FOREACH(testfile ${NANOFI_UNIT_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
- add_executable("${testfilename}" "${TEST_DIR}/capi/${testfile}")
- createTests("${testfilename}")
- target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} capi)
+ add_executable("${testfilename}" "${NANOFI_TEST_DIR}/${testfile}")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/nanofi/include")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
+ appendIncludes("${testfilename}")
+ target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT} ${CATCH_MAIN_LIB} nanofi)
MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1")
add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
ENDFOREACH()
-message("-- Finished building ${UNIT_TEST_COUNT} capi unit test file(s)...")
+message("-- Finished building ${UNIT_TEST_COUNT} NanoFi unit test file(s)...")
SET(INT_TEST_COUNT 0)
FOREACH(testfile ${INTEGRATION_TESTS})
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/darwin.sh
----------------------------------------------------------------------
diff --git a/darwin.sh b/darwin.sh
index 7a4ef90..e02e1eb 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -40,6 +40,30 @@ add_os_flags(){
:
}
+## Ensures bison with major version >= 3 is available on the PATH.
+## When bison is missing or older, downloads bison 3.0.4, builds it from
+## source and installs it with `sudo make install`.
+install_bison() {
+  BISON_INSTALLED="false"
+  if [ -x "$(command -v bison)" ]; then
+    ## the first line of `bison --version` looks like "bison (GNU Bison) X.Y.Z"
+    BISON_VERSION=$(bison --version | head -n 1 | awk '{print $4}')
+    BISON_MAJOR=$(echo "$BISON_VERSION" | cut -d. -f1)
+    if (( BISON_MAJOR >= 3 )); then
+      BISON_INSTALLED="true"
+    fi
+  fi
+  if [ "$BISON_INSTALLED" = "false" ]; then
+    ## zypper (the SUSE package manager) does not exist on macOS, so no
+    ## toolchain install is attempted here; a C compiler is assumed to be
+    ## present already (e.g. Xcode command line tools) -- TODO confirm.
+    ## curl is used for the download because stock macOS does not ship wget.
+    curl -LO https://ftp.gnu.org/gnu/bison/bison-3.0.4.tar.xz
+    tar xvf bison-3.0.4.tar.xz
+    pushd bison-3.0.4
+    ./configure
+    make
+    sudo make install
+    popd
+  fi
+
+}
+
bootstrap_cmake(){
brew install cmake
}
@@ -69,7 +93,7 @@ build_deps(){
elif [ "$FOUND_VALUE" = "libpng" ]; then
INSTALLED+=("libpng")
elif [ "$FOUND_VALUE" = "bison" ]; then
- INSTALLED+=("bison")
+ install_bison
elif [ "$FOUND_VALUE" = "flex" ]; then
INSTALLED+=("flex")
elif [ "$FOUND_VALUE" = "python" ]; then
@@ -101,5 +125,4 @@ build_deps(){
brew link curl --force > /dev/null 2>&1
fi
done
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 9afcd6b..f6cc302 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -101,7 +101,6 @@ file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/s
file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" )
-file(GLOB CAPI_SOURCES "src/capi/*.cpp" )
file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
@@ -130,7 +129,6 @@ if (OPENSSL_FOUND)
endif (OPENSSL_FOUND)
add_library(minifi STATIC ${PROCESSOR_SOURCES})
-add_library(capi STATIC ${CAPI_SOURCES})
target_link_libraries(minifi core-minifi)
@@ -167,8 +165,6 @@ if (OPENSSL_FOUND)
endif (OPENSSL_FOUND)
add_library(minifi-shared SHARED ${PROCESSOR_SOURCES})
-add_library(capi-shared SHARED ${CAPI_SOURCES})
-
target_link_libraries(minifi-shared core-minifi-shared)
if (WIN32)
@@ -177,9 +173,7 @@ set_target_properties(minifi-shared PROPERTIES LINK_FLAGS "${LINK_FLAGS} /WHOLEA
endif()
-
set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
-set_property(TARGET capi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
endif()
endif(ENABLE_PYTHON)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/agent/build_description.h
----------------------------------------------------------------------
diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h
index 5359595..4d6a1e6 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -19,7 +19,7 @@
#define BUILD_DESCRPTION_H
#include <vector>
-#include "capi/expect.h"
+#include "core/expect.h"
namespace org {
namespace apache {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/C2CallbackAgent.h b/libminifi/include/capi/C2CallbackAgent.h
deleted file mode 100644
index c7ee5a9..0000000
--- a/libminifi/include/capi/C2CallbackAgent.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
-#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
-
-#include <utility>
-#include <functional>
-#include <future>
-#include <memory>
-#include <mutex>
-#include <thread>
-
-#include "c2/C2Agent.h"
-#include "core/state/UpdateController.h"
-#include "core/state/Value.h"
-#include "c2/C2Payload.h"
-#include "c2/C2Protocol.h"
-#include "io/validation.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-typedef int c2_ag_update_callback(char *);
-
-typedef int c2_ag_stop_callback(char *);
-
-typedef int c2_ag_start_callback(char *);
-
-class C2CallbackAgent : public c2::C2Agent {
-
- public:
-
- explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
-
- virtual ~C2CallbackAgent() {
- }
-
- void setStopCallback(c2_ag_stop_callback *st){
- stop = st;
- }
-
-
- protected:
- /**
- * Handles a C2 event requested by the server.
- * @param resp c2 server response.
- */
- virtual void handle_c2_server_response(const C2ContentResponse &resp);
-
- c2_ag_stop_callback *stop;
-
- private:
- std::shared_ptr<logging::Logger> logger_;
-
-};
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-
-#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h
deleted file mode 100644
index 982f6d4..0000000
--- a/libminifi/include/capi/Instance.h
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
-#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_
-
-#include <memory>
-#include <type_traits>
-#include <string>
-#include "core/Property.h"
-#include "properties/Configure.h"
-#include "io/StreamFactory.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ContentRepository.h"
-#include "core/repository/VolatileContentRepository.h"
-#include "core/Repository.h"
-
-#include "C2CallbackAgent.h"
-#include "core/Connectable.h"
-#include "core/ProcessorNode.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/controller/ControllerServiceProvider.h"
-#include "core/FlowConfiguration.h"
-#include "ReflexiveSession.h"
-#include "utils/ThreadPool.h"
-#include "core/state/UpdateController.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-class ProcessorLink {
- public:
- explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor)
- : processor_(processor) {
-
- }
-
- const std::shared_ptr<core::Processor> &getProcessor() {
- return processor_;
- }
-
- protected:
- std::shared_ptr<core::Processor> processor_;
-};
-
-class Instance {
- public:
-
- explicit Instance(const std::string &url, const std::string &port)
- : configure_(std::make_shared<Configure>()),
- url_(url),
- agent_(nullptr),
- rpgInitialized_(false),
- listener_thread_pool_(1),
- content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
- no_op_repo_(std::make_shared<minifi::core::Repository>()) {
- running_ = false;
- stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
- utils::Identifier uuid;
- uuid = port;
- rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);
- proc_node_ = std::make_shared<core::ProcessorNode>(rpg_);
- core::FlowConfiguration::initialize_static_functions();
- content_repo_->initialize(configure_);
- }
-
- ~Instance() {
- running_ = false;
- listener_thread_pool_.shutdown();
- }
-
- bool isRPGConfigured() {
- return rpgInitialized_;
- }
-
- void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
- running_ = true;
- if (server->type != C2_Server_Type::MQTT){
- configure_->set("c2.rest.url",server->url);
- configure_->set("c2.rest.url.ack",server->ack_url);
- }
- agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
- listener_thread_pool_.start();
- registerUpdateListener(agent_, 1000);
- agent_->setStopCallback(c1);
- }
-
- void setRemotePort(std::string remote_port) {
- rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
- rpg_->initialize();
- rpg_->setTransmitting(true);
- rpgInitialized_ = true;
- }
-
- std::shared_ptr<Configure> getConfiguration() {
- return configure_;
- }
-
- std::shared_ptr<minifi::core::Repository> getNoOpRepository() {
- return no_op_repo_;
- }
-
- std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
- return content_repo_;
- }
-
- void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
- auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
- auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
-
- rpg_->onSchedule(processContext, sessionFactory);
-
- auto session = std::make_shared<core::ReflexiveSession>(processContext);
-
- session->add(ff);
-
- rpg_->onTrigger(processContext, session);
- }
-
- protected:
-
- bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
- auto functions = updateController->getFunctions();
- // run all functions independently
-
- for (auto function : functions) {
- std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
- utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
- std::future<state::Update> future;
- if (!listener_thread_pool_.execute(std::move(functor), future)) {
- // denote failure
- return false;
- }
- }
- return true;
- }
-
- std::shared_ptr<c2::C2CallbackAgent> agent_;
-
- std::atomic<bool> running_;
-
- bool rpgInitialized_;
-
- std::shared_ptr<minifi::core::Repository> no_op_repo_;
-
- std::shared_ptr<minifi::core::ContentRepository> content_repo_;
-
- std::shared_ptr<core::ProcessorNode> proc_node_;
- std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_;
- std::shared_ptr<io::StreamFactory> stream_factory_;
- std::string url_;
- std::shared_ptr<Configure> configure_;
-
- utils::ThreadPool<state::Update> listener_thread_pool_;
-};
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/Plan.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h
deleted file mode 100644
index 75330a0..0000000
--- a/libminifi/include/capi/Plan.h
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_CAPI_PLAN_H_
-#define LIBMINIFI_CAPI_PLAN_H_
-#ifndef WIN32
- #include <dirent.h>
-#endif
-#include <cstdio>
-#include <cstdlib>
-#include <sstream>
-#include "ResourceClaim.h"
-#include <vector>
-#include <set>
-#include <map>
-#include "core/logging/Logger.h"
-#include "core/Core.h"
-#include "properties/Configure.h"
-#include "properties/Properties.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/Id.h"
-#include "spdlog/sinks/ostream_sink.h"
-#include "spdlog/sinks/dist_sink.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-#include "capi/cstructs.h"
-#include "capi/api.h"
-
-using failure_callback_type = std::function<void(flow_file_record*)>;
-using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
-
-namespace {
-
- void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
- auto ff = session->get();
- if (ff == nullptr) {
- return;
- }
-
- auto claim = ff->getResourceClaim();
-
- if (claim != nullptr && user_callback != nullptr) {
- claim->increaseFlowFileRecordOwnedCount();
- // create a flow file.
- auto path = claim->getContentFullPath();
- auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
- ffr->attributes = ff->getAttributesPtr();
- ffr->ffp = ff.get();
- auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
- *content_repo_ptr = cr_ptr;
- user_callback(ffr);
- }
- session->remove(ff);
- }
-
- void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
- session->rollback();
- failureStrategyAsIs(session, user_callback, cr_ptr);
- }
-}
-
-static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
- { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };
-
-class ExecutionPlan {
- public:
-
- explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
-
- std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>);
-
- std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
- core::Relationship relationship = core::Relationship("success", "description"),
- bool linkToPrevious = false);
-
- std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
- bool linkToPrevious = false);
-
- bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
-
- void reset();
-
- bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
-
- bool setFailureCallback(failure_callback_type onerror_callback);
-
- bool setFailureStrategy(FailureStrategy start);
-
- std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
-
- std::shared_ptr<core::FlowFile> getCurrentFlowFile();
-
- std::shared_ptr<core::ProcessSession> getCurrentSession();
-
- std::shared_ptr<core::Repository> getFlowRepo() {
- return flow_repo_;
- }
-
- std::shared_ptr<core::Repository> getProvenanceRepo() {
- return prov_repo_;
- }
-
- std::shared_ptr<core::ContentRepository> getContentRepo() {
- return content_repo_;
- }
-
- std::shared_ptr<core::FlowFile> getNextFlowFile(){
- return next_ff_;
- }
-
- void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
- next_ff_ = ptr;
- }
-
- static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);
-
- protected:
- class FailureHandler {
- public:
- FailureHandler(content_repo_sptr cr_ptr) {
- callback_ = nullptr;
- strategy_ = FailureStrategy::AS_IS;
- content_repo_ = cr_ptr;
- }
- void setCallback(failure_callback_type onerror_callback) {
- callback_=onerror_callback;
- }
- void setStrategy(FailureStrategy strat) {
- strategy_ = strat;
- }
- void operator()(const processor_session* ps) {
- auto ses = static_cast<core::ProcessSession*>(ps->session);
- FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
- }
- private:
- failure_callback_type callback_;
- FailureStrategy strategy_;
- content_repo_sptr content_repo_;
- };
-
- void finalize();
-
- std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);
-
- std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
- core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);
-
- std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
-
- content_repo_sptr content_repo_;
-
- std::shared_ptr<core::Repository> flow_repo_;
- std::shared_ptr<core::Repository> prov_repo_;
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
-
- std::atomic<bool> finalized;
-
- uint32_t location;
-
- std::shared_ptr<core::ProcessSession> current_session_;
- std::shared_ptr<core::FlowFile> current_flowfile_;
-
- std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
- std::vector<std::shared_ptr<core::Processor>> processor_queue_;
- std::vector<std::shared_ptr<core::Processor>> configured_processors_;
- std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
- std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
- std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
- std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
- std::vector<std::shared_ptr<minifi::Connection>> relationships_;
- core::Relationship termination_;
-
- std::shared_ptr<core::FlowFile> next_ff_;
-
- private:
-
- static std::shared_ptr<utils::IdGenerator> id_generator_;
- std::shared_ptr<logging::Logger> logger_;
- std::shared_ptr<FailureHandler> failure_handler_;
-};
-
-#endif /* LIBMINIFI_CAPI_PLAN_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/ReflexiveSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/ReflexiveSession.h b/libminifi/include/capi/ReflexiveSession.h
deleted file mode 100644
index ebf6cbe..0000000
--- a/libminifi/include/capi/ReflexiveSession.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REFLEXIVE_SESSION_H__
-#define __REFLEXIVE_SESSION_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <set>
-
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-// ReflexiveSession Class
-class ReflexiveSession : public ProcessSession{
- public:
- // Constructor
- /*!
- * Create a new process session
- */
- ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
- : ProcessSession(processContext){
- }
-
-// Destructor
- virtual ~ReflexiveSession() {
- }
-
- virtual std::shared_ptr<core::FlowFile> get(){
- auto prevff = ff;
- ff = nullptr;
- return prevff;
- }
-
- virtual void add(const std::shared_ptr<core::FlowFile> &flow){
- ff = flow;
- }
- virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){
- // no op
- }
- protected:
- //
- // Get the FlowFile from the highest priority queue
- std::shared_ptr<core::FlowFile> ff;
-
-};
-
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
deleted file mode 100644
index e05c04e..0000000
--- a/libminifi/include/capi/api.h
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
-#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include "cstructs.h"
-#include "processors.h"
-
-int initialize_api();
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Updates with every release
- */
-#define API_VERSION "0.02"
-
-void enable_logging();
-
-void set_terminate_callback(void (*terminate_callback)());
-
-/****
- * ##################################################################
- * BASE NIFI OPERATIONS
- * ##################################################################
- */
-
-nifi_instance *create_instance(const char *url, nifi_port *port);
-
-void initialize_instance(nifi_instance *);
-
-void free_instance(nifi_instance*);
-
-/****
- * ##################################################################
- * C2 OPERATIONS
- * ##################################################################
- */
-
-
-typedef int c2_update_callback(char *);
-
-typedef int c2_stop_callback(char *);
-
-typedef int c2_start_callback(char *);
-
-void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
-
-
-uint8_t run_processor(const processor *processor);
-
-flow *create_new_flow(nifi_instance *);
-
-flow *create_flow(nifi_instance *, const char *);
-
-flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
-
-processor *add_processor(flow *, const char *);
-
-processor *add_processor_with_linkage(flow *flow, const char *processor_name);
-
-processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
-
-/**
-* Register your callback to receive flow files that the flow failed to process
-* The flow file ownership is transferred to the caller!
-* The first callback should be registered before the flow is used. Can be changed later during runtime.
-*/
-int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
-
-
-/**
-* Set failure strategy. Please use the enum defined in cstructs.h
-* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?)
-* Can be changed at runtime.
-* The default strategy is AS IS.
-*/
-int set_failure_strategy(flow *flow, FailureStrategy strategy);
-
-int set_property(processor *, const char *, const char *);
-
-int set_instance_property(nifi_instance *instance, const char*, const char *);
-
-int free_flow(flow *);
-
-flow_file_record *get_next_flow_file(nifi_instance *, flow *);
-
-size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
-
-flow_file_record *get(nifi_instance *,flow *, processor_session *);
-
-int transfer(processor_session* session, flow *flow, const char *rel);
-
-/**
- * Creates a flow file object.
- * Will obtain the size of file
- */
-flow_file_record* create_flowfile(const char *file, const size_t len);
-
-flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
-
-flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
-
-void free_flowfile(flow_file_record*);
-
-uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
-
-void update_attribute(flow_file_record*, const char *key, void *value, size_t size);
-
-uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
-
-int get_attribute_qty(const flow_file_record* ff);
-
-int get_all_attributes(const flow_file_record* ff, attribute_set *target);
-
-uint8_t remove_attribute(flow_file_record*, char *key);
-
-/****
- * ##################################################################
- * Remote NIFI OPERATIONS
- * ##################################################################
- */
-
-int transmit_flowfile(flow_file_record *, nifi_instance *);
-
-/****
- * ##################################################################
- * Persistence Operations
- * ##################################################################
- */
-
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/cstructs.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h
deleted file mode 100644
index 55197d9..0000000
--- a/libminifi/include/capi/cstructs.h
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
-#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
-
-#include <stddef.h>
-#include <stdint.h>
-
-/**
- * NiFi Port struct
- */
-typedef struct {
- char *port_id;
-} nifi_port;
-
-
-/**
- * Nifi instance struct
- */
-typedef struct {
-
- void *instance_ptr;
-
- nifi_port port;
-
-} nifi_instance;
-
-
-/****
- * ##################################################################
- * C2 OPERATIONS
- * ##################################################################
- */
-
-enum C2_Server_Type{
- REST,
- MQTT
-};
-
-typedef struct {
- char *url;
- char *ack_url;
- char *identifier;
- char *topic;
- enum C2_Server_Type type;
-} C2_Server;
-
-
-/****
- * ##################################################################
- * Processor OPERATIONS
- * ##################################################################
- */
-
-typedef struct {
- void *processor_ptr;
-} processor;
-
-
-typedef struct {
-
- void *session;
-
-} processor_session;
-
-
-
-/****
- * ##################################################################
- * FLOWFILE OPERATIONS
- * ##################################################################
- */
-
-typedef struct {
- const char *key;
- void *value;
- size_t value_size;
-} attribute;
-
-typedef struct {
- attribute * attributes;
- size_t size;
-} attribute_set;
-
-/**
- * State of a flow file
- *
- */
-typedef struct {
- uint64_t size; /**< Size in bytes of the data corresponding to this flow file */
-
- void * in;
-
- void * crp;
-
- char * contentLocation; /**< Filesystem location of this object */
-
- void *attributes; /**< Hash map of attributes */
-
- void *ffp;
-
-} flow_file_record;
-
-
-typedef struct {
- void *plan;
-} flow;
-
-typedef enum FS { AS_IS, ROLLBACK } FailureStrategy;
-
-#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/expect.h b/libminifi/include/capi/expect.h
deleted file mode 100644
index ead182c..0000000
--- a/libminifi/include/capi/expect.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
-
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
-#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
-
-
-// Branch-prediction hint macros: LIKELY(x) marks a condition that is
-// almost always true, UNLIKELY(x) one that is almost always false.
-//
-// The argument is collapsed to exactly 0 or 1 with !! before it is handed
-// to __builtin_expect. Without that, a truthy value other than 1 would be
-// compared against the expected value 1, and a pointer argument would not
-// compile in C++ because __builtin_expect takes a long.
-// Both branches of the #if yield the same 0/1 truth value, so the macros
-// behave identically with and without compiler support.
-#if defined(__GNUC__) && __GNUC__ >= 4
-#define LIKELY(x) (__builtin_expect(!!(x), 1))
-#define UNLIKELY(x) (__builtin_expect(!!(x), 0))
-#else
-#define LIKELY(x) (!!(x))
-#define UNLIKELY(x) (!!(x))
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/capi/processors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/processors.h b/libminifi/include/capi/processors.h
deleted file mode 100644
index 7fe357d..0000000
--- a/libminifi/include/capi/processors.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
-#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
-
-
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/** Configuration values for GetFile: a directory string and two one-bit flags. */
-typedef struct {
- char *directory; /**< Directory path as a C string */
- unsigned keep_source :1; /**< One-bit flag */
- unsigned recurse :1; /**< One-bit flag */
-} GetFileConfig;
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/core/expect.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/expect.h b/libminifi/include/core/expect.h
new file mode 100644
index 0000000..ead182c
--- /dev/null
+++ b/libminifi/include/core/expect.h
@@ -0,0 +1,32 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_
+
+
+// Branch-prediction hint macros: LIKELY(x) marks a condition that is
+// almost always true, UNLIKELY(x) one that is almost always false.
+//
+// The argument is collapsed to exactly 0 or 1 with !! before it is handed
+// to __builtin_expect. Without that, a truthy value other than 1 would be
+// compared against the expected value 1, and a pointer argument would not
+// compile in C++ because __builtin_expect takes a long.
+// Both branches of the #if yield the same 0/1 truth value, so the macros
+// behave identically with and without compiler support.
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define LIKELY(x) (__builtin_expect(!!(x), 1))
+#define UNLIKELY(x) (__builtin_expect(!!(x), 0))
+#else
+#define LIKELY(x) (!!(x))
+#define UNLIKELY(x) (!!(x))
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index d3257e1..b6f625c 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -24,7 +24,7 @@
#include <atomic>
#include <cstdint>
#include "io/ClientSocket.h"
-#include "capi/expect.h"
+#include "core/expect.h"
#include "properties/Configure.h"
namespace org {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/processors/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h
deleted file mode 100644
index 801c99c..0000000
--- a/libminifi/include/processors/CallbackProcessor.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * @file CallbackProcessor.h
- * CallbackProcessor class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __CALLBACK_PROCESSOR_H__
-#define __CALLBACK_PROCESSOR_H__
-
-#include <stdio.h>
-#include <string>
-#include <errno.h>
-#include <chrono>
-#include <thread>
-#include <functional>
-#include <iostream>
-#include <sys/types.h>
-#include "capi/cstructs.h"
-#include "io/BaseStream.h"
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// CallbackProcessor Class
-/**
- * Processor that stores a caller-supplied object pointer and callback
- * (see setCallback). onTrigger is only declared here; its definition
- * lives outside this header.
- */
-class CallbackProcessor : public core::Processor {
- public:
-  /*!
-   * Create a new processor
-   * @param name processor name, forwarded to core::Processor
-   * @param uuid identifier, forwarded to core::Processor
-   *
-   * The initializer list follows the member declaration order
-   * (objref_, callback_, logger_), which is the order members are
-   * actually initialized in.
-   */
-  CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
-      : Processor(name, uuid),
-        objref_(nullptr),
-        callback_(nullptr),
-        logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) {
-  }
-  // Destructor
-  virtual ~CallbackProcessor() {
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "CallbackProcessor";
-
-  /**
-   * Stores the object pointer and the callback on this processor.
-   * @param obj untyped pointer kept in objref_
-   * @param ontrigger_callback callable kept in callback_
-   */
-  void setCallback(void *obj, std::function<void(processor_session*)> ontrigger_callback) {
-    objref_ = obj;
-    callback_ = ontrigger_callback;
-  }
-
-  // OnTrigger method, implemented by NiFi CallbackProcessor
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  // Initialize, overridden by NiFi CallbackProcessor: registers a single
-  // supported relationship named "success".
-  virtual void initialize() {
-    std::set<core::Relationship> relationships;
-    core::Relationship Success("success", "description");
-    relationships.insert(Success);
-    setSupportedRelationships(relationships);
-  }
-
- protected:
-  void *objref_;  // object pointer handed to setCallback
-  std::function<void(processor_session*)> callback_;  // callable handed to setCallback
- private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
-
-};
-
-REGISTER_RESOURCE(CallbackProcessor);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index ffb28fe..fc54a25 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,7 +30,7 @@
#include <functional>
#include "BackTrace.h"
-#include "capi/expect.h"
+#include "core/expect.h"
#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
#include "core/controller/ControllerService.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp
deleted file mode 100644
index 4b7f5fb..0000000
--- a/libminifi/src/capi/C2CallbackAgent.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "capi/C2CallbackAgent.h"
-#include <csignal>
-#include <utility>
-#include <vector>
-#include <map>
-#include <string>
-#include <memory>
-#include "c2/ControllerSocketProtocol.h"
-#include "core/state/UpdateController.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/file/FileUtils.h"
-#include "utils/file/FileManager.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Constructs the agent by forwarding all three arguments to the C2Agent
- * base class. The stop callback starts out as nullptr, i.e. unset.
- */
-C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
- const std::shared_ptr<Configure> &configuration)
- : C2Agent(controller, updateSink, configuration),
- stop(nullptr),
- logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
-}
-
-/**
- * Reacts to a response received from the C2 server.
- *
- * Only START and STOP trigger any work: when the response names "C2" or
- * "c2" the process raises SIGTERM, and afterwards the stop callback, if
- * one is set, is invoked with the response name. Every other operation
- * is ignored.
- *
- * @param resp response whose op and name fields are inspected
- */
-void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
-  switch (resp.op) {
-    // NOTE(review): START deliberately shares the STOP handling here, as in
-    // the original fall-through; confirm that invoking the stop callback for
-    // START is intended.
-    case Operation::START:
-    case Operation::STOP: {
-      if (resp.name == "C2" || resp.name == "c2") {
-        raise(SIGTERM);
-      }
-
-      if (nullptr != stop) {
-        // The callback is invoked with a non-const char*, so constness of the
-        // string's buffer is cast away. NOTE(review): the callback must not
-        // write through this pointer.
-        stop(const_cast<char*>(resp.name.c_str()));
-      }
-      break;
-    }
-    case Operation::CLEAR:
-    case Operation::UPDATE:
-    case Operation::DESCRIBE:
-    case Operation::RESTART:
-    default:
-      // do nothing
-      break;
-  }
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
deleted file mode 100644
index 78b864c..0000000
--- a/libminifi/src/capi/Plan.cpp
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "capi/Plan.h"
-#include "processors/CallbackProcessor.h"
-#include <memory>
-#include <vector>
-#include <set>
-#include <string>
-
-/**
- * Converts an integer into a FailureStrategy value.
- *
- * The integer is validated before it is cast, because converting a value
- * outside the enumeration's range to the enum type is not well defined.
- *
- * @param in integer to convert
- * @param out receives the strategy on success; left untouched on failure
- * @return true if in names AS_IS or ROLLBACK and out is non-null,
- *         false otherwise
- */
-bool intToFailureStragey(int in, FailureStrategy *out) {
-  if (out == nullptr) {
-    return false;
-  }
-  switch (in) {
-    case AS_IS:
-    case ROLLBACK:
-      *out = static_cast<FailureStrategy>(in);
-      return true;
-    default:
-      return false;
-  }
-}
-
-// Definition of the static id generator shared by all ExecutionPlan
-// objects, obtained from utils::IdGenerator::getIdGenerator().
-std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-/**
- * Creates an empty plan that is not yet finalized and keeps the three
- * repositories it is given.
- *
- * location starts at -1 and there is no current flow file.
- *
- * @param content_repo content repository stored in content_repo_
- * @param flow_repo flow repository stored in flow_repo_
- * @param prov_repo provenance repository stored in prov_repo_
- */
-ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
-    : content_repo_(content_repo),
-      flow_repo_(flow_repo),
-      prov_repo_(prov_repo),
-      finalized(false),
-      location(-1),
-      current_flowfile_(nullptr),
-      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
-  // The stream factory is obtained using a freshly created Configure object.
-  const auto default_configuration = std::make_shared<minifi::Configure>();
-  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(default_configuration);
-}
-
-/**
- * Creates a CallbackProcessor, hands it the object pointer and callback,
- * and appends it to the plan linked to the previous processor through a
- * "success" relationship.
- *
- * @param obj untyped pointer passed on to the processor's setCallback
- * @param fp callback passed on to the processor's setCallback
- * @return the added processor, or nullptr when the plan is already
- *         finalized or the processor could not be created
- */
-std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
-  if (finalized) {
-    return nullptr;
-  }
-
-  const auto created = createProcessor("CallbackProcessor", "CallbackProcessor");
-  if (created == nullptr) {
-    return nullptr;
-  }
-
-  auto callback_processor = std::static_pointer_cast<processors::CallbackProcessor>(created);
-  callback_processor->setCallback(obj, fp);
-
-  const core::Relationship success("success", "description");
-  return addProcessor(callback_processor, "CallbackProcessor", success, true);
-}
-
-/**
- * Sets a property on the process context that belongs to a processor of
- * this plan.
- *
- * The processor's position in processor_queue_ is used as the index into
- * processor_contexts_.
- *
- * @param proc processor whose context should receive the property
- * @param prop property name
- * @param value property value
- * @return false when proc is not in the queue or has no context at that
- *         index; otherwise the result of the context's setProperty
- */
-bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
-  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
-
-  const size_t queued = processor_queue_.size();
-  size_t index = 0;
-  while (index < queued && processor_queue_.at(index) != proc) {
-    ++index;
-  }
-
-  const bool found = index < queued;
-  if (!found || index >= processor_contexts_.size()) {
-    return false;
-  }
-
-  return processor_contexts_.at(index)->setProperty(prop, value);
-}
-
-/**
- * Appends an already constructed processor to the plan.
- *
- * The processor receives the plan's stream factory, is initialized,
- * registered in processor_mapping_ under its UUID string, and gets its
- * own ProcessorNode and ProcessContext. When linkToPrevious is set it is
- * connected to the last queued processor; if there is no previous
- * processor it is connected to itself and relationship becomes the
- * termination relationship.
- *
- * @param processor processor to add
- * @param name not used by this overload
- * @param relationship relationship used for the connection or stored as
- *        the termination relationship
- * @param linkToPrevious whether to connect to the last queued processor
- * @return processor, or nullptr when the plan is already finalized
- */
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
-  if (finalized) {
-    return nullptr;
-  }
-
-  // NOTE(review): this identifier is generated but never used afterwards.
-  // The call is kept so the id generator is invoked exactly as before.
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
-
-  processor->setStreamFactory(stream_factory);
-  // initialize the processor
-  processor->initialize();
-
-  processor_mapping_[processor->getUUIDStr()] = processor;
-
-  if (!linkToPrevious) {
-    termination_ = relationship;
-  } else {
-    // Calling back() on an empty queue is undefined behavior, so the
-    // "no previous processor" case is detected with empty() first.
-    std::shared_ptr<core::Processor> last;
-    if (!processor_queue_.empty()) {
-      last = processor_queue_.back();
-    }
-
-    if (last == nullptr) {
-      last = processor;
-      termination_ = relationship;
-    }
-
-    relationships_.push_back(connectProcessors(last, processor, relationship, true));
-  }
-
-  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
-  processor_nodes_.push_back(node);
-
-  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
-  processor_contexts_.push_back(context);
-
-  processor_queue_.push_back(processor);
-
-  return processor;
-}
-
-/**
- * Creates a processor by class name and appends it to the plan.
- *
- * @param processor_name class name given to createProcessor
- * @param name name given to the new processor
- * @param relationship forwarded to the shared_ptr overload
- * @param linkToPrevious forwarded to the shared_ptr overload
- * @return the added processor, or nullptr when the plan is already
- *         finalized or the processor could not be created
- */
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
-  if (finalized) {
-    return nullptr;
-  }
-
-  const auto created = ExecutionPlan::createProcessor(processor_name, name);
-  if (created) {
-    return addProcessor(created, name, relationship, linkToPrevious);
-  }
-  return nullptr;
-}
-
-/**
- * Rewinds the plan: drops the recorded sessions and session factories,
- * sets location back to -1 and decrements every queued processor's
- * active task count until it is no longer positive.
- */
-void ExecutionPlan::reset() {
-  process_sessions_.clear();
-  factories_.clear();
-  location = -1;
-
-  for (const auto &queued : processor_queue_) {
-    for (; queued->getActiveTasks() > 0;) {
-      queued->decrementActiveTask();
-    }
-  }
-}
-
-/**
- * Advances the plan by one processor and runs it.
- *
- * The plan is finalized on first use. The processor at the new location
- * is scheduled once (onSchedule is skipped for processors already listed
- * in configured_processors_), gets a fresh session, and is then either
- * triggered or handed to verify. The session is committed and the current
- * flow file is refreshed from it; for the last processor the flow file is
- * polled from the last connection when the session yields none.
- *
- * @param verify optional callable invoked instead of onTrigger
- * @return true when more processors remain after this one, false when
- *         this was the last one or no processor was left to run
- */
-bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
-  if (!finalized) {
-    finalize();
-  }
-  location++;
-  if (location >= processor_queue_.size()) {
-    return false;
-  }
-  std::shared_ptr<core::Processor> processor = processor_queue_[location];
-  std::shared_ptr<core::ProcessContext> context = processor_contexts_[location];
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-  factories_.push_back(factory);
-  if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
-    processor->onSchedule(context, factory);
-    configured_processors_.push_back(processor);
-  }
-  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
-  process_sessions_.push_back(current_session);
-  // Publish the session through the member that getCurrentSession() returns;
-  // nothing else in this file assigns it.
-  current_session_ = current_session;
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  if (verify != nullptr) {
-    verify(context, current_session);
-  } else {
-    logger_->log_debug("Running %s", processor->getName());
-    processor->onTrigger(context, current_session);
-  }
-  current_session->commit();
-  current_flowfile_ = current_session->get();
-  auto hasMore = location + 1 < processor_queue_.size();
-  if (!hasMore && !current_flowfile_) {
-    // Last processor and nothing in the session: look at the final connection.
-    std::set<std::shared_ptr<core::FlowFile>> expired;
-    current_flowfile_ = relationships_.back()->poll(expired);
-  }
-  return hasMore;
-}
-
-/**
- * Returns the events held by the provenance reporter of the session
- * stored at index location in process_sessions_.
- */
-std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
- return process_sessions_.at(location)->getProvenanceReporter()->getEvents();
-}
-
-/** Returns the flow file most recently stored in current_flowfile_. */
-std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
- return current_flowfile_;
-}
-
-/** Returns the session held in the current_session_ member. */
-std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
- return current_session_;
-}
-
-/**
- * Builds a connection that has the given processor as both source and
- * destination argument, using the termination relationship. set_dst is
- * passed through to connectProcessors.
- */
-std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
- return connectProcessors(processor, processor, termination_, set_dst);
-}
-
-/**
- * Completes the wiring of the plan and marks it finalized.
- *
- * When a failure handler is set, a CallbackProcessor bound to that
- * handler is created, every queued processor that supports a "failure"
- * relationship is connected to it, and it is appended to the plan with
- * its own node and context. Afterwards the final connection(s) are built:
- * one for the last queued processor when connections already exist,
- * otherwise one per queued processor.
- */
-void ExecutionPlan::finalize() {
-  if (failure_handler_) {
-    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
-
-    if (failure_proc == nullptr) {
-      // createProcessor() returns nullptr when the class cannot be
-      // instantiated; skip the failure wiring instead of dereferencing it.
-      logger_->log_error("Unable to create CallbackProcessor, failure handler is not attached");
-    } else {
-      std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
-      callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
-
-      for (const auto& proc : processor_queue_) {
-        for (const auto& rel : proc->getSupportedRelationships()) {
-          if (rel.getName() == "failure") {
-            relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
-            break;
-          }
-        }
-      }
-
-      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
-
-      processor_nodes_.push_back(node);
-
-      std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
-      processor_contexts_.push_back(context);
-
-      processor_queue_.push_back(failure_proc);
-    }
-  }
-
-  if (relationships_.size() > 0) {
-    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
-  } else {
-    for (const auto& processor : processor_queue_) {
-      relationships_.push_back(buildFinalConnection(processor, true));
-    }
-  }
-
-  finalized = true;
-}
-
-/**
- * Instantiates a processor through the default class loader.
- *
- * @param processor_name class name handed to the class loader
- * @param name name assigned to the new processor
- * @return the new processor, or nullptr when the class loader returns
- *         nothing for processor_name
- */
-std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
-  utils::Identifier uuid;
-  id_generator_->generate(uuid);
-
-  const auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
-  if (!instance) {
-    return nullptr;
-  }
-
-  auto created = std::static_pointer_cast<core::Processor>(instance);
-  created->setName(name);
-  return created;
-}
-
-/**
- * Creates a connection named "<source uuid>-to-<destination uuid>" from
- * src_proc, carrying the given relationship.
- *
- * The source side is always set. The destination side is only set when
- * set_dst is true, and the connection is registered on dst_proc only if
- * it differs from src_proc. The connection is always registered on
- * src_proc, after any destination registration.
- *
- * @return the newly created connection
- */
-std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
-                                                                     core::Relationship relationship, bool set_dst) {
-  std::stringstream name_builder;
-  name_builder << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
-
-  auto link = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, name_builder.str());
-  link->setRelationship(relationship);
-
-  // link the connections so that we can test results at the end for this
-  link->setSource(src_proc);
-  utils::Identifier source_id;
-  src_proc->getUUID(source_id);
-  link->setSourceUUID(source_id);
-
-  if (set_dst) {
-    link->setDestination(dst_proc);
-    utils::Identifier destination_id;
-    dst_proc->getUUID(destination_id);
-    link->setDestinationUUID(destination_id);
-
-    const bool self_loop = (src_proc == dst_proc);
-    if (!self_loop) {
-      dst_proc->addConnection(link);
-    }
-  }
-  src_proc->addConnection(link);
-
-  return link;
-}
-
-/**
- * Installs the callback on the plan's failure handler, creating the
- * handler from the content repository on first use.
- *
- * @param onerror_callback callback handed to the failure handler
- * @return false when the plan was finalized without a failure handler,
- *         true otherwise
- */
-bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
-  const bool has_handler = static_cast<bool>(failure_handler_);
-  if (!has_handler) {
-    if (finalized) {
-      return false;  // Already finalized the flow without failure handler processor
-    }
-    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
-  }
-
-  failure_handler_->setCallback(onerror_callback);
-  return true;
-}
-
-/**
- * Passes the strategy on to the failure handler.
- *
- * @param start strategy handed to the failure handler's setStrategy
- * @return false when no failure handler exists, true otherwise
- */
-bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
-  const bool can_apply = static_cast<bool>(failure_handler_);
-  if (can_apply) {
-    failure_handler_->setStrategy(start);
-  }
-  return can_apply;
-}
-