You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:43 UTC
[4/6] nifi-minifi-cpp git commit: MINIFI-249: Update prov repo to
better abstract deser.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 24ba146..1060830 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -42,7 +42,7 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
// reference the enable function from serviceNode
- std::function<bool()> f_ex = [serviceNode] {
+ std::function < bool() > f_ex = [serviceNode] {
return serviceNode->enable();
};
// create a functor that will be submitted to the thread pool.
@@ -55,7 +55,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
// reference the disable function from serviceNode
- std::function<bool()> f_ex = [serviceNode] {
+ std::function < bool() > f_ex = [serviceNode] {
return serviceNode->disable();
};
// create a functor that will be submitted to the thread pool.
@@ -77,13 +77,15 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core
// No need to yield, reset yield expiration to 0
processor->clearYield();
- if (!hasWorkToDo(processor))
+ if (!hasWorkToDo(processor)) {
// No work to do, yield
return true;
-
- if (hasTooMuchOutGoing(processor))
+ }
+ if (hasTooMuchOutGoing(processor)) {
+ logger_->log_debug("backpressure applied because too much outgoing");
// need to apply backpressure
return true;
+ }
processor->incrementActiveTasks();
try {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 7d6e3f3..024bd35 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -726,13 +726,19 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet
if (ret != 8) {
return false;
}
- if (flowFile->getSize()) {
+ if (flowFile->getSize() > 0) {
Site2SiteClientProtocol::ReadCallback callback(packet);
session->read(flowFile, &callback);
if (flowFile->getSize() != packet->_size) {
return false;
}
}
+ if (packet->payload_.length() == 0 && len == 0) {
+ if (flowFile->getResourceClaim() == nullptr)
+ logger_->log_debug("no claim");
+ else
+ logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath());
+ }
} else if (packet->payload_.length() > 0) {
len = packet->payload_.length();
@@ -1101,8 +1107,9 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
Transaction *transaction = NULL;
- if (!flow)
+ if (!flow) {
return;
+ }
if (_peerState != READY) {
bootstrap();
@@ -1158,11 +1165,15 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
} // while true
if (!confirm(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+ std::stringstream ss;
+ ss << "Confirm Failed for " << transactionID;
+ throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
return;
}
if (!complete(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+ std::stringstream ss;
+ ss << "Complete Failed for " << transactionID;
+ throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
return;
}
logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 46a4710..7b4ce85 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -36,7 +36,7 @@ namespace nifi {
namespace minifi {
void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
admin_yield_duration_ = 0;
std::string yieldValue;
@@ -68,8 +68,8 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
}
core::ProcessorNode processor_node(processor);
- auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_);
- auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext.get());
+ auto processContext = std::make_shared < core::ProcessContext > (processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_);
+ auto sessionFactory = std::make_shared < core::ProcessSessionFactory > (processContext.get());
processor->onSchedule(processContext.get(), sessionFactory.get());
@@ -89,7 +89,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
}
void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
if (processor->getScheduledState() != core::RUNNING) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index a9450f6..73c9e35 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -35,7 +35,7 @@ void SSLContextService::initialize() {
if (initialized_)
return;
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
ControllerService::initialize();
@@ -75,31 +75,31 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
if (retp == 0) {
logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
}
- return std::unique_ptr<SSLContext>(new SSLContext(ctx));
+ return std::unique_ptr < SSLContext > (new SSLContext(ctx));
}
const std::string &SSLContextService::getCertificateFile() {
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
return certificate;
}
const std::string &SSLContextService::getPassphrase() {
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
return passphrase_;
}
const std::string &SSLContextService::getPassphraseFile() {
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
return passphrase_file_;
}
const std::string &SSLContextService::getPrivateKeyFile() {
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
return private_key_;
}
const std::string &SSLContextService::getCACertificate() {
- std::lock_guard<std::mutex> lock(initialization_mutex_);
+ std::lock_guard < std::mutex > lock(initialization_mutex_);
return ca_certificate_;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 9bead0e..fbd46f6 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -43,7 +43,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
logger_->log_error("Cannot load library: %s", dlerror());
return RESOURCE_FAILURE;
} else {
- std::lock_guard<std::mutex> lock(internal_mutex_);
+ std::lock_guard < std::mutex > lock(internal_mutex_);
dl_handles_.push_back(resource_ptr);
}
@@ -60,9 +60,9 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
ObjectFactory *factory = create_factory_func();
- std::lock_guard<std::mutex> lock(internal_mutex_);
+ std::lock_guard < std::mutex > lock(internal_mutex_);
- loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory);
+ loaded_factories_[factory->getClassName()] = std::unique_ptr < ObjectFactory > (factory);
return RESOURCE_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index f5247ac..62a08db 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -29,6 +29,7 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace core {
+
ConfigurableComponent::ConfigurableComponent()
: logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) {
}
@@ -42,7 +43,7 @@ ConfigurableComponent::~ConfigurableComponent() {
}
bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) {
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
auto &&it = properties_.find(name);
@@ -61,7 +62,7 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop)
* @return result of getting property.
*/
bool ConfigurableComponent::getProperty(const std::string name, std::string &value) {
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
auto &&it = properties_.find(name);
if (it != properties_.end()) {
@@ -80,7 +81,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val
* @return result of setting property.
*/
bool ConfigurableComponent::setProperty(const std::string name, std::string value) {
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
auto &&it = properties_.find(name);
if (it != properties_.end()) {
@@ -101,7 +102,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu
* @return result of setting property.
*/
bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) {
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
auto &&it = properties_.find(name);
if (it != properties_.end()) {
@@ -122,7 +123,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
* @return whether property was set or not
*/
bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
auto it = properties_.find(prop.getName());
if (it != properties_.end()) {
@@ -150,7 +151,7 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties
return false;
}
- std::lock_guard<std::mutex> lock(configuration_mutex_);
+ std::lock_guard < std::mutex > lock(configuration_mutex_);
properties_.clear();
for (auto item : properties) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index ea2ed5c..0a0e911 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -39,7 +39,8 @@ namespace core {
class YamlConfiguration;
#endif
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
+ std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path,
bool fail_safe) {
std::string class_name_lc = configuration_class_name;
@@ -47,22 +48,23 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr
try {
if (class_name_lc == "flowconfiguration") {
// load the base configuration.
- return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+
+ return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
} else if (class_name_lc == "yamlconfiguration") {
// only load if the class is defined.
- return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, configure, path));
+ return std::unique_ptr < core::FlowConfiguration > (instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path));
} else {
if (fail_safe) {
- return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+ return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
} else {
throw std::runtime_error("Support for the provided configuration class could not be found");
}
}
} catch (const std::runtime_error &r) {
if (fail_safe) {
- return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
+ return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path));
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index cf01f0c..9c3b26a 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -53,7 +53,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
return false;
}
- std::lock_guard<std::mutex> lock(relationship_mutex_);
+ std::lock_guard < std::mutex > lock(relationship_mutex_);
relationships_.clear();
for (auto item : relationships) {
@@ -67,7 +67,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
bool Connectable::isSupportedRelationship(core::Relationship relationship) {
const bool requiresLock = isRunning();
- const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
+ const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
const auto &it = relationships_.find(relationship.getName());
if (it != relationships_.end()) {
@@ -83,7 +83,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
return false;
}
- std::lock_guard<std::mutex> lock(relationship_mutex_);
+ std::lock_guard < std::mutex > lock(relationship_mutex_);
auto_terminated_relationships_.clear();
for (auto item : relationships) {
@@ -97,7 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
bool Connectable::isAutoTerminated(core::Relationship relationship) {
const bool requiresLock = isRunning();
- const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
+ const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_);
const auto &it = auto_terminated_relationships_.find(relationship.getName());
if (it != auto_terminated_relationships_.end()) {
@@ -111,7 +111,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) {
has_work_.store(isWorkAvailable());
if (!has_work_.load()) {
- std::unique_lock<std::mutex> lock(work_available_mutex_);
+ std::unique_lock < std::mutex > lock(work_available_mutex_);
work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();});
}
}
@@ -143,7 +143,7 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std::
}
std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() {
- std::lock_guard<std::mutex> lock(relationship_mutex_);
+ std::lock_guard < std::mutex > lock(relationship_mutex_);
if (_incomingConnections.size() == 0)
return NULL;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index 304d4ce..995c001 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -35,6 +35,11 @@ void CoreComponent::setUUID(uuid_t uuid) {
uuid_unparse_lower(uuid_, uuidStr);
uuidStr_ = uuidStr;
}
+
+void CoreComponent::setUUIDStr(const std::string uuidStr) {
+ uuid_parse(uuidStr.c_str(), uuid_);
+ uuidStr_ = uuidStr;
+}
// Get UUID
bool CoreComponent::getUUID(uuid_t uuid) {
if (uuid) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index c32add6..e8e7462 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -35,7 +35,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string
if (nullptr == ptr) {
logger_->log_error("No Processor defined for %s", name.c_str());
}
- std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+ std::shared_ptr<core::Processor> processor = std::static_pointer_cast < core::Processor > (ptr);
// initialize the processor
processor->initialize();
@@ -53,18 +53,16 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
return processor;
}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
- std::string name, uuid_t uuid, int version) {
- return std::unique_ptr<core::ProcessGroup>(
- new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) {
+ return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
}
std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
- return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+ return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
}
std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
- return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
+ return std::make_shared < minifi::Connection > (flow_file_repo_, content_repo_, name, uuid);
}
std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index d9057c5..6afd0fe 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -47,7 +47,7 @@ FlowFile::FlowFile()
entry_date_ = getTimeMillis();
lineage_start_date_ = entry_date_;
- char uuidStr[37];
+ char uuidStr[37] = { 0 };
// Generate the global UUID for the flow record
id_generator_->generate(uuid_);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 2cf3db0..db0fe08 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -39,8 +39,7 @@ namespace core {
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version,
- ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
@@ -55,7 +54,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
yield_period_msec_ = 0;
transmitting_ = false;
- logger_->log_info("ProcessGroup %s created", name_.c_str());
+ logger_->log_info("ProcessGroup %s created", name_);
}
ProcessGroup::~ProcessGroup() {
@@ -70,12 +69,12 @@ ProcessGroup::~ProcessGroup() {
}
bool ProcessGroup::isRootProcessGroup() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
return (type_ == ROOT_PROCESS_GROUP);
}
void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
@@ -85,7 +84,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
}
void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (processors_.find(processor) != processors_.end()) {
// We do have the same processor in this process group yet
@@ -95,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
}
void ProcessGroup::addProcessGroup(ProcessGroup *child) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
@@ -105,7 +104,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
}
void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We do have the same child process group in this process group yet
@@ -115,7 +114,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
}
void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
try {
// Start all the processor node, input and output ports
@@ -143,7 +142,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
}
void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
try {
// Stop all the processor node, input and output ports
@@ -169,7 +168,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_info("find processor %s", processor->getName().c_str());
@@ -209,7 +208,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName().c_str());
@@ -225,7 +224,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces
}
void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
for (auto processor : processors_) {
if (processor->getName() == processorName) {
processor->setProperty(propertyName, propertyValue);
@@ -247,7 +246,7 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti
}
void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
@@ -269,7 +268,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
}
void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard < std::recursive_mutex > lock(mutex_);
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index df21a34..c69b361 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -38,19 +38,21 @@ namespace core {
std::shared_ptr<core::FlowFile> ProcessSession::create() {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
_addedFlowFiles[record->getUUIDStr()] = record;
logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
- std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr();
- provenance_report_->create(record, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " creates flow record " << record->getUUIDStr();
+ provenance_report_->create(record, details.str());
return record;
}
std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
if (record) {
_addedFlowFiles[record->getUUIDStr()] = record;
@@ -92,7 +94,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow
std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
if (record) {
this->_clonedFlowFiles[record->getUUIDStr()] = record;
@@ -168,26 +170,30 @@ void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
flow->setAttribute(key, value);
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
- provenance_report_->modifyAttributes(flow, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+ provenance_report_->modifyAttributes(flow, details.str());
}
void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
flow->removeAttribute(key);
- std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
- provenance_report_->modifyAttributes(flow, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key;
+ provenance_report_->modifyAttributes(flow, details.str());
}
void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
flow->setAttribute(key, value);
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
- provenance_report_->modifyAttributes(flow, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
+ provenance_report_->modifyAttributes(flow, details.str());
}
void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
flow->removeAttribute(key);
- std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
- provenance_report_->modifyAttributes(flow, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
+ provenance_report_->modifyAttributes(flow, details.str());
}
void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
@@ -207,41 +213,41 @@ void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationsh
}
void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
- DEFAULT_CONTENT_DIRECTORY);
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
try {
- std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
- if (fs.is_open()) {
- // Call the callback to write the content
- callback->process(&fs);
- if (fs.good() && fs.tellp() >= 0) {
- flow->setSize(fs.tellp());
- flow->setOffset(0);
- std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
- if (flow_claim != nullptr) {
- // Remove the old claim
- flow_claim->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- flow->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- /*
- logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ claim->increaseFlowFileRecordOwnedCount();
+// fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ // Call the callback to write the content
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
+ }
+
+ flow->setSize(stream->getSize());
+ flow->setOffset(0);
+ std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+ if (flow_claim != nullptr) {
+ // Remove the old claim
+ flow_claim->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
}
+ flow->setResourceClaim(claim);
+
+ /*
+ logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ stream->closeStream();
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
if (flow && flow->getResourceClaim() == claim) {
flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -260,39 +266,34 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
}
void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
try {
- std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
- if (fs.is_open()) {
- // Call the callback to write the content
- callback->process(&fs);
- if (fs.good() && fs.tellp() >= 0) {
- flow->setSize(fs.tellp());
- flow->setOffset(0);
- std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
- if (flow_claim != nullptr) {
- // Remove the old claim
- flow_claim->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- flow->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- /*
- logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ claim->increaseFlowFileRecordOwnedCount();
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ // Call the callback to write the content
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
+ }
+ flow->setSize(stream->getSize());
+ flow->setOffset(0);
+ std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
+ if (flow_claim != nullptr) {
+ // Remove the old claim
+ flow_claim->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
}
+ flow->setResourceClaim(claim);
+
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
if (flow && flow->getResourceClaim() == claim) {
flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -321,30 +322,25 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStream
claim = flow->getResourceClaim();
try {
- std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
- if (fs.is_open()) {
- // Call the callback to write the content
- std::streampos oldPos = fs.tellp();
- callback->process(&fs);
- if (fs.good() && fs.tellp() >= 0) {
- uint64_t appendSize = fs.tellp() - oldPos;
- flow->setSize(flow->getSize() + appendSize);
- /*
- logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
- flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
- }
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ // Call the callback to write the content
+ size_t oldPos = stream->getSize();
+ stream->seek(oldPos + 1);
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
+ }
+ uint64_t appendSize = stream->getSize() - oldPos;
+ flow->setSize(flow->getSize() + appendSize);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -365,30 +361,26 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC
claim = flow->getResourceClaim();
try {
- std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
- if (fs.is_open()) {
- // Call the callback to write the content
- std::streampos oldPos = fs.tellp();
- callback->process(&fs);
- if (fs.good() && fs.tellp() >= 0) {
- uint64_t appendSize = fs.tellp() - oldPos;
- flow->setSize(flow->getSize() + appendSize);
- /*
- logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
- flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
- }
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ // Call the callback to write the content
+ size_t oldPos = stream->getSize();
+ stream->seek(oldPos + 1);
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
+ }
+ uint64_t appendSize = stream->getSize() - oldPos;
+ flow->setSize(flow->getSize() + appendSize);
+
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -408,23 +400,19 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCall
}
claim = flow->getResourceClaim();
- std::ifstream fs;
- fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
- if (fs.is_open()) {
- fs.seekg(flow->getOffset(), fs.beg);
-
- if (fs.good()) {
- callback->process(&fs);
- /*
- logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+
+ stream->seek(flow->getOffset());
+
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
@@ -445,23 +433,17 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal
}
claim = flow->getResourceClaim();
- std::ifstream fs;
- fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
- if (fs.is_open()) {
- fs.seekg(flow->getOffset(), fs.beg);
-
- if (fs.good()) {
- callback->process(&fs);
- /*
- logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
- fs.close();
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ stream->seek(flow->getOffset());
+
+ if (callback->process(stream) < 0) {
+ rollback();
+ return;
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
@@ -479,60 +461,55 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal
*
*/
void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
-
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
int max_read = getpagesize();
std::vector<uint8_t> charBuffer;
charBuffer.resize(max_read);
try {
- std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-
- if (fs.is_open()) {
- size_t position = 0;
- const size_t max_size = stream.getSize();
- size_t read_size = max_read;
- while (position < max_size) {
- if ((max_size - position) > max_read) {
- read_size = max_read;
- } else {
- read_size = max_size - position;
- }
- charBuffer.clear();
- stream.readData(charBuffer, read_size);
-
- fs.write((const char*) charBuffer.data(), read_size);
- position += read_size;
+ claim->increaseFlowFileRecordOwnedCount();
+ std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
+
+ if (nullptr == content_stream) {
+ logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
+ rollback();
+ return;
+ }
+ size_t position = 0;
+ const size_t max_size = stream.getSize();
+ size_t read_size = max_read;
+ while (position < max_size) {
+ if ((max_size - position) > max_read) {
+ read_size = max_read;
+ } else {
+ read_size = max_size - position;
}
- // Open the source file and stream to the flow file
+ charBuffer.clear();
+ stream.readData(charBuffer, read_size);
- if (fs.good() && fs.tellp() >= 0) {
- flow->setSize(fs.tellp());
- flow->setOffset(0);
- if (flow->getResourceClaim() != nullptr) {
- // Remove the old claim
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- flow->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
-
- logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ content_stream->write(charBuffer.data(), read_size);
+ position += read_size;
+ }
+ // Open the source file and stream to the flow file
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ flow->setSize(content_stream->getSize());
+ flow->setOffset(0);
+ if (flow->getResourceClaim() != nullptr) {
+ // Remove the old claim
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flow->clearResourceClaim();
}
+ flow->setResourceClaim(claim);
+
+ logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
+
+ content_stream->closeStream();
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
if (flow && flow->getResourceClaim() == claim) {
flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -550,34 +527,44 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl
}
}
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow,
-bool keepSource,
- uint64_t offset) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
char *buf = NULL;
int size = 4096;
buf = new char[size];
try {
- std::ofstream fs;
+ // std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-
- if (fs.is_open() && input.is_open()) {
+ claim->increaseFlowFileRecordOwnedCount();
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ if (input.is_open()) {
// Open the source file and stream to the flow file
- input.seekg(offset, fs.beg);
+ input.seekg(offset);
+ bool invalidWrite = false;
while (input.good()) {
input.read(buf, size);
- if (input)
- fs.write(buf, size);
- else
- fs.write(buf, input.gcount());
+ if (input) {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ } else {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ }
}
- if (fs.good() && fs.tellp() >= 0) {
- flow->setSize(fs.tellp());
+ if (!invalidWrite) {
+ flow->setSize(stream->getSize());
flow->setOffset(0);
if (flow->getResourceClaim() != nullptr) {
// Remove the old claim
@@ -585,20 +572,20 @@ bool keepSource,
flow->clearResourceClaim();
}
flow->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
flow->getUUIDStr().c_str());
- fs.close();
+ stream->closeStream();
input.close();
if (!keepSource)
std::remove(source.c_str());
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} else {
- fs.close();
+ stream->closeStream();
input.close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
@@ -626,8 +613,7 @@ bool keepSource,
}
}
-void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
- bool keepSource, uint64_t offset, char inputDelimiter) {
+void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, bool keepSource, uint64_t offset, char inputDelimiter) {
std::shared_ptr<ResourceClaim> claim;
std::shared_ptr<FlowFileRecord> flowFile;
@@ -639,48 +625,61 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
try {
// Open the input file and seek to the appropriate location.
std::ifstream input;
+ logger_->log_debug("Opening %s", source);
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
if (input.is_open()) {
input.seekg(offset, input.beg);
while (input.good()) {
+ bool invalidWrite = false;
flowFile = std::static_pointer_cast<FlowFileRecord>(create());
- claim = std::make_shared<ResourceClaim>();
+ claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
uint64_t startTime = getTimeMillis();
input.getline(buf, size, inputDelimiter);
- std::ofstream fs;
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-
- if (fs.is_open()) {
- if (input)
- fs.write(buf, strlen(buf));
- else
- fs.write(buf, input.gcount());
-
- if (fs.good() && fs.tellp() >= 0) {
- flowFile->setSize(fs.tellp());
- flowFile->setOffset(0);
- if (flowFile->getResourceClaim() != nullptr) {
- // Remove the old claim
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
- flowFile->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
- flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
- flowFile->getUUIDStr().c_str());
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ logger_->log_debug("Stream is null");
+ rollback();
+ return;
+ }
- fs.close();
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flowFile, details, endTime - startTime);
- flows.push_back(flowFile);
+ if (input) {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ } else {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ }
- } else {
- fs.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+ if (!invalidWrite) {
+ flowFile->setSize(stream->getSize());
+ flowFile->setOffset(0);
+ if (flowFile->getResourceClaim() != nullptr) {
+ // Remove the old claim
+ flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ flowFile->clearResourceClaim();
}
+ flowFile->setResourceClaim(claim);
+ claim->increaseFlowFileRecordOwnedCount();
+
+ logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
+ flowFile->getSize(),
+ flowFile->getResourceClaim()->getContentFullPath().c_str(),
+ flowFile->getUUIDStr().c_str());
+
+ stream->closeStream();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+ flows.push_back(flowFile);
+ } else {
+ logger_->log_debug("Error while writing");
+ stream->closeStream();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
}
}
input.close();
@@ -711,35 +710,44 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
}
}
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
-bool keepSource,
- uint64_t offset) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
-
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, bool keepSource, uint64_t offset) {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
char *buf = NULL;
int size = 4096;
buf = new char[size];
try {
- std::ofstream fs;
+ // std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-
- if (fs.is_open() && input.is_open()) {
+ claim->increaseFlowFileRecordOwnedCount();
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ if (nullptr == stream) {
+ rollback();
+ return;
+ }
+ if (input.is_open()) {
// Open the source file and stream to the flow file
- input.seekg(offset, fs.beg);
+ input.seekg(offset);
+ int sizeWritten = 0;
+ bool invalidWrite = false;
while (input.good()) {
input.read(buf, size);
- if (input)
- fs.write(buf, size);
- else
- fs.write(buf, input.gcount());
+ if (input) {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ } else {
+ if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+ invalidWrite = true;
+ break;
+ }
+ }
}
-
- if (fs.good() && fs.tellp() >= 0) {
- flow->setSize(fs.tellp());
+ if (!invalidWrite) {
+ flow->setSize(stream->getSize());
flow->setOffset(0);
if (flow->getResourceClaim() != nullptr) {
// Remove the old claim
@@ -747,20 +755,20 @@ bool keepSource,
flow->clearResourceClaim();
}
flow->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
flow->getUUIDStr().c_str());
- fs.close();
+ stream->closeStream();
input.close();
if (!keepSource)
std::remove(source.c_str());
- std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
+ provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} else {
- fs.close();
+ stream->closeStream();
input.close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
@@ -834,7 +842,7 @@ void ProcessSession::commit() {
}
}
- // Do the samething for added flow file
+ // Do the same thing for added flow file
for (const auto it : _addedFlowFiles) {
std::shared_ptr<core::FlowFile> record = it.second;
if (record->isDeleted())
@@ -851,6 +859,7 @@ void ProcessSession::commit() {
std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
} else {
+ logger_->log_debug("added flow file is auto terminated");
// Autoterminated
remove(record);
}
@@ -947,7 +956,7 @@ void ProcessSession::rollback() {
_addedFlowFiles.clear();
_updatedFlowFiles.clear();
_deletedFlowFiles.clear();
- logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
+ logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -960,8 +969,10 @@ void ProcessSession::rollback() {
std::shared_ptr<core::FlowFile> ProcessSession::get() {
std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection();
- if (first == NULL)
+ if (first == NULL) {
+ logger_->log_debug("Get is null for %s", process_context_->getProcessorNode().getName());
return NULL;
+ }
std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first);
@@ -972,8 +983,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
// Remove expired flow record
for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) {
std::shared_ptr<core::FlowFile> record = *it;
- std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr();
- provenance_report_->expire(record, details);
+ std::stringstream details;
+ details << process_context_->getProcessorNode().getName() << " expire flow record " << record->getUUIDStr();
+ provenance_report_->expire(record, details.str());
}
}
if (ret) {
@@ -981,10 +993,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
ret->setDeleted(false);
_updatedFlowFiles[ret->getUUIDStr()] = ret;
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
snapshot = ret;
-// snapshot->duplicate(ret);
// save a snapshot
_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
return ret;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp
index 31b7481..570d895 100644
--- a/libminifi/src/core/ProcessSessionFactory.cpp
+++ b/libminifi/src/core/ProcessSessionFactory.cpp
@@ -28,7 +28,7 @@ namespace minifi {
namespace core {
std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() {
- return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_));
+ return std::unique_ptr < ProcessSession > (new ProcessSession(process_context_));
}
} /* namespace core */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7b07638..0c2e7cf 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
- logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str());
+ logger_->log_info("Processor %s created UUID %s", name_, uuidStr_);
}
bool Processor::isRunning() {
@@ -80,8 +80,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
return false;
}
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
- std::lock_guard<std::mutex> lock(mutex_);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
+ std::lock_guard < std::mutex > lock(mutex_);
uuid_t srcUUID;
uuid_t destUUID;
@@ -141,12 +141,12 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
return;
}
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
uuid_t srcUUID;
uuid_t destUUID;
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
@@ -178,13 +178,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
}
bool Processor::flowFilesQueued() {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
if (_incomingConnections.size() == 0)
return false;
for (auto &&conn : _incomingConnections) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
if (connection->getQueueSize() > 0)
return true;
}
@@ -193,13 +193,13 @@ bool Processor::flowFilesQueued() {
}
bool Processor::flowFilesOutGoingFull() {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
for (auto &&connection : out_going_connections_) {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
for (const auto conn : existedConnection) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ std::shared_ptr < Connection > connection = std::static_pointer_cast < Connection > (conn);
if (connection->isFull())
return true;
}
@@ -232,7 +232,7 @@ bool Processor::isWorkAvailable() {
try {
for (const auto &conn : _incomingConnections) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn);
if (connection->getQueueSize() > 0) {
hasWork = true;
break;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 50e8cd2..cf26a0d 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -19,13 +19,14 @@
#include <arpa/inet.h>
#include <cstdint>
#include <vector>
+
+#include "../../include/core/repository/FlowFileRepository.h"
#include "io/DataStream.h"
#include "io/Serializable.h"
#include "core/Relationship.h"
#include "core/logging/Logger.h"
#include "FlowController.h"
#include "provenance/Provenance.h"
-#include "core/repository/FlowFileRepository.h"
namespace org {
namespace apache {
@@ -38,9 +39,8 @@ void Repository::start() {
return;
if (running_)
return;
- thread_ = std::thread(&Repository::threadExecutor, this);
- thread_.detach();
running_ = true;
+ thread_ = std::thread(&Repository::threadExecutor, this);
logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index cf18601..9e99718 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -18,13 +18,17 @@
#include <memory>
#include <string>
#include <algorithm>
+#include "core/ContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
+#include "core/repository/VolatileContentRepository.h"
#include "core/Repository.h"
#ifdef LEVELDB_SUPPORT
#include "core/repository/FlowFileRepository.h"
#include "provenance/ProvenanceRepository.h"
#endif
-#include "core/repository/VolatileRepository.h"
+#include "core/repository/VolatileProvenanceRepository.h"
+#include "core/repository/VolatileFlowFileRepository.h"
namespace org {
namespace apache {
@@ -48,14 +52,14 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
try {
std::shared_ptr<core::Repository> return_obj = nullptr;
if (class_name_lc == "flowfilerepository") {
- std::cout << "creating flow" << std::endl;
return_obj = instantiate<core::repository::FlowFileRepository>(repo_name);
} else if (class_name_lc == "provenancerepository") {
return_obj = instantiate<provenance::ProvenanceRepository>(repo_name);
- } else if (class_name_lc == "volatilerepository") {
- return_obj = instantiate<repository::VolatileRepository>(repo_name);
+ } else if (class_name_lc == "volatileflowfilerepository") {
+ return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
+ } else if (class_name_lc == "volatileprovenancefilerepository") {
+ return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
} else if (class_name_lc == "nooprepository") {
- std::cout << "creating noop" << std::endl;
return_obj = instantiate<core::Repository>(repo_name);
}
@@ -63,13 +67,42 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
return return_obj;
}
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+ return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
} else {
throw std::runtime_error("Support for the provided configuration class could not be found");
}
} catch (const std::runtime_error &r) {
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+ return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1);
+ }
+ }
+
+ throw std::runtime_error("Support for the provided configuration class could not be found");
+}
+
+std::shared_ptr<core::ContentRepository> createContentRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
+ std::shared_ptr<core::ContentRepository> return_obj = nullptr;
+ std::string class_name_lc = configuration_class_name;
+ std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
+ try {
+ std::shared_ptr<core::ContentRepository> return_obj = nullptr;
+ if (class_name_lc == "volatilecontentrepository") {
+ return_obj = instantiate<core::repository::VolatileContentRepository>(repo_name);
+ } else {
+ return_obj = instantiate<core::repository::FileSystemRepository>(repo_name);
+ }
+
+ if (return_obj) {
+ return return_obj;
+ }
+ if (fail_safe) {
+ return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
+ } else {
+ throw std::runtime_error("Support for the provided configuration class could not be found");
+ }
+ } catch (const std::runtime_error &r) {
+ if (fail_safe) {
+ return std::make_shared < core::repository::FileSystemRepository > ("fail_safe");
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 5c4aa70..69004c1 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -27,12 +27,12 @@ namespace minifi {
namespace core {
namespace controller {
std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
return process_group_;
}
void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
process_group_ = processGroup;
}
@@ -45,7 +45,7 @@ bool StandardControllerServiceNode::enable() {
for (auto linked_service : property.getValues()) {
std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service);
if (nullptr != csNode) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard < std::mutex > lock(mutex_);
linked_controller_services_.push_back(csNode);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index c06239b..4b97055 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -56,19 +56,19 @@ std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &t
LoggerConfiguration::LoggerConfiguration()
: root_namespace_(create_default_root()),
loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
- formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
- logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
+ formatter_(std::make_shared < spdlog::pattern_formatter > (spdlog_default_pattern)) {
+ logger_ = std::shared_ptr < LoggerImpl > (new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
loggers.push_back(logger_);
}
void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
- std::lock_guard<std::mutex> lock(mutex);
+ std::lock_guard < std::mutex > lock(mutex);
root_namespace_ = initialize_namespaces(logger_properties);
std::string spdlog_pattern;
if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) {
spdlog_pattern = spdlog_default_pattern;
}
- formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern);
+ formatter_ = std::make_shared < spdlog::pattern_formatter > (spdlog_pattern);
std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers;
for (auto const & logger_impl : loggers) {
std::shared_ptr<spdlog::logger> spdlogger;
@@ -85,8 +85,8 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo
}
std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) {
- std::lock_guard<std::mutex> lock(mutex);
- std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_));
+ std::lock_guard < std::mutex > lock(mutex);
+ std::shared_ptr<LoggerImpl> result = std::make_shared < LoggerImpl > (name, get_logger(logger_, root_namespace_, name, formatter_));
loggers.push_back(result);
return result;
}
@@ -130,7 +130,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
} catch (const std::out_of_range &oor) {
}
}
- sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files);
+ sink_map[appender_name] = std::make_shared < spdlog::sinks::rotating_file_sink_mt > (file_name, max_file_size, max_files);
} else if ("stdout" == appender_type) {
sink_map[appender_name] = spdlog::sinks::stdout_sink_mt::instance();
} else {
@@ -227,7 +227,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
if (logger != nullptr) {
logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, spdlog::level::level_names[level], level_namespace_str);
}
- spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks));
+ spdlogger = std::make_shared < spdlog::logger > (name, begin(sinks), end(sinks));
spdlogger->set_level(level);
spdlogger->set_formatter(formatter);
spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 02ddb52..d4059d6 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -24,8 +24,10 @@
#include <string>
#include <memory>
#include <sstream>
+#include <functional>
#include <iostream>
#include <utility>
+#include "core/Repository.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "../include/io/StreamFactory.h"
#include "io/ClientSocket.h"
@@ -51,10 +53,14 @@ void SiteToSiteProvenanceReportingTask::initialize() {
RemoteProcessorGroupPort::initialize();
}
-void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<core::SerializableComponent>> &records,
std::string &report) {
Json::Value array;
- for (auto record : records) {
+ for (auto sercomp : records) {
+ std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast < provenance::ProvenanceEventRecord > (sercomp);
+ if (nullptr == record) {
+ break;
+ }
Json::Value recordJson;
Json::Value updatedAttributesJson;
Json::Value parentUuidJson;
@@ -108,23 +114,32 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
return;
}
+ logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger");
+
if (!protocol_->bootstrap()) {
// bootstrap the client protocol if needeed
context->yield();
- std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
+ std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor());
logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
returnProtocol(std::move(protocol_));
return;
}
- std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
- std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository());
- repo->getProvenanceRecord(records, batch_size_);
- if (records.size() <= 0) {
+ std::vector<std::shared_ptr<core::SerializableComponent>> records;
+
+ logger_->log_debug("batch size %d records", batch_size_);
+ size_t deserialized = batch_size_;
+ std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
+ std::function < std::shared_ptr<core::SerializableComponent>() > constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
+ if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
+ logger_->log_debug("Not sending because deserialized is %d", deserialized);
returnProtocol(std::move(protocol_));
return;
}
+ logger_->log_debug("batch size %d records", batch_size_, deserialized);
+
+ logger_->log_debug("Captured %d records", deserialized);
std::string jsonStr;
this->getJsonReport(context, session, records, jsonStr);
if (jsonStr.length() <= 0) {
@@ -141,7 +156,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
}
// we transfer the record, purge the record from DB
- repo->purgeProvenanceRecord(records);
+ repo->Delete(records);
returnProtocol(std::move(protocol_));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/src/core/repository/FileSystemRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
new file mode 100644
index 0000000..fba1fe3
--- /dev/null
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/repository/FileSystemRepository.h"
+#include <memory>
+#include "io/FileStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
+ return true;
+}
+void FileSystemRepository::stop() {
+}
+
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ return std::make_shared<io::FileStream>(claim->getContentFullPath());
+}
+
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
+}
+
+bool FileSystemRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ std::remove(claim->getContentFullPath().c_str());
+ return true;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */