You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:10 UTC
[05/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces
and removes use of raw pointers for user facing API.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
new file mode 100644
index 0000000..e5703d1
--- /dev/null
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ConfigurableComponent.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> logger)
+ : logger_(logger) {
+
+}
+
+ConfigurableComponent::ConfigurableComponent(
+ const ConfigurableComponent &&other)
+ : properties_(std::move(other.properties_)),
+ logger_(std::move(other.logger_)) {
+
+}
+ConfigurableComponent::~ConfigurableComponent() {
+
+}
+
+/**
+ * Get property using the provided name.
+ * @param name property name.
+ * @param value value passed in by reference
+ * @return result of getting property.
+ */
+bool ConfigurableComponent::getProperty(const std::string name,
+ std::string &value) {
+ std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+ auto &&it = properties_.find(name);
+
+ if (it != properties_.end()) {
+ Property item = it->second;
+ value = item.getValue();
+ logger_->log_info("Processor %s property name %s value %s", name.c_str(),
+ item.getName().c_str(), value.c_str());
+ return true;
+ } else {
+ return false;
+ }
+}
+/**
+ * Sets the property using the provided name
+ * @param property name
+ * @param value property value.
+ * @return result of setting property.
+ */
+bool ConfigurableComponent::setProperty(const std::string name,
+ std::string value) {
+ std::lock_guard<std::mutex> lock(configuration_mutex_);
+ auto &&it = properties_.find(name);
+
+ if (it != properties_.end()) {
+ Property item = it->second;
+ item.setValue(value);
+ properties_[item.getName()] = item;
+ logger_->log_info("Component %s property name %s value %s", name.c_str(),
+ item.getName().c_str(), value.c_str());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+/**
+ * Sets the property using the provided name
+ * @param property name
+ * @param value property value.
+ * @return whether property was set or not
+ */
+bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
+ std::lock_guard<std::mutex> lock(configuration_mutex_);
+ auto it = properties_.find(prop.getName());
+
+ if (it != properties_.end()) {
+ Property item = it->second;
+ item.setValue(value);
+ properties_[item.getName()] = item;
+ logger_->log_info("property name %s value %s", prop.getName().c_str(),
+ item.getName().c_str(), value.c_str());
+ return true;
+ } else {
+ Property newProp(prop);
+ newProp.setValue(value);
+ properties_.insert(
+ std::pair<std::string, Property>(prop.getName(), newProp));
+ return true;
+
+ }
+ return false;
+}
+
+/**
+ * Sets supported properties for the ConfigurableComponent
+ * @param supported properties
+ * @return result of set operation.
+ */
+bool ConfigurableComponent::setSupportedProperties(
+ std::set<Property> properties) {
+ if (!canEdit()) {
+ return false;
+ }
+
+ std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+ properties_.clear();
+ for (auto item : properties) {
+ properties_[item.getName()] = item;
+ }
+
+ return true;
+}
+
+} /* namespace components */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
new file mode 100644
index 0000000..52bde69
--- /dev/null
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ConfigurationFactory.h"
+#include "core/FlowConfiguration.h"
+#include <type_traits>
+#ifdef YAML_SUPPORT
+#include "core/yaml/YamlConfiguration.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+#ifndef YAML_SUPPORT
+ class YamlConfiguration;
+#endif
+
+ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
+ std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_file_repo,
+ const std::string configuration_class_name, const std::string path,
+ bool fail_safe) {
+
+ std::string class_name_lc = configuration_class_name;
+ std::transform(class_name_lc.begin(), class_name_lc.end(),
+ class_name_lc.begin(), ::tolower);
+ try {
+
+ if (class_name_lc == "flowconfiguration") {
+ // load the base configuration.
+ return std::unique_ptr<core::FlowConfiguration>(
+ new core::FlowConfiguration(repo, flow_file_repo, path));
+
+ } else if (class_name_lc == "yamlconfiguration") {
+ // only load if the class is defined.
+ return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, path));
+
+
+ } else {
+ if (fail_safe) {
+ return std::unique_ptr<core::FlowConfiguration>(
+ new core::FlowConfiguration(repo, flow_file_repo, path));
+ } else {
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+ }
+ }
+ } catch (const std::runtime_error &r) {
+ if (fail_safe) {
+ return std::unique_ptr<core::FlowConfiguration>(
+ new core::FlowConfiguration(repo, flow_file_repo, path));
+ }
+ }
+
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+ }
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
new file mode 100644
index 0000000..ac61568
--- /dev/null
+++ b/libminifi/src/core/Connectable.cpp
@@ -0,0 +1,174 @@
+/*
+ * Connectable.cpp
+ *
+ * Created on: Feb 27, 2017
+ * Author: mparisi
+ */
+
+#include "../../include/core/Connectable.h"
+
+#include <uuid/uuid.h>
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+Connectable::Connectable(std::string name, uuid_t uuid)
+ : CoreComponent(name, uuid),
+ max_concurrent_tasks_(1) {
+
+}
+
+Connectable::Connectable(const Connectable &&other)
+ : CoreComponent(std::move(other)),
+ max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)) {
+ has_work_ = other.has_work_.load();
+ strategy_ = other.strategy_.load();
+}
+
+Connectable::~Connectable() {
+
+}
+
+bool Connectable::setSupportedRelationships(
+ std::set<core::Relationship> relationships) {
+ if (isRunning()) {
+ logger_->log_info(
+ "Can not set processor supported relationship while the process %s is running",
+ name_.c_str());
+ return false;
+ }
+
+ std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+ relationships_.clear();
+ for (auto item : relationships) {
+ relationships_[item.getName()] = item;
+ logger_->log_info("Processor %s supported relationship name %s",
+ name_.c_str(), item.getName().c_str());
+ }
+
+ return true;
+}
+
+// Whether the relationship is supported
+bool Connectable::isSupportedRelationship(core::Relationship relationship) {
+ const bool requiresLock = isRunning();
+
+ const auto conditionalLock =
+ !requiresLock ?
+ std::unique_lock<std::mutex>() :
+ std::unique_lock<std::mutex>(relationship_mutex_);
+
+ const auto &it = relationships_.find(relationship.getName());
+ if (it != relationships_.end()) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool Connectable::setAutoTerminatedRelationships(
+ std::set<Relationship> relationships) {
+ if (isRunning()) {
+ logger_->log_info(
+ "Can not set processor auto terminated relationship while the process %s is running",
+ name_.c_str());
+ return false;
+ }
+
+ std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+ auto_terminated_relationships_.clear();
+ for (auto item : relationships) {
+ auto_terminated_relationships_[item.getName()] = item;
+ logger_->log_info("Processor %s auto terminated relationship name %s",
+ name_.c_str(), item.getName().c_str());
+ }
+
+ return true;
+}
+
+// Check whether the relationship is auto terminated
+bool Connectable::isAutoTerminated(core::Relationship relationship) {
+ const bool requiresLock = isRunning();
+
+ const auto conditionalLock =
+ !requiresLock ?
+ std::unique_lock<std::mutex>() :
+ std::unique_lock<std::mutex>(relationship_mutex_);
+
+ const auto &it = auto_terminated_relationships_.find(relationship.getName());
+ if (it != auto_terminated_relationships_.end()) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void Connectable::waitForWork(uint64_t timeoutMs) {
+ has_work_.store(isWorkAvailable());
+
+ if (!has_work_.load()) {
+ std::unique_lock<std::mutex> lock(work_available_mutex_);
+ work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
+ [&] {return has_work_.load();});
+ }
+
+}
+
+void Connectable::notifyWork() {
+ // Do nothing if we are not event-driven
+ if (strategy_ != EVENT_DRIVEN) {
+ return;
+ }
+
+ {
+ has_work_.store(isWorkAvailable());
+
+ if (has_work_.load()) {
+ work_condition_.notify_one();
+ }
+ }
+
+}
+
+std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(
+ std::string relationship) {
+ std::set<std::shared_ptr<Connectable>> empty;
+
+ auto &&it = _outGoingConnections.find(relationship);
+ if (it != _outGoingConnections.end()) {
+ return _outGoingConnections[relationship];
+ } else {
+ return empty;
+ }
+}
+
+std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
+ std::lock_guard<std::mutex> lock(relationship_mutex_);
+
+ if (_incomingConnections.size() == 0)
+ return NULL;
+
+ if (incoming_connections_Iter == _incomingConnections.end())
+ incoming_connections_Iter = _incomingConnections.begin();
+
+ std::shared_ptr<Connectable> ret = *incoming_connections_Iter;
+ incoming_connections_Iter++;
+
+ if (incoming_connections_Iter == _incomingConnections.end())
+ incoming_connections_Iter = _incomingConnections.begin();
+
+ return ret;
+}
+
+} /* namespace components */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
new file mode 100644
index 0000000..39969f6
--- /dev/null
+++ b/libminifi/src/core/Core.cpp
@@ -0,0 +1,51 @@
+/*
+ * Core.cpp
+ *
+ * Created on: Mar 10, 2017
+ * Author: mparisi
+ */
+
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Set UUID
+void CoreComponent::setUUID(uuid_t uuid) {
+ uuid_copy(uuid_, uuid);
+ char uuidStr[37];
+ uuid_unparse_lower(uuid_, uuidStr);
+ uuidStr_ = uuidStr;
+}
+// Get UUID
+bool CoreComponent::getUUID(uuid_t uuid) {
+ if (uuid) {
+ uuid_copy(uuid, uuid_);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+// Get UUID
+unsigned const char *CoreComponent::getUUID() {
+ return uuid_;
+}
+
+// Set Processor Name
+void CoreComponent::setName(const std::string name) {
+ name_ = name;
+
+}
+// Get Process Name
+std::string CoreComponent::getName() {
+ return name_;
+}
+}
+}
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
new file mode 100644
index 0000000..c6472cc
--- /dev/null
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/FlowConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+FlowConfiguration::~FlowConfiguration() {
+
+}
+
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
+ std::string name, uuid_t uuid) {
+ std::shared_ptr<core::Processor> processor = nullptr;
+ if (name
+ == org::apache::nifi::minifi::processors::GenerateFlowFile::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::GenerateFlowFile>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::LogAttribute>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::RealTimeDataCollector>(name,
+ uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::GetFile::ProcessorName) {
+ processor =
+ std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name,
+ uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::PutFile::ProcessorName) {
+ processor =
+ std::make_shared<org::apache::nifi::minifi::processors::PutFile>(name,
+ uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::TailFile::ProcessorName) {
+ processor =
+ std::make_shared<org::apache::nifi::minifi::processors::TailFile>(name,
+ uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::ExecuteProcess>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid);
+ } else {
+ logger_->log_error("No Processor defined for %s", name.c_str());
+ return nullptr;
+ }
+
+ // initialize the processor
+ processor->initialize();
+
+ return processor;
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
+ std::string name, uuid_t uuid) {
+ return std::unique_ptr<core::ProcessGroup>(
+ new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(
+ std::string name, uuid_t uuid) {
+ return std::unique_ptr<core::ProcessGroup>(
+ new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+}
+
+std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(
+ std::string name, uuid_t uuid) {
+ return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
new file mode 100644
index 0000000..baa3ebd
--- /dev/null
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -0,0 +1,312 @@
+/**
+ * @file ProcessGroup.cpp
+ * ProcessGroup class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "core/ProcessGroup.h"
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
+ ProcessGroup *parent)
+ : name_(name),
+ type_(type),
+ parent_process_group_(parent) {
+ if (!uuid)
+ // Generate the global UUID for the flow record
+ uuid_generate(uuid_);
+ else
+ uuid_copy(uuid_, uuid);
+
+ yield_period_msec_ = 0;
+ transmitting_ = false;
+
+ logger_ = logging::Logger::getLogger();
+ logger_->log_info("ProcessGroup %s created", name_.c_str());
+}
+
+ProcessGroup::~ProcessGroup() {
+ for (auto &&connection : connections_) {
+ connection->drain();
+ }
+
+ for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
+ it != child_process_groups_.end(); ++it) {
+ ProcessGroup *processGroup(*it);
+ delete processGroup;
+ }
+
+}
+
+bool ProcessGroup::isRootProcessGroup() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return (type_ == ROOT_PROCESS_GROUP);
+}
+
+void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (processors_.find(processor) == processors_.end()) {
+ // We do not have the same processor in this process group yet
+ processors_.insert(processor);
+ logger_->log_info("Add processor %s into process group %s",
+ processor->getName().c_str(), name_.c_str());
+ }
+}
+
+void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (processors_.find(processor) != processors_.end()) {
+ // We do have the same processor in this process group yet
+ processors_.erase(processor);
+ logger_->log_info("Remove processor %s from process group %s",
+ processor->getName().c_str(), name_.c_str());
+ }
+}
+
+void ProcessGroup::addProcessGroup(ProcessGroup *child) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (child_process_groups_.find(child) == child_process_groups_.end()) {
+ // We do not have the same child process group in this process group yet
+ child_process_groups_.insert(child);
+ logger_->log_info("Add child process group %s into process group %s",
+ child->getName().c_str(), name_.c_str());
+ }
+}
+
+void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (child_process_groups_.find(child) != child_process_groups_.end()) {
+ // We do have the same child process group in this process group yet
+ child_process_groups_.erase(child);
+ logger_->log_info("Remove child process group %s from process group %s",
+ child->getName().c_str(), name_.c_str());
+ }
+}
+
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ try {
+ // Start all the processor node, input and output ports
+ for (auto processor : processors_) {
+ logger_->log_debug("Starting %s", processor->getName().c_str());
+
+ if (!processor->isRunning()
+ && processor->getScheduledState() != DISABLED) {
+ if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+ timeScheduler->schedule(processor);
+ else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+ eventScheduler->schedule(processor);
+ }
+ }
+ // Start processing the group
+ for (auto processGroup : child_process_groups_) {
+ processGroup->startProcessing(timeScheduler, eventScheduler);
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug(
+ "Caught Exception during process group start processing");
+ throw;
+ }
+}
+
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ try {
+ // Stop all the processor node, input and output ports
+ for (std::set<std::shared_ptr<Processor> >::iterator it =
+ processors_.begin(); it != processors_.end(); ++it) {
+ std::shared_ptr<Processor> processor(*it);
+ if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+ timeScheduler->unschedule(processor);
+ else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+ eventScheduler->unschedule(processor);
+ }
+
+ for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
+ it != child_process_groups_.end(); ++it) {
+ ProcessGroup *processGroup(*it);
+ processGroup->stopProcessing(timeScheduler, eventScheduler);
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process group stop processing");
+ throw;
+ }
+}
+
+std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+
+ std::shared_ptr<Processor> ret = NULL;
+ // std::lock_guard<std::mutex> lock(mutex_);
+
+ for (auto processor : processors_) {
+ logger_->log_info("find processor %s", processor->getName().c_str());
+ uuid_t processorUUID;
+
+ if (processor->getUUID(processorUUID)) {
+
+ char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
+ uuid_unparse_lower(processorUUID, uuid_str);
+ std::string processorUUIDstr = uuid_str;
+ uuid_unparse_lower(uuid, uuid_str);
+ std::string uuidStr = uuid_str;
+ if (processorUUIDstr == uuidStr) {
+ return processor;
+ }
+ }
+
+ }
+ for (auto processGroup : child_process_groups_) {
+
+ logger_->log_info("find processor child %s",
+ processGroup->getName().c_str());
+ std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
+ if (processor)
+ return processor;
+ }
+
+ return ret;
+}
+
+std::shared_ptr<Processor> ProcessGroup::findProcessor(
+ const std::string &processorName) {
+ std::shared_ptr<Processor> ret = NULL;
+
+ for (auto processor : processors_) {
+ logger_->log_debug("Current processor is %s", processor->getName().c_str());
+ if (processor->getName() == processorName)
+ return processor;
+ }
+
+ for (auto processGroup : child_process_groups_) {
+ std::shared_ptr<Processor> processor = processGroup->findProcessor(
+ processorName);
+ if (processor)
+ return processor;
+ }
+
+ return ret;
+}
+
+void ProcessGroup::updatePropertyValue(std::string processorName,
+ std::string propertyName,
+ std::string propertyValue) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ for (auto processor : processors_) {
+ if (processor->getName() == processorName) {
+ processor->setProperty(propertyName, propertyValue);
+ }
+ }
+
+ for (auto processGroup : child_process_groups_) {
+ processGroup->updatePropertyValue(processorName, propertyName,
+ propertyValue);
+ }
+
+ return;
+}
+
+void ProcessGroup::getConnections(
+ std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
+ for (auto connection : connections_) {
+ connectionMap[connection->getUUIDStr()] = connection;
+ }
+
+ for (auto processGroup : child_process_groups_) {
+ processGroup->getConnections(connectionMap);
+ }
+}
+
+void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (connections_.find(connection) == connections_.end()) {
+ // We do not have the same connection in this process group yet
+ connections_.insert(connection);
+ logger_->log_info("Add connection %s into process group %s",
+ connection->getName().c_str(), name_.c_str());
+ uuid_t sourceUUID;
+ std::shared_ptr<Processor> source = NULL;
+ connection->getSourceUUID(sourceUUID);
+ source = this->findProcessor(sourceUUID);
+ if (source)
+ source->addConnection(connection);
+ std::shared_ptr<Processor> destination = NULL;
+ uuid_t destinationUUID;
+ connection->getDestinationUUID(destinationUUID);
+ destination = this->findProcessor(destinationUUID);
+ if (destination && destination != source)
+ destination->addConnection(connection);
+ }
+}
+
+void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (connections_.find(connection) != connections_.end()) {
+ // We do not have the same connection in this process group yet
+ connections_.erase(connection);
+ logger_->log_info("Remove connection %s into process group %s",
+ connection->getName().c_str(), name_.c_str());
+ uuid_t sourceUUID;
+ std::shared_ptr<Processor> source = NULL;
+ connection->getSourceUUID(sourceUUID);
+ source = this->findProcessor(sourceUUID);
+ if (source)
+ source->removeConnection(connection);
+ std::shared_ptr<Processor> destination = NULL;
+ uuid_t destinationUUID;
+ connection->getDestinationUUID(destinationUUID);
+ destination = this->findProcessor(destinationUUID);
+ if (destination && destination != source)
+ destination->removeConnection(connection);
+ }
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
new file mode 100644
index 0000000..e6fa7c4
--- /dev/null
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -0,0 +1,941 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::shared_ptr<core::FlowFile> ProcessSession::create() {
+ std::map<std::string, std::string> empty;
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+ empty);
+
+ _addedFlowFiles[record->getUUIDStr()] = record;
+ logger_->log_debug("Create FlowFile with UUID %s",
+ record->getUUIDStr().c_str());
+ std::string details = process_context_->getProcessorNode().getName()
+ + " creates flow record " + record->getUUIDStr();
+ provenance_report_->create(record, details);
+
+ return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &parent) {
+ std::map<std::string, std::string> empty;
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+ empty);
+
+ if (record) {
+ _addedFlowFiles[record->getUUIDStr()] = record;
+ logger_->log_debug("Create FlowFile with UUID %s",
+ record->getUUIDStr().c_str());
+ }
+
+ if (record) {
+ // Copy attributes
+ std::map<std::string, std::string> parentAttributes =
+ parent->getAttributes();
+ std::map<std::string, std::string>::iterator it;
+ for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
+ if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
+ || it->first == FlowAttributeKey(DISCARD_REASON)
+ || it->first == FlowAttributeKey(UUID))
+ // Do not copy special attributes from parent
+ continue;
+ record->setAttribute(it->first, it->second);
+ }
+ record->setLineageStartDate(parent->getlineageStartDate());
+ record->setLineageIdentifiers(parent->getlineageIdentifiers());
+ parent->getlineageIdentifiers().insert(parent->getUUIDStr());
+
+ }
+ return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::clone(
+ std::shared_ptr<core::FlowFile> &parent) {
+ std::shared_ptr<core::FlowFile> record = this->create(parent);
+ if (record) {
+ // Copy Resource Claim
+ std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+ record->setResourceClaim(parent_claim);
+ if (parent_claim != nullptr) {
+ record->setOffset(parent->getOffset());
+ record->setSize(parent->getSize());
+ record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+ ;
+ }
+ provenance_report_->clone(parent, record);
+ }
+ return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
+ std::shared_ptr<core::FlowFile> &parent) {
+ std::map<std::string, std::string> empty;
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
+ empty);
+
+ if (record) {
+ this->_clonedFlowFiles[record->getUUIDStr()] = record;
+ logger_->log_debug("Clone FlowFile with UUID %s during transfer",
+ record->getUUIDStr().c_str());
+ // Copy attributes
+ std::map<std::string, std::string> parentAttributes =
+ parent->getAttributes();
+ std::map<std::string, std::string>::iterator it;
+ for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
+ if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
+ || it->first == FlowAttributeKey(DISCARD_REASON)
+ || it->first == FlowAttributeKey(UUID))
+ // Do not copy special attributes from parent
+ continue;
+ record->setAttribute(it->first, it->second);
+ }
+ record->setLineageStartDate(parent->getlineageStartDate());
+
+ record->setLineageIdentifiers(parent->getlineageIdentifiers());
+ record->getlineageIdentifiers().insert(parent->getUUIDStr());
+
+ // Copy Resource Claim
+ std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+ record->setResourceClaim(parent_claim);
+ if (parent_claim != nullptr) {
+ record->setOffset(parent->getOffset());
+ record->setSize(parent->getSize());
+ record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+ ;
+ }
+ provenance_report_->clone(parent, record);
+ }
+
+ return record;
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::clone(
+ std::shared_ptr<core::FlowFile> &parent, long offset, long size) {
+ std::shared_ptr<core::FlowFile> record = this->create(parent);
+ if (record) {
+
+ if (parent->getResourceClaim()) {
+ if ((offset + size) > (long) parent->getSize()) {
+ // Set offset and size
+ logger_->log_error("clone offset %d and size %d exceed parent size %d",
+ offset, size, parent->getSize());
+ // Remove the Add FlowFile for the session
+ std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it =
+ this->_addedFlowFiles.find(record->getUUIDStr());
+ if (it != this->_addedFlowFiles.end())
+ this->_addedFlowFiles.erase(record->getUUIDStr());
+ return nullptr;
+ }
+ record->setOffset(parent->getOffset() + parent->getOffset());
+ record->setSize(size);
+ // Copy Resource Claim
+ std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
+ record->setResourceClaim(parent_claim);
+ if (parent_claim != nullptr) {
+
+ record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
+ }
+ }
+ provenance_report_->clone(parent, record);
+ }
+ return record;
+}
+
+void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
+ flow->setDeleted(true);
+ _deletedFlowFiles[flow->getUUIDStr()] = flow;
+ std::string reason = process_context_->getProcessorNode().getName()
+ + " drop flow record " + flow->getUUIDStr();
+ provenance_report_->drop(flow, reason);
+}
+
+void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
+ flow->setDeleted(true);
+ _deletedFlowFiles[flow->getUUIDStr()] = flow;
+ std::string reason = process_context_->getProcessorNode().getName()
+ + " drop flow record " + flow->getUUIDStr();
+ provenance_report_->drop(flow, reason);
+}
+
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow,
+ std::string key, std::string value) {
+ flow->setAttribute(key, value);
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
+ + value;
+ provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow,
+ std::string key) {
+ flow->removeAttribute(key);
+ std::string details = process_context_->getProcessorNode().getName()
+ + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+ provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow,
+ std::string key, std::string value) {
+ flow->setAttribute(key, value);
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
+ + value;
+ provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow,
+ std::string key) {
+ flow->removeAttribute(key);
+ std::string details = process_context_->getProcessorNode().getName()
+ + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+ provenance_report_->modifyAttributes(flow, details);
+}
+
+void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
+ flow->setPenaltyExpiration(
+ getTimeMillis()
+ + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+}
+
+void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
+ flow->setPenaltyExpiration(
+ getTimeMillis()
+ + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+}
+
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow,
+ Relationship relationship) {
+ _transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow,
+ Relationship relationship) {
+ _transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
+ OutputStreamCallback *callback) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
+ DEFAULT_CONTENT_DIRECTORY);
+
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ if (fs.is_open()) {
+ // Call the callback to write the content
+ callback->process(&fs);
+ if (fs.good() && fs.tellp() >= 0) {
+ flow->setSize(fs.tellp());
+ flow->setOffset(0);
+ std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+ if (flow_claim != nullptr) {
+ // Remove the old claim
+ flow_claim->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ flow->setResourceClaim(claim);
+ claim->increaseFlowFileRecordOwnedCount();
+ /*
+ logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception during process session write");
+ throw;
+ }
+}
+
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
+ OutputStreamCallback *callback) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ if (fs.is_open()) {
+ // Call the callback to write the content
+ callback->process(&fs);
+ if (fs.good() && fs.tellp() >= 0) {
+ flow->setSize(fs.tellp());
+ flow->setOffset(0);
+ std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+ if (flow_claim != nullptr) {
+ // Remove the old claim
+ flow_claim->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ flow->setResourceClaim(claim);
+ claim->increaseFlowFileRecordOwnedCount();
+ /*
+ logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception during process session write");
+ throw;
+ }
+}
+
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
+ OutputStreamCallback *callback) {
+ std::shared_ptr<ResourceClaim> claim = nullptr;
+
+ if (flow->getResourceClaim() == nullptr) {
+ // No existed claim for append, we need to create new claim
+ return write(flow, callback);
+ }
+
+ claim = flow->getResourceClaim();
+
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::app);
+ if (fs.is_open()) {
+ // Call the callback to write the content
+ std::streampos oldPos = fs.tellp();
+ callback->process(&fs);
+ if (fs.good() && fs.tellp() >= 0) {
+ uint64_t appendSize = fs.tellp() - oldPos;
+ flow->setSize(flow->getSize() + appendSize);
+ /*
+ logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
+ flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session append");
+ throw;
+ }
+}
+
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
+ OutputStreamCallback *callback) {
+ std::shared_ptr<ResourceClaim> claim = nullptr;
+
+ if (flow->getResourceClaim() == nullptr) {
+ // No existed claim for append, we need to create new claim
+ return write(flow, callback);
+ }
+
+ claim = flow->getResourceClaim();
+
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::app);
+ if (fs.is_open()) {
+ // Call the callback to write the content
+ std::streampos oldPos = fs.tellp();
+ callback->process(&fs);
+ if (fs.good() && fs.tellp() >= 0) {
+ uint64_t appendSize = fs.tellp() - oldPos;
+ flow->setSize(flow->getSize() + appendSize);
+ /*
+ logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
+ flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session append");
+ throw;
+ }
+}
+
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
+ InputStreamCallback *callback) {
+ try {
+ std::shared_ptr<ResourceClaim> claim = nullptr;
+
+ if (flow->getResourceClaim() == nullptr) {
+ // No existed claim for read, we throw exception
+ throw Exception(FILE_OPERATION_EXCEPTION,
+ "No Content Claim existed for read");
+ }
+
+ claim = flow->getResourceClaim();
+ std::ifstream fs;
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::in | std::fstream::binary);
+ if (fs.is_open()) {
+ fs.seekg(flow->getOffset(), fs.beg);
+
+ if (fs.good()) {
+ callback->process(&fs);
+ /*
+ logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session read");
+ throw;
+ }
+}
+
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
+ InputStreamCallback *callback) {
+ try {
+ std::shared_ptr<ResourceClaim> claim = nullptr;
+
+ if (flow->getResourceClaim() == nullptr) {
+ // No existed claim for read, we throw exception
+ throw Exception(FILE_OPERATION_EXCEPTION,
+ "No Content Claim existed for read");
+ }
+
+ claim = flow->getResourceClaim();
+ std::ifstream fs;
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::in | std::fstream::binary);
+ if (fs.is_open()) {
+ fs.seekg(flow->getOffset(), fs.beg);
+
+ if (fs.good()) {
+ callback->process(&fs);
+ /*
+ logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ } else {
+ fs.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ }
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session read");
+ throw;
+ }
+}
+
+void ProcessSession::import(std::string source,
+ std::shared_ptr<core::FlowFile> &flow,
+ bool keepSource, uint64_t offset) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+ char *buf = NULL;
+ int size = 4096;
+ buf = new char[size];
+
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ std::ifstream input;
+ input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+ if (fs.is_open() && input.is_open()) {
+ // Open the source file and stream to the flow file
+ input.seekg(offset, fs.beg);
+ while (input.good()) {
+ input.read(buf, size);
+ if (input)
+ fs.write(buf, size);
+ else
+ fs.write(buf, input.gcount());
+ }
+
+ if (fs.good() && fs.tellp() >= 0) {
+ flow->setSize(fs.tellp());
+ flow->setOffset(0);
+ if (flow->getResourceClaim() != nullptr) {
+ // Remove the old claim
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ flow->setResourceClaim(claim);
+ claim->increaseFlowFileRecordOwnedCount();
+
+ logger_->log_debug(
+ "Import offset %d length %d into content %s for FlowFile UUID %s",
+ flow->getOffset(), flow->getSize(),
+ flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
+
+ fs.close();
+ input.close();
+ if (!keepSource)
+ std::remove(source.c_str());
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ input.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+
+ delete[] buf;
+ } catch (std::exception &exception) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception %s", exception.what());
+ delete[] buf;
+ throw;
+ } catch (...) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception during process session write");
+ delete[] buf;
+ throw;
+ }
+}
+
+void ProcessSession::import(std::string source,
+ std::shared_ptr<core::FlowFile> &&flow,
+ bool keepSource, uint64_t offset) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+
+ char *buf = NULL;
+ int size = 4096;
+ buf = new char[size];
+
+ try {
+ std::ofstream fs;
+ uint64_t startTime = getTimeMillis();
+ fs.open(claim->getContentFullPath().c_str(),
+ std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ std::ifstream input;
+ input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+ if (fs.is_open() && input.is_open()) {
+ // Open the source file and stream to the flow file
+ input.seekg(offset, fs.beg);
+ while (input.good()) {
+ input.read(buf, size);
+ if (input)
+ fs.write(buf, size);
+ else
+ fs.write(buf, input.gcount());
+ }
+
+ if (fs.good() && fs.tellp() >= 0) {
+ flow->setSize(fs.tellp());
+ flow->setOffset(0);
+ if (flow->getResourceClaim() != nullptr) {
+ // Remove the old claim
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ flow->setResourceClaim(claim);
+ claim->increaseFlowFileRecordOwnedCount();
+
+ logger_->log_debug(
+ "Import offset %d length %d into content %s for FlowFile UUID %s",
+ flow->getOffset(), flow->getSize(),
+ flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
+
+ fs.close();
+ input.close();
+ if (!keepSource)
+ std::remove(source.c_str());
+ std::string details = process_context_->getProcessorNode().getName()
+ + " modify flow record content " + flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
+ } else {
+ fs.close();
+ input.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+ } else {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+
+ delete[] buf;
+ } catch (std::exception &exception) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception %s", exception.what());
+ delete[] buf;
+ throw;
+ } catch (...) {
+ if (flow && flow->getResourceClaim() == claim) {
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
+ }
+ logger_->log_debug("Caught Exception during process session write");
+ delete[] buf;
+ throw;
+ }
+}
+
+void ProcessSession::commit() {
+
+ try {
+ // First we clone the flow record based on the transfered relationship for updated flow record
+ for (auto && it : _updatedFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ if (record->isDeleted())
+ continue;
+ std::map<std::string, Relationship>::iterator itRelationship = this
+ ->_transferRelationship.find(record->getUUIDStr());
+ if (itRelationship != _transferRelationship.end()) {
+ Relationship relationship = itRelationship->second;
+ // Find the relationship, we need to find the connections for that relationship
+ std::set<std::shared_ptr<Connectable>> connections = process_context_
+ ->getProcessorNode().getOutGoingConnections(relationship.getName());
+ if (connections.empty()) {
+ // No connection
+ if (!process_context_->getProcessorNode().isAutoTerminated(
+ relationship)) {
+ // Not autoterminate, we should have the connect
+ std::string message =
+ "Connect empty for non auto terminated relationship"
+ + relationship.getName();
+ throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+ } else {
+ // Autoterminated
+ remove(record);
+ }
+ } else {
+ // We connections, clone the flow and assign the connection accordingly
+ for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
+ connections.begin(); itConnection != connections.end();
+ ++itConnection) {
+ std::shared_ptr<Connectable> connection = *itConnection;
+ if (itConnection == connections.begin()) {
+ // First connection which the flow need be routed to
+ record->setConnection(connection);
+ } else {
+ // Clone the flow file and route to the connection
+ std::shared_ptr<core::FlowFile> cloneRecord;
+ cloneRecord = this->cloneDuringTransfer(record);
+ if (cloneRecord)
+ cloneRecord->setConnection(connection);
+ else
+ throw Exception(PROCESS_SESSION_EXCEPTION,
+ "Can not clone the flow for transfer");
+ }
+ }
+ }
+ } else {
+ // Can not find relationship for the flow
+ throw Exception(PROCESS_SESSION_EXCEPTION,
+ "Can not find the transfer relationship for the flow");
+ }
+ }
+
+ // Do the samething for added flow file
+ for (const auto it : _addedFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ if (record->isDeleted())
+ continue;
+ std::map<std::string, Relationship>::iterator itRelationship = this
+ ->_transferRelationship.find(record->getUUIDStr());
+ if (itRelationship != _transferRelationship.end()) {
+ Relationship relationship = itRelationship->second;
+ // Find the relationship, we need to find the connections for that relationship
+ std::set<std::shared_ptr<Connectable>> connections = process_context_
+ ->getProcessorNode().getOutGoingConnections(relationship.getName());
+ if (connections.empty()) {
+ // No connection
+ if (!process_context_->getProcessorNode().isAutoTerminated(
+ relationship)) {
+ // Not autoterminate, we should have the connect
+ std::string message =
+ "Connect empty for non auto terminated relationship "
+ + relationship.getName();
+ throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+ } else {
+ // Autoterminated
+ remove(record);
+ }
+ } else {
+ // We connections, clone the flow and assign the connection accordingly
+ for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
+ connections.begin(); itConnection != connections.end();
+ ++itConnection) {
+ std::shared_ptr<Connectable> connection(*itConnection);
+ if (itConnection == connections.begin()) {
+ // First connection which the flow need be routed to
+ record->setConnection(connection);
+ } else {
+ // Clone the flow file and route to the connection
+ std::shared_ptr<core::FlowFile> cloneRecord;
+ cloneRecord = this->cloneDuringTransfer(record);
+ if (cloneRecord)
+ cloneRecord->setConnection(connection);
+ else
+ throw Exception(PROCESS_SESSION_EXCEPTION,
+ "Can not clone the flow for transfer");
+ }
+ }
+ }
+ } else {
+ // Can not find relationship for the flow
+ throw Exception(PROCESS_SESSION_EXCEPTION,
+ "Can not find the transfer relationship for the flow");
+ }
+ }
+
+ std::shared_ptr<Connection> connection = nullptr;
+ // Complete process the added and update flow files for the session, send the flow file to its queue
+ for (const auto &it : _updatedFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ if (record->isDeleted()) {
+ continue;
+ }
+
+ connection = std::static_pointer_cast<Connection>(
+ record->getConnection());
+ if ((connection) != nullptr)
+ connection->put(record);
+ }
+ for (const auto &it : _addedFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ if (record->isDeleted()) {
+ continue;
+ }
+ connection = std::static_pointer_cast<Connection>(
+ record->getConnection());
+ if ((connection) != nullptr)
+ connection->put(record);
+ }
+ // Process the clone flow files
+ for (const auto &it : _clonedFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ if (record->isDeleted()) {
+ continue;
+ }
+ connection = std::static_pointer_cast<Connection>(
+ record->getConnection());
+ if ((connection) != nullptr)
+ connection->put(record);
+ }
+
+ // All done
+ _updatedFlowFiles.clear();
+ _addedFlowFiles.clear();
+ _clonedFlowFiles.clear();
+ _deletedFlowFiles.clear();
+ _originalFlowFiles.clear();
+ // persistent the provenance report
+ this->provenance_report_->commit();
+ logger_->log_trace("ProcessSession committed for %s",
+ process_context_->getProcessorNode().getName().c_str());
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session commit");
+ throw;
+ }
+}
+
+void ProcessSession::rollback() {
+ try {
+ std::shared_ptr<Connection> connection = nullptr;
+ // Requeue the snapshot of the flowfile back
+ for (const auto &it : _originalFlowFiles) {
+ std::shared_ptr<core::FlowFile> record = it.second;
+ connection = std::static_pointer_cast<Connection>(
+ record->getOriginalConnection());
+ if ((connection) != nullptr) {
+ std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<
+ FlowFileRecord>(record);
+ flowf->setSnapShot(false);
+ connection->put(record);
+ }
+
+ }
+ _originalFlowFiles.clear();
+
+ _clonedFlowFiles.clear();
+ _addedFlowFiles.clear();
+ _updatedFlowFiles.clear();
+ _deletedFlowFiles.clear();
+ logger_->log_trace("ProcessSession rollback for %s",
+ process_context_->getProcessorNode().getName().c_str());
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception during process session roll back");
+ throw;
+ }
+}
+
+std::shared_ptr<core::FlowFile> ProcessSession::get() {
+ std::shared_ptr<Connectable> first = process_context_->getProcessorNode()
+ .getNextIncomingConnection();
+
+ if (first == NULL)
+ return NULL;
+
+ std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(
+ first);
+
+ do {
+ std::set<std::shared_ptr<core::FlowFile> > expired;
+ std::shared_ptr<core::FlowFile> ret = current->poll(expired);
+ if (expired.size() > 0) {
+ // Remove expired flow record
+ for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired
+ .begin(); it != expired.end(); ++it) {
+ std::shared_ptr<core::FlowFile> record = *it;
+ std::string details = process_context_->getProcessorNode().getName()
+ + " expire flow record " + record->getUUIDStr();
+ provenance_report_->expire(record, details);
+ }
+ }
+ if (ret) {
+ // add the flow record to the current process session update map
+ ret->setDeleted(false);
+ _updatedFlowFiles[ret->getUUIDStr()] = ret;
+ std::map<std::string, std::string> empty;
+ std::shared_ptr<core::FlowFile> snapshot =
+ std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),empty);
+ logger_->log_debug("Create Snapshot FlowFile with UUID %s",
+ snapshot->getUUIDStr().c_str());
+ snapshot = ret;
+// snapshot->duplicate(ret);
+ // save a snapshot
+ _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+ return ret;
+ }
+ current = std::static_pointer_cast<Connection>(
+ process_context_->getProcessorNode().getNextIncomingConnection());
+ } while (current != NULL && current != first);
+
+ return NULL;
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
new file mode 100644
index 0000000..445ca58
--- /dev/null
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -0,0 +1,42 @@
+/**
+ * @file ProcessSessionFactory.cpp
+ * ProcessSessionFactory class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ProcessSessionFactory.h"
+
+#include <memory>
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession()
+{
+ return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
+}
+
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
new file mode 100644
index 0000000..ba52c28
--- /dev/null
+++ b/libminifi/src/core/Processor.cpp
@@ -0,0 +1,272 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <memory>
+#include <functional>
+
+#include "core/Processor.h"
+
+#include "Connection.h"
+#include "core/Connectable.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+Processor::Processor(std::string name, uuid_t uuid)
+ : Connectable(name, uuid),
+ ConfigurableComponent(logging::Logger::getLogger()) {
+
+ has_work_.store(false);
+ // Setup the default values
+ state_ = DISABLED;
+ strategy_ = TIMER_DRIVEN;
+ loss_tolerant_ = false;
+ _triggerWhenEmpty = false;
+ scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
+ run_durantion_nano_ = 0;
+ yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+ _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+ max_concurrent_tasks_ = 1;
+ active_tasks_ = 0;
+ yield_expiration_ = 0;
+ incoming_connections_Iter = this->_incomingConnections.begin();
+ logger_ = logging::Logger::getLogger();
+ logger_->log_info("Processor %s created UUID %s", name_.c_str(),
+ uuidStr_.c_str());
+}
+
+bool Processor::isRunning() {
+ return (state_ == RUNNING && active_tasks_ > 0);
+}
+
+void Processor::setScheduledState(ScheduledState state) {
+ state_ = state;
+}
+
+bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
+
+ bool ret = false;
+
+ if (isRunning()) {
+ logger_->log_info("Can not add connection while the process %s is running",
+ name_.c_str());
+ return false;
+ }
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ uuid_t srcUUID;
+ uuid_t destUUID;
+
+ connection->getSourceUUID(srcUUID);
+ connection->getDestinationUUID(destUUID);
+ char uuid_str[37];
+
+ uuid_unparse_lower(uuid_, uuid_str);
+ std::string my_uuid = uuid_str;
+ uuid_unparse_lower(destUUID, uuid_str);
+ std::string destination_uuid = uuid_str;
+ if (my_uuid == destination_uuid) {
+ // Connection is destination to the current processor
+ if (_incomingConnections.find(connection) == _incomingConnections.end()) {
+ _incomingConnections.insert(connection);
+ connection->setDestination(shared_from_this());
+ logger_->log_info(
+ "Add connection %s into Processor %s incoming connection",
+ connection->getName().c_str(), name_.c_str());
+ incoming_connections_Iter = this->_incomingConnections.begin();
+ ret = true;
+ }
+ }
+ uuid_unparse_lower(srcUUID, uuid_str);
+ std::string source_uuid = uuid_str;
+ if (my_uuid == source_uuid) {
+ std::string relationship = connection->getRelationship().getName();
+ // Connection is source from the current processor
+ auto &&it = _outGoingConnections.find(relationship);
+ if (it != _outGoingConnections.end()) {
+ // We already has connection for this relationship
+ std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
+ if (existedConnection.find(connection) == existedConnection.end()) {
+ // We do not have the same connection for this relationship yet
+ existedConnection.insert(connection);
+ connection->setSource(shared_from_this());
+ _outGoingConnections[relationship] = existedConnection;
+ logger_->log_info(
+ "Add connection %s into Processor %s outgoing connection for relationship %s",
+ connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ ret = true;
+ }
+ } else {
+
+ // We do not have any outgoing connection for this relationship yet
+ std::set<std::shared_ptr<Connectable>> newConnection;
+ newConnection.insert(connection);
+ connection->setSource(shared_from_this());
+ _outGoingConnections[relationship] = newConnection;
+ logger_->log_info(
+ "Add connection %s into Processor %s outgoing connection for relationship %s",
+ connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ ret = true;
+ }
+ }
+
+ return ret;
+}
+
+void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
+ if (isRunning()) {
+ logger_->log_info(
+ "Can not remove connection while the process %s is running",
+ name_.c_str());
+ return;
+ }
+
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ uuid_t srcUUID;
+ uuid_t destUUID;
+
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+
+ connection->getSourceUUID(srcUUID);
+ connection->getDestinationUUID(destUUID);
+
+ if (uuid_compare(uuid_, destUUID) == 0) {
+ // Connection is destination to the current processor
+ if (_incomingConnections.find(connection) != _incomingConnections.end()) {
+ _incomingConnections.erase(connection);
+ connection->setDestination(NULL);
+ logger_->log_info(
+ "Remove connection %s into Processor %s incoming connection",
+ connection->getName().c_str(), name_.c_str());
+ incoming_connections_Iter = this->_incomingConnections.begin();
+ }
+ }
+
+ if (uuid_compare(uuid_, srcUUID) == 0) {
+ std::string relationship = connection->getRelationship().getName();
+ // Connection is source from the current processor
+ auto &&it = _outGoingConnections.find(relationship);
+ if (it == _outGoingConnections.end()) {
+ return;
+ } else {
+ if (_outGoingConnections[relationship].find(connection)
+ != _outGoingConnections[relationship].end()) {
+ _outGoingConnections[relationship].erase(connection);
+ connection->setSource(NULL);
+ logger_->log_info(
+ "Remove connection %s into Processor %s outgoing connection for relationship %s",
+ connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ }
+ }
+ }
+}
+
+
+
+bool Processor::flowFilesQueued() {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ if (_incomingConnections.size() == 0)
+ return false;
+
+ for (auto &&conn : _incomingConnections) {
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ if (connection->getQueueSize() > 0)
+ return true;
+ }
+
+ return false;
+}
+
+bool Processor::flowFilesOutGoingFull() {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ for (auto &&connection : _outGoingConnections) {
+ // We already has connection for this relationship
+ std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
+ for (const auto conn : existedConnection) {
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ if (connection->isFull())
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void Processor::onTrigger(ProcessContext *context,
+ ProcessSessionFactory *sessionFactory) {
+ auto session = sessionFactory->createSession();
+
+ try {
+ // Call the virtual trigger function
+ onTrigger(context, session.get());
+ session->commit();
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ session->rollback();
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught Exception Processor::onTrigger");
+ session->rollback();
+ throw;
+ }
+}
+
+bool Processor::isWorkAvailable() {
+ // We have work if any incoming connection has work
+ bool hasWork = false;
+
+ try {
+ for (const auto &conn : _incomingConnections) {
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ if (connection->getQueueSize() > 0) {
+ hasWork = true;
+ break;
+ }
+ }
+ } catch (...) {
+ logger_->log_error(
+ "Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+ }
+
+ return hasWork;
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
new file mode 100644
index 0000000..44491d3
--- /dev/null
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ProcessorNode.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
+ : processor_(processor),
+ Connectable(processor->getName(),0),
+ ConfigurableComponent(logging::Logger::getLogger()) {
+
+ uuid_t copy;
+ processor->getUUID(copy);
+ setUUID( copy );
+
+
+}
+
+ProcessorNode::ProcessorNode(const ProcessorNode &other)
+ : processor_(other.processor_),
+ Connectable(other.getName(), 0),
+ ConfigurableComponent(logging::Logger::getLogger()) {
+
+ uuid_t copy;
+ processor_->getUUID(copy);
+ setUUID( copy );
+
+}
+
+ProcessorNode::~ProcessorNode() {
+
+}
+
+bool ProcessorNode::isWorkAvailable() {
+ return processor_->isWorkAvailable();
+}
+
+bool ProcessorNode::isRunning() {
+ return processor_->isRunning();
+}
+
+} /* namespace processor */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
new file mode 100644
index 0000000..287b7ec
--- /dev/null
+++ b/libminifi/src/core/Property.cpp
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {namespace minifi {
+namespace core {
+
+// Get Name for the property
+std::string Property::getName() const {
+ return name_;
+}
+// Get Description for the property
+std::string Property::getDescription() {
+ return description_;
+}
+// Get value for the property
+std::string Property::getValue() const {
+ return value_;
+}
+// Set value for the property
+void Property::setValue(std::string value) {
+ value_ = value;
+}
+// Compare
+bool Property::operator <(const Property & right) const {
+ return name_ < right.name_;
+}
+
+const Property &Property::operator=(const Property &other) {
+ name_ = other.name_;
+ value_ = other.value_;
+ return *this;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Record.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp
new file mode 100644
index 0000000..dbf0102
--- /dev/null
+++ b/libminifi/src/core/Record.cpp
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 <copyright holder> <email>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+FlowFile::FlowFile()
+ : size_(0),
+ id_(0),
+ stored(false),
+ offset_(0),
+ last_queue_date_(0),
+ penaltyExpiration_ms_(0),
+ claim_(nullptr),
+ marked_delete_(false),
+ connection_(nullptr),
+ original_connection_() {
+ entry_date_ = getTimeMillis();
+ lineage_start_date_ = entry_date_;
+
+ char uuidStr[37];
+
+ // Generate the global UUID for the flow record
+ uuid_generate(uuid_);
+
+ uuid_unparse_lower(uuid_, uuidStr);
+ uuid_str_ = uuidStr;
+
+ logger_ = logging::Logger::getLogger();
+
+}
+
+FlowFile::~FlowFile() {
+
+}
+
+FlowFile& FlowFile::operator=(const FlowFile& other) {
+
+ uuid_copy(uuid_, other.uuid_);
+ stored = other.stored;
+ marked_delete_ = other.marked_delete_;
+ entry_date_ = other.entry_date_;
+ lineage_start_date_ = other.lineage_start_date_;
+ lineage_Identifiers_ = other.lineage_Identifiers_;
+ last_queue_date_ = other.last_queue_date_;
+ size_ = other.size_;
+ penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
+ attributes_ = other.attributes_;
+ claim_ = other.claim_;
+ if (claim_ != nullptr)
+ this->claim_->increaseFlowFileRecordOwnedCount();
+ uuid_str_ = other.uuid_str_;
+ connection_ = other.connection_;
+ original_connection_ = other.original_connection_;
+
+ return *this;
+}
+
+/**
+ * Returns whether or not this flow file record
+ * is marked as deleted.
+ * @return marked deleted
+ */
+bool FlowFile::isDeleted() {
+ return marked_delete_;
+}
+
+/**
+ * Sets whether to mark this flow file record
+ * as deleted
+ * @param deleted deleted flag
+ */
+void FlowFile::setDeleted(const bool deleted) {
+ marked_delete_ = deleted;
+}
+
+std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
+ return claim_;
+}
+
+void FlowFile::clearResourceClaim() {
+ claim_ = nullptr;
+}
+void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
+ claim_ = claim;
+}
+
+// ! Get Entry Date
+uint64_t FlowFile::getEntryDate() {
+ return entry_date_;
+}
+uint64_t FlowFile::getEventTime() {
+ return event_time_;
+}
+// ! Get Lineage Start Date
+uint64_t FlowFile::getlineageStartDate() {
+ return lineage_start_date_;
+}
+
+std::set<std::string> &FlowFile::getlineageIdentifiers() {
+ return lineage_Identifiers_;
+}
+
+bool FlowFile::getAttribute(std::string key, std::string &value) {
+ auto it = attributes_.find(key);
+ if (it != attributes_.end()) {
+ value = it->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+// Get Size
+uint64_t FlowFile::getSize() {
+ return size_;
+}
+// ! Get Offset
+uint64_t FlowFile::getOffset() {
+ return offset_;
+}
+
+bool FlowFile::removeAttribute(const std::string key) {
+ auto it = attributes_.find(key);
+ if (it != attributes_.end()) {
+ attributes_.erase(key);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool FlowFile::updateAttribute(const std::string key, const std::string value) {
+ auto it = attributes_.find(key);
+ if (it != attributes_.end()) {
+ attributes_[key] = value;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+ auto it = attributes_.find(key);
+ if (it != attributes_.end()) {
+ // attribute already there in the map
+ return false;
+ } else {
+ attributes_[key] = value;
+ return true;
+ }
+}
+
+void FlowFile::setLineageStartDate(const uint64_t date) {
+ lineage_start_date_ = date;
+}
+
+/**
+ * Sets the original connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setOriginalConnection(
+ std::shared_ptr<core::Connectable> &connection) {
+ original_connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
+ connection_ = connection;
+}
+
+/**
+ * Sets the connection with a shared pointer.
+ * @param connection shared connection.
+ */
+void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
+ connection_ = connection;
+}
+
+/**
+ * Returns the connection referenced by this record.
+ * @return shared connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getConnection() {
+ return connection_;
+}
+
+/**
+ * Returns the original connection referenced by this record.
+ * @return shared original connection pointer.
+ */
+std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() {
+ return original_connection_;
+}
+
+}
+}
+}
+}
+}