You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:42 UTC
[3/6] nifi-minifi-cpp git commit: MINIFI-249: Update prov repo to
better abstract deser.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index e6d561a..ac092ea 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -18,6 +18,7 @@
#include "core/repository/FlowFileRepository.h"
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "FlowFileRecord.h"
@@ -36,24 +37,36 @@ void FlowFileRepository::run() {
uint64_t curTime = getTimeMillis();
uint64_t size = repoSize();
if (size >= purgeThreshold) {
- std::vector<std::string> purgeList;
+ std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
+ std::vector<std::pair<std::string, uint64_t>> keyRemovalList;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
std::string key = it->key().ToString();
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
- if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
- purgeList.push_back(key);
+ if ((curTime - eventRead->getEventTime()) > max_partition_millis_) {
+ purgeList.push_back(eventRead);
+ keyRemovalList.push_back(std::make_pair(key, it->value().size()));
+ }
} else {
logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str());
- purgeList.push_back(key);
+ keyRemovalList.push_back(std::make_pair(key, it->value().size()));
}
}
delete it;
- for (auto eventId : purgeList) {
- logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
- Delete(eventId);
+ for (auto eventId : keyRemovalList) {
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+ if (Delete(eventId.first)) {
+ repo_size_ -= eventId.second;
+ }
+ }
+
+ for (const auto &ffr : purgeList) {
+ auto claim = ffr->getResourceClaim();
+ if (claim != nullptr) {
+ content_repo_->remove(claim);
+ }
}
}
if (size > max_partition_bytes_)
@@ -61,22 +74,23 @@ void FlowFileRepository::run() {
else
repo_full_ = false;
}
- return;
}
-void FlowFileRepository::loadComponent() {
- std::vector<std::string> purgeList;
+void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+ content_repo_ = content_repo;
+ std::vector<std::pair<std::string, uint64_t>> purgeList;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
std::string key = it->key().ToString();
+ repo_size_ += it->value().size();
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
auto search = connectionMap.find(eventRead->getConnectionUuid());
if (search != connectionMap.end()) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
- std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
+ std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
// set store to repo to true so that we do need to persistent again in enqueue
record->setStoredToRepository(true);
search->second->put(record);
@@ -84,19 +98,19 @@ void FlowFileRepository::loadComponent() {
if (eventRead->getContentFullPath().length() > 0) {
std::remove(eventRead->getContentFullPath().c_str());
}
- purgeList.push_back(key);
+ purgeList.push_back(std::make_pair(key, it->value().size()));
}
} else {
- purgeList.push_back(key);
+ purgeList.push_back(std::make_pair(key, it->value().size()));
}
}
delete it;
- std::vector<std::string>::iterator itPurge;
- for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
- std::string eventId = *itPurge;
- logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
- Delete(eventId);
+ for (auto eventId : purgeList) {
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+ if (Delete(eventId.first)) {
+ repo_size_ -= eventId.second;
+ }
}
return;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
new file mode 100644
index 0000000..ac575c5
--- /dev/null
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/repository/VolatileContentRepository.h"
+#include <cstdio>
+#include <string>
+#include <memory>
+#include <thread>
+#include "utils/StringUtils.h"
+#include "io/FileStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+const char *VolatileContentRepository::minimal_locking = "minimal.locking";
+
+bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
+ VolatileRepository::initialize(configure);
+ resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) {
+ if (lhsPtr == nullptr || rhsPtr == nullptr) {
+ return false;
+ }
+ return lhsPtr->getContentFullPath() == rhsPtr->getContentFullPath();};
+ resource_claim_check_ = [](std::shared_ptr<minifi::ResourceClaim> claim) {
+ return claim->getFlowFileRecordOwnedCount() <= 0;};
+ claim_reclaimer_ = [&](std::shared_ptr<minifi::ResourceClaim> claim) {if (claim->getFlowFileRecordOwnedCount() <= 0) {
+ remove(claim);
+ }
+ };
+
+ if (configure != nullptr) {
+ bool minimize_locking = false;
+ std::string value;
+ std::stringstream strstream;
+ strstream << Configure::nifi_volatile_repository_options << getName() << "." << minimal_locking;
+ if (configure->get(strstream.str(), value)) {
+ utils::StringUtils::StringToBool(value, minimize_locking);
+ minimize_locking_ = minimize_locking;
+ }
+ }
+ if (!minimize_locking_) {
+ for (auto ent : value_vector_) {
+ delete ent;
+ }
+ value_vector_.clear();
+ }
+ start();
+
+ return true;
+}
+
+void VolatileContentRepository::stop() {
+ running_ = false;
+}
+
+void VolatileContentRepository::run() {
+}
+
+void VolatileContentRepository::start() {
+ if (this->purge_period_ <= 0)
+ return;
+ if (running_)
+ return;
+ thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());
+ thread_.detach();
+ running_ = true;
+ logger_->log_info("%s Repository Monitor Thread Start", name_);
+}
+
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ logger_->log_debug("enter write");
+ {
+ std::lock_guard<std::mutex> lock(map_mutex_);
+ auto claim_check = master_list_.find(claim->getContentFullPath());
+ if (claim_check != master_list_.end()) {
+ logger_->log_debug("Creating copy of atomic entry");
+ auto ent = claim_check->second->takeOwnership();
+ if (ent == nullptr) {
+ logger_->log_debug("write returns nullptr");
+ return nullptr;
+ }
+ return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ }
+ }
+
+ int size = 0;
+ if (__builtin_expect(minimize_locking_ == true, 1)) {
+ logger_->log_debug("Minimize locking");
+ for (auto ent : value_vector_) {
+ if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
+ std::lock_guard<std::mutex> lock(map_mutex_);
+ master_list_[claim->getContentFullPath()] = ent;
+ return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ }
+ size++;
+ }
+ } else {
+ std::lock_guard < std::mutex > lock(map_mutex_);
+ auto claim_check = master_list_.find(claim->getContentFullPath());
+ if (claim_check != master_list_.end()) {
+ logger_->log_debug("Creating copy of atomic entry");
+ return std::make_shared < io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second);
+ } else {
+ logger_->log_debug("Creating new atomic entry");
+ AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_);
+ if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
+ master_list_[claim->getContentFullPath()] = ent;
+ return std::make_shared< io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ }
+ }
+ }
+ logger_->log_debug("write returns nullptr %d", size);
+ return nullptr;
+}
+
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ logger_->log_debug("enter read");
+ int size = 0;
+ {
+ std::lock_guard<std::mutex> lock(map_mutex_);
+ auto claim_check = master_list_.find(claim->getContentFullPath());
+ if (claim_check != master_list_.end()) {
+ auto ent = claim_check->second->takeOwnership();
+ if (ent == nullptr) {
+ return nullptr;
+ }
+ return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ }
+ }
+ logger_->log_debug("enter read for %s after %d", claim->getContentFullPath(), size);
+ return nullptr;
+}
+
+bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ logger_->log_debug("enter remove for %s, reducing %d", claim->getContentFullPath(), current_size_.load());
+ if (__builtin_expect(minimize_locking_ == true, 1)) {
+ std::lock_guard<std::mutex> lock(map_mutex_);
+ auto ent = master_list_.find(claim->getContentFullPath());
+ if (ent != master_list_.end()) {
+ // if we cannot remove the entry we will let the owner's destructor
+ // decrement the reference count and free it
+ if (ent->second->freeValue(claim)) {
+ logger_->log_debug("removed %s", claim->getContentFullPath());
+ return true;
+ }
+ master_list_.erase(claim->getContentFullPath());
+ }
+ } else {
+ std::lock_guard<std::mutex> lock(map_mutex_);
+ delete master_list_[claim->getContentFullPath()];
+ master_list_.erase(claim->getContentFullPath());
+ return true;
+ }
+
+ logger_->log_debug("could not remove %s", claim->getContentFullPath());
+ return false;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/VolatileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index a7e3a51..6e3e1c6 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -16,7 +16,9 @@
* limitations under the License.
*/
#include "core/repository/VolatileRepository.h"
+#include <map>
#include <memory>
+#include <limits>
#include <string>
#include <vector>
#include "FlowFileRecord.h"
@@ -28,33 +30,6 @@ namespace minifi {
namespace core {
namespace repository {
-const char *VolatileRepository::volatile_repo_max_count = "max.count";
-
-void VolatileRepository::run() {
- repo_full_ = false;
-}
-
-/**
- * Purge
- */
-void VolatileRepository::purge() {
- while (current_size_ > max_size_) {
- for (auto ent : value_vector_) {
- // let the destructor do the cleanup
- RepoValue value;
- if (ent->getValue(value)) {
- current_size_ -= value.size();
- logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load());
- }
- if (current_size_ < max_size_)
- break;
- }
- }
-}
-
-void VolatileRepository::loadComponent() {
-}
-
} /* namespace repository */
} /* namespace core */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 4ce944e..b5d9a8f 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -35,7 +35,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
uuid_t uuid;
int64_t version = 0;
- checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ checkRequiredField(&rootFlowNode, "name",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string flowName = rootFlowNode["name"].as<std::string>();
std::string id = getOrGenerateId(&rootFlowNode);
uuid_parse(id.c_str(), uuid);
@@ -47,10 +48,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
}
}
- logger_->log_debug(
- "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
- std::unique_ptr<core::ProcessGroup> group =
- FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+ logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+ std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
this->name_ = flowName;
@@ -77,7 +76,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
core::ProcessorConfig procCfg;
YAML::Node procNode = iter->as<YAML::Node>();
- checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
+ checkRequiredField(&procNode, "name",
+ CONFIG_YAML_PROCESSORS_KEY);
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
uuid_parse(procCfg.id.c_str(), uuid);
@@ -101,11 +101,13 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
}
processor->setName(procCfg.name);
- checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY);
+ checkRequiredField(&procNode, "scheduling strategy",
+ CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
- checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY);
+ checkRequiredField(&procNode, "scheduling period",
+ CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
@@ -224,13 +226,15 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
YAML::Node currRpgNode = iter->as<YAML::Node>();
- checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ checkRequiredField(&currRpgNode, "name",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto name = currRpgNode["name"].as<std::string>();
id = getOrGenerateId(&currRpgNode);
logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
- checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ checkRequiredField(&currRpgNode, "url",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string url = currRpgNode["url"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
@@ -266,7 +270,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
group->setTransmitting(true);
group->setURL(url);
- checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ checkRequiredField(&currRpgNode, "Input Ports",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
@@ -312,9 +317,11 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
YAML::Node node = reportNode->as<YAML::Node>();
- checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ checkRequiredField(&node, "scheduling strategy",
+ CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
- checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ checkRequiredField(&node, "scheduling period",
+ CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
core::TimeUnit unit;
@@ -423,7 +430,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
// Configure basic connection
uuid_t uuid;
- checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY);
+ checkRequiredField(&connectionNode, "name",
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string name = connectionNode["name"].as<std::string>();
std::string id = getOrGenerateId(&connectionNode);
uuid_parse(id.c_str(), uuid);
@@ -431,7 +439,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
logger_->log_debug("Created connection with UUID %s and name %s", id, name);
// Configure connection source
- checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
+ checkRequiredField(&connectionNode, "source relationship name",
+ CONFIG_YAML_CONNECTIONS_KEY);
auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
core::Relationship relationship(rawRelationship, "");
logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
@@ -441,6 +450,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
uuid_t srcUUID;
+ if (connectionNode["max work queue size"]) {
+ auto max_work_queue_str = connectionNode["max work queue size"].as<std::string>();
+ int64_t max_work_queue_size = 0;
+ if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+ connection->setMaxQueueSize(max_work_queue_size);
+ }
+ logger_->log_debug("Setting %d as the max queue size for %s", max_work_queue_size, name);
+ }
+
+ if (connectionNode["max work queue data size"]) {
+ auto max_work_queue_str = connectionNode["max work queue data size"].as<std::string>();
+ int64_t max_work_queue_data_size = 0;
+ if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+ connection->setMaxQueueDataSize(max_work_queue_data_size);
+ }
+ logger_->log_debug("Setting %d as the max queue data size for %s", max_work_queue_data_size, name);
+ }
+
if (connectionNode["source id"]) {
std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
uuid_parse(connectionSrcProcId.c_str(), srcUUID);
@@ -449,7 +476,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
name, connectionSrcProcId);
} else {
// if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
- checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
+ checkRequiredField(&connectionNode, "source name",
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) {
@@ -486,7 +514,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
} else {
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
- checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
+ checkRequiredField(&connectionNode, "destination name",
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
@@ -534,7 +563,8 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
YAML::Node inputPortsObj = portNode->as<YAML::Node>();
// Check for required fields
- checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ checkRequiredField(&inputPortsObj, "name",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto nameStr = inputPortsObj["name"].as<std::string>();
checkRequiredField(&inputPortsObj, "id",
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/AtomicEntryStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/AtomicEntryStream.cpp b/libminifi/src/io/AtomicEntryStream.cpp
new file mode 100644
index 0000000..aac9723
--- /dev/null
+++ b/libminifi/src/io/AtomicEntryStream.cpp
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/AtomicEntryStream.h"
+#include <vector>
+#include <mutex>
+#include <string>
+#include "io/Serializable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 57d6f03..5f7f5f4 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -170,6 +170,9 @@ int16_t Socket::initialize() {
int hh_errno;
gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
#endif
+ if (h == nullptr) {
+ return -1;
+ }
memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
auto p = addr_info_;
@@ -197,7 +200,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
tv.tv_sec = msec / 1000;
tv.tv_usec = (msec % 1000) * 1000;
- std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+ std::lock_guard < std::recursive_mutex > guard(selection_mutex_);
if (msec > 0)
retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
@@ -241,14 +244,12 @@ int16_t Socket::setSocketOptions(const int sock) {
bool nagle_off = true;
#ifndef __MACH__
if (nagle_off) {
- if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt))
- < 0) {
+ if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) {
logger_->log_error("setsockopt() TCP_NODELAY failed");
close(sock);
return -1;
}
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt),
- sizeof(opt)) < 0) {
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
logger_->log_error("setsockopt() SO_REUSEADDR failed");
close(sock);
return -1;
@@ -256,8 +257,7 @@ int16_t Socket::setSocketOptions(const int sock) {
}
int sndsize = 256 * 1024;
- if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize),
- sizeof(sndsize)) < 0) {
+ if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) {
logger_->log_error("setsockopt() SO_SNDBUF failed");
close(sock);
return -1;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
new file mode 100644
index 0000000..3b2bfe1
--- /dev/null
+++ b/libminifi/src/io/FileStream.cpp
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/FileStream.h"
+#include <fstream>
+#include <vector>
+#include <memory>
+#include <string>
+#include "io/validation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+FileStream::FileStream(const std::string &path)
+ : logger_(logging::LoggerFactory<FileStream>::getLogger()),
+ path_(path),
+ offset_(0) {
+ file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
+ file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
+ file_stream_->seekg(0, file_stream_->end);
+ file_stream_->seekp(0, file_stream_->end);
+ int len = file_stream_->tellg();
+ if (len > 0) {
+ length_ = len;
+ } else {
+ length_ = 0;
+ }
+ seek(offset_);
+}
+
+FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable)
+ : logger_(logging::LoggerFactory<FileStream>::getLogger()),
+ path_(path) {
+ file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
+ if (write_enable) {
+ file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
+ } else {
+ file_stream_->open(path.c_str(), std::fstream::in | std::fstream::binary);
+ }
+ file_stream_->seekg(0, file_stream_->end);
+ file_stream_->seekp(0, file_stream_->end);
+ int len = file_stream_->tellg();
+ if (len > 0) {
+ length_ = len;
+ } else {
+ length_ = 0;
+ }
+ seek(offset);
+}
+
+void FileStream::closeStream() {
+ std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ if (file_stream_ != nullptr) {
+ file_stream_->close();
+ file_stream_ = nullptr;
+ }
+}
+
+void FileStream::seek(uint64_t offset) {
+ std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ offset_ = offset;
+ file_stream_->clear();
+ file_stream_->seekg(offset_);
+ file_stream_->seekp(offset_);
+}
+
+int FileStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+ if (buf.capacity() < buflen) {
+ return -1;
+ }
+ return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+
+int FileStream::writeData(uint8_t *value, int size) {
+ if (!IsNullOrEmpty(value)) {
+ std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
+ offset_ += size;
+ if (offset_ > length_) {
+ length_ = offset_;
+ }
+ file_stream_->seekg(offset_);
+ file_stream_->flush();
+ return size;
+ } else {
+ return -1;
+ }
+ } else {
+ return -1;
+ }
+}
+
+template<typename T>
+inline std::vector<uint8_t> FileStream::readBuffer(const T& t) {
+ std::vector<uint8_t> buf;
+ buf.resize(sizeof t);
+ readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ return buf;
+}
+
+int FileStream::readData(std::vector<uint8_t> &buf, int buflen) {
+ if (buf.capacity() < buflen) {
+ buf.resize(buflen);
+ }
+ int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+ if (ret < buflen) {
+ buf.resize(ret);
+ }
+ return ret;
+}
+
+int FileStream::readData(uint8_t *buf, int buflen) {
+ if (!IsNullOrEmpty(buf)) {
+ std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+ if ((file_stream_->rdstate() & (file_stream_->eofbit | file_stream_->failbit)) != 0) {
+ file_stream_->clear();
+ file_stream_->seekg(0, file_stream_->end);
+ file_stream_->seekp(0, file_stream_->end);
+ int len = file_stream_->tellg();
+ offset_ = len;
+ length_ = len;
+ return offset_;
+ } else {
+ offset_ += buflen;
+ file_stream_->seekp(offset_);
+ return buflen;
+ }
+
+ } else {
+ return -1;
+ }
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
index 7990edd..288f4d1 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -47,11 +47,11 @@ class SocketCreator : public AbstractStreamFactory {
public:
template<typename Q = V>
ContextTypeCheck<true, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
- return std::make_shared<V>(configure);
+ return std::make_shared < V > (configure);
}
template<typename Q = V>
ContextTypeCheck<false, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) {
- return std::make_shared<SocketContext>(configure);
+ return std::make_shared < SocketContext > (configure);
}
SocketCreator<T, V>(std::shared_ptr<Configure> configure) {
@@ -69,7 +69,7 @@ class SocketCreator : public AbstractStreamFactory {
std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) {
T *socket = create(host, port);
- return std::unique_ptr<Socket>(socket);
+ return std::unique_ptr < Socket > (socket);
}
private:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 323d69a..9c6f732 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -145,7 +145,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
break;
logger_->log_info("Execute Command Respond %d", numRead);
ExecuteProcess::WriteCallback callback(buffer, numRead);
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
continue;
flowFile->addAttribute("command", _command.c_str());
@@ -167,7 +167,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
// child exits and close the pipe
ExecuteProcess::WriteCallback callback(buffer, totalRead);
if (!flowFile) {
- flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
break;
flowFile->addAttribute("command", _command.c_str());
@@ -185,7 +185,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
if (!flowFile) {
- flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
continue;
flowFile->addAttribute("command", _command.c_str());
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index 3741a8f..2fee3f2 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -91,7 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
}
for (int i = 0; i < batchSize; i++) {
// For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
return;
if (fileSize > 0)
@@ -114,7 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes
GenerateFlowFile::WriteCallback callback(_data, _dataSize);
for (int i = 0; i < batchSize; i++) {
// For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
return;
if (fileSize > 0)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index f1dbb21..723d461 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -150,7 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
std::string fileName = list.front();
list.pop();
logger_->log_info("GetFile process %s", fileName.c_str());
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (flowFile == nullptr)
return;
std::size_t found = fileName.find_last_of("/\\");
@@ -172,19 +172,19 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
}
bool GetFile::isListingEmpty() {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
return _dirList.empty();
}
void GetFile::putListing(std::string fileName) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
_dirList.push(fileName);
}
void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) {
std::string fileName = _dirList.front();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index d5097bb..7dc75d2 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -42,6 +42,8 @@
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
+#include "utils/ByteInputCallBack.h"
+#include "utils/HTTPUtils.h"
namespace org {
namespace apache {
@@ -113,7 +115,7 @@ void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
if (my_method == "POST") {
curl_easy_setopt(curl, CURLOPT_POST, 1);
} else if (my_method == "PUT") {
- curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
+ curl_easy_setopt(curl, CURLOPT_PUT, 1);
} else if (my_method == "GET") {
} else {
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, my_method.c_str());
@@ -210,7 +212,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) {
std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
if (nullptr != service) {
- ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+ ssl_context_service_ = std::static_pointer_cast < minifi::controllers::SSLContextService > (service);
}
}
}
@@ -283,14 +285,14 @@ void InvokeHTTP::configure_secure_connection(CURL *http_session) {
}
void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
logger_->log_info("onTrigger InvokeHTTP with %s", method_.c_str());
if (flowFile == nullptr) {
if (!emitFlowFile(method_)) {
logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str());
- flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
} else {
logger_->log_info("exiting because method is %s", method_.c_str());
return;
@@ -317,9 +319,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
if (read_timeout_ > 0) {
curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
}
+
utils::HTTPRequestResponse content;
- curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
- &utils::HTTPRequestResponse::recieve_write);
+ curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
@@ -333,13 +335,10 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
callbackObj->ptr = callback;
callbackObj->pos = 0;
logger_->log_info("InvokeHTTP -- Setting callback");
- curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
- (curl_off_t)callback->getBufferSize());
- curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
- &utils::HTTPRequestResponse::send_write);
- curl_easy_setopt(http_session, CURLOPT_READDATA,
- static_cast<void*>(callbackObj));
+ curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
+ curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
+ curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
@@ -377,14 +376,14 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
bool output_body_to_content = isSuccess && !putToAttribute;
bool body_empty = IsNullOrEmpty(content.data);
- logger_->log_info("isSuccess: %d", isSuccess);
+ logger_->log_info("isSuccess: %d, response code %d ", isSuccess, http_code);
std::shared_ptr<FlowFileRecord> response_flow = nullptr;
if (output_body_to_content) {
if (flowFile != nullptr) {
- response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile));
+ response_flow = std::static_pointer_cast < FlowFileRecord > (session->create(flowFile));
} else {
- response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
+ response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
}
std::string ct = content_type;
@@ -398,7 +397,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
session->importFrom(stream, response_flow);
} else {
logger_->log_info("Cannot output body to content");
- response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
+ response_flow = std::static_pointer_cast < FlowFileRecord > (session->create());
}
route(flowFile, response_flow, session, context, isSuccess, http_code);
} else {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index c26b41d..d410547 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -201,7 +201,7 @@ ListenHTTP::~ListenHTTP() {
}
void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
// Do nothing if there are no incoming files
if (!flowFile) {
@@ -243,7 +243,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *
auto session = _processSessionFactory->createSession();
ListenHTTP::WriteCallback callback(conn, req_info);
- auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ auto flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile) {
sendErrorResponse(conn);
@@ -295,11 +295,11 @@ ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struc
_reqInfo = reqInfo;
}
-void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
int64_t rlen;
int64_t nlen = 0;
int64_t tlen = _reqInfo->content_length;
- char buf[16384];
+ uint8_t buf[16384];
while (nlen < tlen) {
rlen = tlen - nlen;
@@ -320,6 +320,8 @@ void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
nlen += rlen;
}
+
+ return nlen;
}
} /* namespace processors */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index 054d585..a5fdf28 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -279,7 +279,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession
SysLogEvent event = eventQueue.front();
eventQueue.pop();
if (firstEvent) {
- flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
if (!flowFile)
return;
ListenSyslog::WriteCallback callback(event.payload, event.len);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index e308901..ad8e664 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -71,8 +71,9 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession
std::shared_ptr<core::FlowFile> flow = session->get();
- if (!flow)
+ if (!flow) {
return;
+ }
std::string value;
if (context->getProperty(LogLevel.getName(), value)) {
@@ -107,10 +108,10 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession
message << "\n" << "Payload:" << "\n";
ReadCallback callback(flow->getSize());
session->read(flow, &callback);
- for (unsigned int i = 0, j = 0; i < callback._readSize; i++) {
- message << std::hex << callback._buffer[i];
+ for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) {
+ message << std::hex << callback.buffer_[i];
j++;
- if (j == 16) {
+ if (j == 80) {
message << '\n';
j = 0;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index d72c56a..b425ae9 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -17,20 +17,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "processors/PutFile.h"
-#include <stdio.h>
+
+#include "../../include/processors/PutFile.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
#include <uuid/uuid.h>
-#include <sstream>
-#include <string>
+#include <cstdint>
+#include <cstdio>
#include <iostream>
#include <memory>
#include <set>
-#include <fstream>
-#include "io/validation.h"
-#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
+#include <string>
+
+#include "../../include/core/logging/Logger.h"
+#include "../../include/core/ProcessContext.h"
+#include "../../include/core/Property.h"
+#include "../../include/core/Relationship.h"
+#include "../../include/io/BaseStream.h"
+#include "../../include/io/DataStream.h"
+#include "../../include/io/validation.h"
namespace org {
namespace apache {
@@ -76,7 +82,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
return;
}
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get());
// Do nothing if there are no incoming files
if (!flowFile) {
@@ -142,10 +148,23 @@ PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::strin
}
// Copy the entire file contents to the temporary file
-void PutFile::ReadCallback::process(std::ifstream *stream) {
+/**
+ * Streams the flow file content into the temporary output file in 1 KiB chunks.
+ *
+ * @param stream content stream of the flow file being written out.
+ * @return number of bytes copied, or -1 if reading from the stream failed.
+ *         _writeSucceeded is set to true only when the copy loop finished
+ *         without a read error.
+ */
+int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
// Copy file contents into tmp file
_writeSucceeded = false;
- _tmpFileOs << stream->rdbuf();
+  size_t size = 0;
+  uint8_t buffer[1024];
+  do {
+    // Named bytesRead so the local does not shadow POSIX read() from <unistd.h>.
+    int bytesRead = stream->read(buffer, 1024);
+    if (bytesRead < 0) {
+      // Leave _writeSucceeded false so the caller can see the copy failed.
+      return -1;
+    }
+    if (bytesRead == 0) {
+      break;
+    }
+    _tmpFileOs.write(reinterpret_cast<char*>(buffer), bytesRead);
+    size += bytesRead;
+  } while (size < stream->getSize());
+  // The flag must be raised before returning; previously the return statement
+  // preceded it, so the assignment below was unreachable.
_writeSucceeded = true;
+  return size;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index 46ed1fb..f87f4ec 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -51,7 +51,8 @@ core::Property TailFile::StateFile("State File", "Specifies the file that should
" what data has been ingested so that upon restart NiFi can resume from where it left off",
"TailFileState");
core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
- "from the incoming file.", "");
+ "from the incoming file.",
+ "");
core::Relationship TailFile::Success("success", "All files are routed to success");
void TailFile::initialize() {
@@ -240,10 +241,9 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
context->yield();
return;
}
-
- std::size_t found = _currentTailFileName.find_last_of(".");
- std::string baseName = _currentTailFileName.substr(0, found);
- std::string extension = _currentTailFileName.substr(found + 1);
+ std::size_t found = _currentTailFileName.find_last_of(".");
+ std::string baseName = _currentTailFileName.substr(0, found);
+ std::string extension = _currentTailFileName.substr(found + 1);
if (!this->_delimiter.empty()) {
char delim = this->_delimiter.c_str()[0];
@@ -254,20 +254,20 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) {
logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize());
std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
- ffr->updateKeyedAttribute(PATH, fileLocation);
- ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ ffr->updateKeyedAttribute(PATH, fileLocation);
+ ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
ffr->updateKeyedAttribute(FILENAME, logName);
- session->transfer(ffr, Success);
+ session->transfer(ffr, Success);
this->_currentTailFilePosition += ffr->getSize() + 1;
storeState();
}
} else {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- flowFile->updateKeyedAttribute(PATH, fileLocation);
- flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (!flowFile)
+ return;
+ flowFile->updateKeyedAttribute(PATH, fileLocation);
+ flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
session->transfer(flowFile, Success);
logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize());
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 8686a58..3e42a5a 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -17,13 +17,12 @@
*/
#include "provenance/Provenance.h"
-
#include <arpa/inet.h>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
-
+#include "core/Repository.h"
#include "io/DataStream.h"
#include "io/Serializable.h"
#include "core/logging/Logger.h"
@@ -42,30 +41,35 @@ std::shared_ptr<logging::Logger> ProvenanceEventRecord::logger_ = logging::Logge
const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED",
"ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
-ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) {
+ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType)
+ : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()),
+ _eventDuration(0),
+ _entryDate(0),
+ _lineageStartDate(0) {
_eventType = event;
_componentId = componentId;
_componentType = componentType;
_eventTime = getTimeMillis();
- char eventIdStr[37];
- // Generate the global UUID for th event
- id_generator_->generate(_eventId);
- uuid_unparse_lower(_eventId, eventIdStr);
- _eventIdStr = eventIdStr;
}
// DeSerialize
-bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) {
+bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
std::string value;
bool ret;
- ret = repo->Get(key, value);
+ const std::shared_ptr<core::Repository> repo = std::dynamic_pointer_cast<core::Repository>(store);
+
+ if (nullptr == repo || IsNullOrEmpty(uuidStr_)) {
+ logger_->log_error("Repo could not be assigned");
+ return false;
+ }
+ ret = repo->Get(uuidStr_, value);
if (!ret) {
- logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str());
+ logger_->log_error("NiFi Provenance Store event %s can not be found", uuidStr_);
return false;
} else {
- logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length());
+ logger_->log_debug("NiFi Provenance Read event %s length %d", uuidStr_, value.length());
}
org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length());
@@ -73,20 +77,20 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository>
ret = DeSerialize(stream);
if (ret) {
- logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType);
+ logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", uuidStr_, stream.getSize(), _eventType);
} else {
- logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType);
+ logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", uuidStr_, stream.getSize(), _eventType);
}
return ret;
}
-bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) {
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
org::apache::nifi::minifi::io::DataStream outStream;
int ret;
- ret = writeUTF(this->_eventIdStr, &outStream);
+ ret = writeUTF(this->uuidStr_, &outStream);
if (ret <= 0) {
return false;
}
@@ -127,7 +131,7 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r
return false;
}
- ret = writeUTF(this->uuid_, &outStream);
+ ret = writeUTF(this->flow_uuid_, &outStream);
if (ret <= 0) {
return false;
}
@@ -215,20 +219,20 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r
}
}
// Persistent to the DB
- if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
- logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize());
+ if (repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+ logger_->log_debug("NiFi Provenance Store event %s size %d success", uuidStr_, outStream.getSize());
} else {
- logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize());
+ logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize());
}
return true;
}
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
int ret;
org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize);
- ret = readUTF(this->_eventIdStr, &outStream);
+ ret = readUTF(this->uuidStr_, &outStream);
if (ret <= 0) {
return false;
@@ -271,7 +275,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferS
return false;
}
- ret = readUTF(this->uuid_, &outStream);
+ ret = readUTF(this->flow_uuid_, &outStream);
if (ret <= 0) {
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index e4a8ffa..ce19fe4 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -31,31 +31,23 @@ void ProvenanceRepository::run() {
uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
while (running_) {
std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
- std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
uint64_t curTime = getTimeMillis();
uint64_t size = repoSize();
if (size >= purgeThreshold) {
- std::vector<std::string> purgeList;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
ProvenanceEventRecord eventRead;
std::string key = it->key().ToString();
- if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) {
- if ((curTime - eventRead.getEventTime()) > max_partition_millis_)
- purgeList.push_back(key);
+ uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size());
+ if (eventTime > 0) {
+ if ((curTime - eventTime) > max_partition_millis_)
+ Delete(key);
} else {
logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
- purgeList.push_back(key);
+ Delete(key);
}
}
delete it;
- std::vector<std::string>::iterator itPurge;
-
- for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
- std::string eventId = *itPurge;
- logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str());
- Delete(eventId);
- }
}
if (size > max_partition_bytes_)
repo_full_ = true;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index beed48a..a1e22ad 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -1,3 +1,4 @@
set noparent
filter=-build/include_order,-build/include_alpha
exclude_files=Server.cpp
+exclude_files=TestBase.cpp
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
new file mode 100644
index 0000000..4c0814d
--- /dev/null
+++ b/libminifi/test/TestBase.cpp
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "./TestBase.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+/**
+ * Creates an empty, not yet finalized plan bound to the given repositories.
+ * The cursor (location) starts at -1 and no current flow file is set.
+ */
+TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo,
+                   std::shared_ptr<core::Repository> flow_repo,
+                   std::shared_ptr<core::Repository> prov_repo)
+    : content_repo_(content_repo),
+      flow_repo_(flow_repo),
+      prov_repo_(prov_repo),
+      location(-1),
+      finalized(false),
+      current_flowfile_(nullptr) {
+}
+
+/**
+ * Registers an already constructed processor with the plan.
+ *
+ * The processor is initialized, recorded in the processor mapping, wrapped in a
+ * ProcessorNode and ProcessContext, and appended to the processor queue.
+ *
+ * @param processor      processor instance to add.
+ * @param name           not used by this overload.
+ * @param relationship   relationship assigned to the created connection, or stored
+ *                       as the terminating relationship when nothing is linked.
+ * @param linkToPrevious when true, a connection from the most recently added
+ *                       processor to this one is created. If no processor has
+ *                       been added yet, the processor is connected to itself.
+ * @return the processor that was added, or nullptr if the plan is already finalized.
+ */
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
+                                                        bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  // initialize the processor
+  processor->initialize();
+
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    // Calling back() on an empty container is undefined behaviour, so the
+    // previous processor is only read when one has actually been queued.
+    std::shared_ptr<core::Processor> last;
+    if (!processor_queue_.empty()) {
+      last = processor_queue_.back();
+    }
+
+    if (last == nullptr) {
+      last = processor;
+      termination_ = relationship;
+    }
+
+    std::stringstream connection_name;
+    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
+    connection->setRelationship(relationship);
+
+    // link the connections so that we can test results at the end for this
+    connection->setSource(last);
+    connection->setDestination(processor);
+
+    uuid_t uuid_copy, uuid_copy_next;
+    last->getUUID(uuid_copy);
+    connection->setSourceUUID(uuid_copy);
+    processor->getUUID(uuid_copy_next);
+    connection->setDestinationUUID(uuid_copy_next);
+    last->addConnection(connection);
+    // Avoid registering the same connection twice on a self-linked processor.
+    if (last != processor) {
+      processor->addConnection(connection);
+    }
+    relationships_.push_back(connection);
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(*(node.get()), controller_services_provider_, flow_repo_, prov_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+/**
+ * Instantiates a processor by class name through the default class loader,
+ * names it, and hands it to the processor-instance overload.
+ *
+ * @return the added processor, or nullptr if the plan is already finalized.
+ * @throws std::exception when the class loader cannot instantiate processor_name.
+ */
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
+                                                        bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  // A fresh identifier is generated for every processor created by name.
+  uuid_t generated_id;
+  uuid_generate(generated_id);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, generated_id);
+  if (instance == nullptr) {
+    throw std::exception();
+  }
+
+  auto created = std::static_pointer_cast<core::Processor>(instance);
+  created->setName(name);
+
+  return addProcessor(created, name, relationship, linkToPrevious);
+}
+
+/**
+ * Sets a property on the process context that belongs to the given processor.
+ *
+ * The processor queue and the context list are indexed in parallel; the context
+ * at the position of the first matching processor receives the property.
+ *
+ * @param proc  processor whose context should be updated.
+ * @param prop  property name.
+ * @param value property value.
+ * @return the result of ProcessContext::setProperty, or false when the processor
+ *         is not part of the plan or has no matching context.
+ */
+bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  // An unsigned index avoids the signed/unsigned comparison against size();
+  // bounding by both containers keeps the later at() call in range.
+  for (std::size_t i = 0; i < processor_queue_.size() && i < processor_contexts_.size(); ++i) {
+    if (processor_queue_.at(i) == proc) {
+      return processor_contexts_.at(i)->setProperty(prop, value);
+    }
+  }
+  return false;
+}
+
+/**
+ * Rewinds the plan: drops all sessions and session factories, moves the cursor
+ * back to -1 and decrements every processor's active task count until it is
+ * no longer positive.
+ */
+void TestPlan::reset() {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (const auto &queued : processor_queue_) {
+    while (queued->getActiveTasks() > 0) {
+      queued->decrementActiveTask();
+    }
+  }
+}
+
+/**
+ * Advances the plan by one processor and runs it.
+ *
+ * Finalizes the plan on first use, schedules the processor once (tracked via
+ * configured_processors_), creates a new session for it and then either calls
+ * the supplied verify callback or the processor's onTrigger. The session is
+ * committed afterwards and the result of session->get() is stored as the
+ * current flow file.
+ *
+ * @param verify optional callback invoked instead of onTrigger when not null.
+ * @return true while further processors remain in the queue after this one.
+ */
+bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify) {
+ if (!finalized) {
+ finalize();
+ }
+ std::lock_guard<std::recursive_mutex> guard(mutex);
+ // Move the cursor first; the at() lookups below use the new position.
+ // NOTE(review): presumably at() throws once the cursor passes the last processor - confirm container type.
+ location++;
+ std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+ std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+ std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context.get());
+ // The factory is stored in factories_ until reset() clears it.
+ factories_.push_back(factory);
+ // onSchedule is only called the first time a given processor is run.
+ if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
+ processor->onSchedule(context.get(), factory.get());
+ configured_processors_.push_back(processor);
+ }
+ std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context.get());
+ process_sessions_.push_back(current_session);
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ // A verify callback replaces the normal trigger rather than running after it.
+ if (verify != nullptr) {
+ verify(context.get(), current_session.get());
+ } else {
+ processor->onTrigger(context.get(), current_session.get());
+ }
+ current_session->commit();
+ current_flowfile_ = current_session->get();
+ return location + 1 < processor_queue_.size();
+}
+
+/**
+ * Returns the provenance events reported by the session at the current cursor
+ * position (the session created by the most recent runNextProcessor call).
+ */
+std::set<provenance::ProvenanceEventRecord*> TestPlan::getProvenanceRecords() {
+  const auto &active_session = process_sessions_.at(location);
+  auto &&reporter = active_session->getProvenanceReporter();
+  return reporter->getEvents();
+}
+
+/**
+ * Returns the flow file stored by the most recent runNextProcessor call,
+ * or nullptr if none has been stored.
+ */
+std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
+ return current_flowfile_;
+}
+
+/**
+ * Creates the terminating connection for a processor.
+ *
+ * The connection uses the plan's terminating relationship and has the processor
+ * as its source. When setDest is true the same processor is also set as the
+ * destination. The connection is registered on the processor before returning.
+ *
+ * @param processor processor the connection starts from.
+ * @param setDest   whether to loop the connection back onto the processor.
+ * @return the newly created connection.
+ */
+std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
+  // Source and destination are the same processor, so the name repeats its id.
+  std::stringstream name_builder;
+  name_builder << processor->getUUIDStr() << "-to-" << processor->getUUIDStr();
+
+  auto final_connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, name_builder.str());
+  final_connection->setRelationship(termination_);
+
+  // link the connections so that we can test results at the end for this
+  final_connection->setSource(processor);
+  if (setDest) {
+    final_connection->setDestination(processor);
+  }
+
+  uuid_t processor_uuid;
+  processor->getUUID(processor_uuid);
+  final_connection->setSourceUUID(processor_uuid);
+  if (setDest) {
+    final_connection->setDestinationUUID(processor_uuid);
+  }
+
+  processor->addConnection(final_connection);
+  return final_connection;
+}
+
+/**
+ * Closes the plan by adding terminating connections and marking it finalized.
+ *
+ * If connections already exist, only the last queued processor gets a
+ * terminating connection. Otherwise every queued processor gets one that
+ * loops back onto itself.
+ */
+void TestPlan::finalize() {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  if (relationships_.empty()) {
+    for (const auto &queued : processor_queue_) {
+      relationships_.push_back(buildFinalConnection(queued, true));
+    }
+  } else {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  }
+
+  finalized = true;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 331df08..47db4c3 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -25,6 +25,8 @@
#include "ResourceClaim.h"
#include "catch.hpp"
#include <vector>
+#include <set>
+#include <map>
#include "core/logging/Logger.h"
#include "core/Core.h"
#include "properties/Configure.h"
@@ -33,6 +35,14 @@
#include "utils/Id.h"
#include "spdlog/sinks/ostream_sink.h"
#include "spdlog/sinks/dist_sink.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
class LogTestController {
public:
@@ -105,7 +115,7 @@ class LogTestController {
std::ostringstream log_output;
std::shared_ptr<logging::Logger> logger_;
- private:
+ private:
class TestBootstrapLogger : public logging::Logger {
public:
TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
@@ -138,6 +148,73 @@ class LogTestController {
std::vector<std::string> modified_loggers;
};
+class TestPlan {  // Test helper holding the repositories, processors, contexts, sessions and connections of one test flow.
+ public:
+ // Takes the content repository, the flow repository and the provenance repository, in that order.
+ explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
+ // addProcessor overload taking an existing processor instance; relationship defaults to "success", linkToPrevious to false.
+ std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+ core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+ // addProcessor overload taking processor_name instead of an instance (presumably the processor class to create - confirm in TestBase.cpp).
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false);
+ // NOTE(review): returns bool, presumably whether the property was applied to proc - confirm in TestBase.cpp.
+ bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+
+ void reset();
+ // verify is an optional callback (default nullptr); TestController::runSession() keeps calling this while it returns true.
+ bool runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr);
+ // Returns the events of the provenance reporter belonging to the session at index `location`.
+ std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
+ // Returns current_flowfile_.
+ std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+ // Accessors for the repository members.
+ std::shared_ptr<core::Repository> getFlowRepo() {
+ return flow_repo_;
+ }
+
+ std::shared_ptr<core::Repository> getProvenanceRepo() {
+ return prov_repo_;
+ }
+
+ std::shared_ptr<core::ContentRepository> getContentRepo() {
+ return content_repo_;
+ }
+
+ protected:
+ // Appends the final connection(s) built by buildFinalConnection() to relationships_ and sets finalized to true.
+ void finalize();
+ // Creates a termination_ connection with `processor` as source (and as destination when setDest is true) and adds it to the processor.
+ std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
+ // Set to true by finalize().
+ std::atomic<bool> finalized;
+
+ std::shared_ptr<core::ContentRepository> content_repo_;
+
+ std::shared_ptr<core::Repository> flow_repo_;
+ std::shared_ptr<core::Repository> prov_repo_;
+
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+ // Locked by finalize().
+ std::recursive_mutex mutex;
+ // Used as index into process_sessions_ and compared against processor_queue_.size().
+ int location;
+
+ std::shared_ptr<core::ProcessSession> current_session_;
+ std::shared_ptr<core::FlowFile> current_flowfile_;
+
+ std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::vector<std::shared_ptr<core::Processor>> processor_queue_;
+ std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+ std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
+ std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
+ std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
+ std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+ std::vector<std::shared_ptr<minifi::Connection>> relationships_;
+ core::Relationship termination_;  // relationship assigned to connections created by buildFinalConnection()
+};
+
class TestController {
public:
@@ -148,6 +225,25 @@ class TestController {
utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
}
+ /**
+  * Builds a TestPlan backed by a VolatileContentRepository that is initialized
+  * with a default-constructed Configure, and by one TestRepository instance
+  * that is passed as both the flow repository and the provenance repository.
+  */
+ std::shared_ptr<TestPlan> createPlan() {
+   std::shared_ptr<minifi::Configure> plan_config = std::make_shared<minifi::Configure>();
+
+   std::shared_ptr<core::ContentRepository> volatile_content = std::make_shared<core::repository::VolatileContentRepository>();
+   volatile_content->initialize(plan_config);
+
+   // the same repository instance backs flow files and provenance events
+   std::shared_ptr<core::Repository> shared_repo = std::make_shared<TestRepository>();
+   return std::make_shared<TestPlan>(volatile_content, shared_repo, shared_repo);
+ }
+
+ /**
+  * Drives the processors of a plan.
+  *
+  * @param plan plan whose runNextProcessor() is invoked.
+  * @param runToCompletion when true, keeps invoking runNextProcessor() until
+  *        it returns false; when false, it is invoked exactly once.
+  * @param verify optional callback forwarded to every runNextProcessor() call.
+  */
+ void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) {
+   for (;;) {
+     const bool more_to_run = plan->runNextProcessor(verify);
+     if (!more_to_run || !runToCompletion) {
+       break;
+     }
+   }
+ }
+
~TestController() {
for (auto dir : directories) {
DIR *created_dir;
@@ -176,6 +272,10 @@ class TestController {
}
protected:
+
+ std::mutex test_mutex;
+ //std::map<std::string,>
+
LogTestController &log;
std::vector<char*> directories;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/TestServer.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h
new file mode 100644
index 0000000..263a6b3
--- /dev/null
+++ b/libminifi/test/TestServer.h
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_TESTSERVER_H_
+#define LIBMINIFI_TEST_TESTSERVER_H_
+#include <regex.h>
+#include <string>
+#include <iostream>
+#include "civetweb.h"
+
+/* Server context handle */
+static struct mg_context *ctx;
+static std::string resp_str;
+
+/**
+ * CivetWeb request handler that answers with the contents of resp_str as a
+ * text/plain "200 OK" response.
+ *
+ * @param conn connection the response is written to.
+ * @param response callback data registered together with the handler; unused
+ *        because the payload is read from resp_str directly.
+ * @return 200.
+ */
+static int responder(struct mg_connection *conn, void *response) {
+  static_cast<void>(response);
+
+  // %lu expects unsigned long; size_t is not guaranteed to be that type, so
+  // convert explicitly instead of passing a mismatched variadic argument
+  const unsigned long content_length = static_cast<unsigned long>(resp_str.size());
+
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+            "Content-Length: %lu\r\n"
+            "Content-Type: text/plain\r\n"
+            "Connection: close\r\n\r\n",
+            content_length);
+
+  mg_write(conn, resp_str.c_str(), resp_str.size());
+
+  return 200;
+}
+
+/**
+ * Initializes the CivetWeb library, requesting no optional features (mask 0).
+ *
+ * static inline: this function is defined in a header, so it needs internal
+ * linkage to avoid multiple-definition errors when the header is included from
+ * more than one translation unit.
+ */
+static inline void init_webserver() {
+  mg_init_library(0);
+}
+
+/**
+ * Starts a CivetWeb server with SSL options and registers responder() for
+ * rooturi. The process exits with status 1 when the CivetWeb feature check
+ * fails or the server cannot be started.
+ *
+ * static inline: defined in a header, so internal linkage is required to keep
+ * the header usable from more than one translation unit.
+ *
+ * @param port value for the "listening_ports" option.
+ * @param rooturi URI that responder() is registered for.
+ * @param response body returned by responder(); copied into resp_str.
+ * @param callbacks CivetWeb callbacks handed to mg_start().
+ * @param cert value for the "ssl_certificate" option.
+ */
+static inline void start_webserver(std::string &port, std::string &rooturi, const std::string &response, struct mg_callbacks *callbacks, std::string &cert) {
+  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
+  resp_str = response;
+  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", cert.c_str(), "ssl_protocol_version", "3", "ssl_cipher_list",
+      "ECDHE-RSA-AES256-GCM-SHA384:DES-CBC3-SHA:AES128-SHA:AES128-GCM-SHA256", nullptr };
+
+  // feature bit 2 is checked before starting; the message below reports missing SSL support
+  if (!mg_check_feature(2)) {
+    std::cerr << "Error: Embedded example built with SSL support, " << "but civetweb library build without" << std::endl;
+    exit(1);
+  }
+
+  ctx = mg_start(callbacks, nullptr, options);
+  if (ctx == nullptr) {
+    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
+    exit(1);
+  }
+
+  mg_set_request_handler(ctx, rooturi.c_str(), responder, static_cast<void*>(&resp_str));
+}
+
+/**
+ * Starts a plain CivetWeb server listening on the given port and registers
+ * responder() for rooturi. The process exits with status 1 when the server
+ * cannot be started.
+ *
+ * static inline: defined in a header, so internal linkage is required to keep
+ * the header usable from more than one translation unit.
+ *
+ * @param port value for the "listening_ports" option.
+ * @param rooturi URI that responder() is registered for.
+ * @param response body returned by responder(); copied into resp_str.
+ */
+static inline void start_webserver(std::string &port, std::string &rooturi, const std::string &response) {
+  std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl;
+  resp_str = response;
+
+  const char *options[] = { "listening_ports", port.c_str(), nullptr };
+  ctx = mg_start(nullptr, nullptr, options);
+
+  if (ctx == nullptr) {
+    std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl;
+    exit(1);
+  }
+
+  mg_set_request_handler(ctx, rooturi.c_str(), responder, static_cast<void*>(&resp_str));
+}
+
+/**
+ * Splits a URL of the form "<http|https>://localhost:<port><path>" into its
+ * components.
+ *
+ * static inline: defined in a header, so internal linkage is required to keep
+ * the header usable from more than one translation unit.
+ *
+ * @param url URL to parse.
+ * @param port receives the port digits, e.g. "8080".
+ * @param scheme receives "http" or "https".
+ * @param path receives the path including its leading '/'.
+ * @return true when the URL matched and scheme, port and path are all
+ *         non-empty; false when the expression cannot be compiled, the URL
+ *         does not match, or a component is missing. Output arguments are
+ *         only assigned for the groups that matched.
+ */
+static inline bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+  // whole match plus the four capture groups of the expression below
+  const size_t kGroupCount = 5;
+  const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+
+  regex_t regex;
+  if (regcomp(&regex, regexstr, REG_EXTENDED) != 0) {
+    return false;
+  }
+
+  regmatch_t groups[kGroupCount];
+  const int match_result = regexec(&regex, url.c_str(), kGroupCount, groups, 0);
+  // The offsets in groups refer to url, not to the compiled expression, so it
+  // can be released right away; doing it here covers every return path below.
+  regfree(&regex);
+  if (match_result != 0) {
+    return false;
+  }
+
+  for (size_t i = 0; i < kGroupCount; i++) {
+    // an unmatched group (e.g. a missing port) ends the extraction
+    if (groups[i].rm_so == -1)
+      break;
+
+    const std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
+    switch (i) {
+      case 1:
+        scheme = str;
+        break;
+      case 3:
+        port = str;
+        break;
+      case 4:
+        path = str;
+        break;
+      default:
+        break;
+    }
+  }
+
+  return !(path.empty() || scheme.empty() || port.empty());
+}
+
+/**
+ * Stops the server referenced by ctx and un-initializes the CivetWeb library.
+ */
+static void stop_webserver() {
+  /* Stop the server */
+  mg_stop(ctx);
+  // the context must not be used after mg_stop(); clear the handle so that it
+  // does not dangle until the next start_webserver() call
+  ctx = nullptr;
+
+  /* Un-initialize the library */
+  mg_exit_library();
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
index 3f27b66..15720eb 100644
--- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
@@ -80,18 +80,21 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_default_directory, key_dir);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(configuration);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
- DEFAULT_ROOT_GROUP_NAME,
+ content_repo,
+ DEFAULT_ROOT_GROUP_NAME,
true);
disabled = false;
std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpConfigurationListenerTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
index a86b884..b559f41 100644
--- a/libminifi/test/integration/HttpConfigurationListenerTest.cpp
+++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
@@ -46,7 +46,7 @@ void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
-class ConfigHandler: public CivetHandler {
+class ConfigHandler : public CivetHandler {
public:
bool handleGet(CivetServer *server, struct mg_connection *conn) {
std::ifstream myfile(test_file_location_.c_str());
@@ -57,8 +57,8 @@ class ConfigHandler: public CivetHandler {
std::string str = buffer.str();
myfile.close();
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ str.length());
mg_printf(conn, "%s", str.c_str());
} else {
mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
@@ -75,7 +75,7 @@ int main(int argc, char **argv) {
LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>();
const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
- std::vector < std::string > cpp_options;
+ std::vector<std::string> cpp_options;
for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
cpp_options.push_back(options[i]);
}
@@ -89,45 +89,32 @@ int main(int argc, char **argv) {
h_ex.test_file_location_ = test_file_location = argv[1];
key_dir = argv[2];
}
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<
- minifi::Configure>();
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
configuration->set(minifi::Configure::nifi_default_directory, key_dir);
- configuration->set(minifi::Configure::nifi_configuration_listener_type,
- "http");
- configuration->set(
- minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
- configuration->set(minifi::Configure::nifi_configuration_listener_http_url,
- "http://localhost:9090/config");
+ configuration->set(minifi::Configure::nifi_configuration_listener_type, "http");
+ configuration->set(minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
+ configuration->set(minifi::Configure::nifi_configuration_listener_http_url, "http://localhost:9090/config");
mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
- std::shared_ptr<core::Repository> test_repo =
- std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
- TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file,
- test_file_location);
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
- < minifi::io::StreamFactory > (configuration);
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
- < core::YamlConfiguration
- > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
- configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast
- < TestRepository > (test_repo);
-
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared < minifi::FlowController
- > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
- configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
- test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
- > (ptr.get());
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared < minifi::io::StreamFactory > (configuration);
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr < core::YamlConfiguration
+ > (new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast < TestRepository > (test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared < minifi::FlowController
+ > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup > (ptr.get());
ptr.release();
controller->load();