You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:14 UTC
[09/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces
and removes use of raw pointers for user facing API.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index eca84be..e472a9a 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -30,765 +30,271 @@
#include <unistd.h>
#include <future>
#include "FlowController.h"
-#include "ProcessContext.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessGroup.h"
#include "utils/StringUtils.h"
+#include "core/core.h"
+#include "core/repository/FlowFileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+#define DEFAULT_CONFIG_NAME "conf/flow.yml"
+
+FlowController::FlowController(
+ std::shared_ptr<core::Repository> provenance_repo,
+ std::shared_ptr<core::Repository> flow_file_repo,
+ std::unique_ptr<core::FlowConfiguration> flow_configuration,
+ const std::string name, bool headless_mode)
+ : CoreComponent(core::getClassName<FlowController>()),
+ root_(nullptr),
+ max_timer_driven_threads_(0),
+ max_event_driven_threads_(0),
+ running_(false),
+ initialized_(false),
+ provenance_repo_(provenance_repo),
+ flow_file_repo_(flow_file_repo),
+ protocol_(0),
+ _timerScheduler(provenance_repo_),
+ _eventScheduler(provenance_repo_),
+ flow_configuration_(std::move(flow_configuration)) {
+ if (provenance_repo == nullptr)
+ throw std::runtime_error("Provenance Repo should not be null");
+ if (flow_file_repo == nullptr)
+ throw std::runtime_error("Flow Repo should not be null");
+
+ uuid_generate(uuid_);
+ setUUID(uuid_);
+
+ // Setup the default values
+ if (flow_configuration_ != nullptr) {
+ configuration_filename_ = flow_configuration_->getConfigurationPath();
+ }
+ max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+ max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+ running_ = false;
+ initialized_ = false;
+ root_ = NULL;
+
+ protocol_ = new FlowControlProtocol(this);
+
+ // NiFi config properties
+ configure_ = Configure::getConfigure();
+
+ if (!headless_mode) {
+ std::string rawConfigFileString;
+ configure_->get(Configure::nifi_flow_configuration_file,
+ rawConfigFileString);
+
+ if (!rawConfigFileString.empty()) {
+ configuration_filename_ = rawConfigFileString;
+ }
+
+ std::string adjustedFilename;
+ if (!configuration_filename_.empty()) {
+ // perform a naive determination if this is a relative path
+ if (configuration_filename_.c_str()[0] != '/') {
+ adjustedFilename = adjustedFilename + configure_->getHome() + "/"
+ + configuration_filename_;
+ } else {
+ adjustedFilename = configuration_filename_;
+ }
+ }
+
+ initializePaths(adjustedFilename);
+ }
-FlowController *FlowControllerFactory::_flowController(NULL);
-
-FlowControllerImpl::FlowControllerImpl(std::string name) {
- uuid_generate(_uuid);
-
- _name = name;
- // Setup the default values
- _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
- _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
- _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
- _running = false;
- _initialized = false;
- _root = NULL;
- logger_ = Logger::getLogger();
- _protocol = new FlowControlProtocol(this);
-
- // NiFi config properties
- configure_ = Configure::getConfigure();
-
- std::string rawConfigFileString;
- configure_->get(Configure::nifi_flow_configuration_file,
- rawConfigFileString);
-
- if (!rawConfigFileString.empty()) {
- _configurationFileName = rawConfigFileString;
- }
-
- char *path = NULL;
- char full_path[PATH_MAX];
-
- std::string adjustedFilename;
- if (!_configurationFileName.empty()) {
- // perform a naive determination if this is a relative path
- if (_configurationFileName.c_str()[0] != '/') {
- adjustedFilename = adjustedFilename + configure_->getHome() + "/"
- + _configurationFileName;
- } else {
- adjustedFilename = _configurationFileName;
- }
- }
-
- path = realpath(adjustedFilename.c_str(), full_path);
-
- std::string pathString(path);
- _configurationFileName = pathString;
- logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
-
- // Create the content repo directory if needed
- struct stat contentDirStat;
-
- if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode))
- {
- path = realpath(ResourceClaim::default_directory_path.c_str(), full_path);
- logger_->log_info("FlowController content directory %s", full_path);
- }
- else
- {
- if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1)
- {
- logger_->log_error("FlowController content directory creation failed");
- exit(1);
- }
- }
-
-
- std::string clientAuthStr;
-
- if (!path) {
- logger_->log_error(
- "Could not locate path from provided configuration file name (%s). Exiting.",
- full_path);
- exit(1);
- }
-
-
- // Create repos for flow record and provenance
- _flowfileRepo = new FlowFileRepository();
- _flowfileRepo->initialize();
- _provenanceRepo = new ProvenanceRepository();
- _provenanceRepo->initialize();
}
-FlowControllerImpl::~FlowControllerImpl() {
+void FlowController::initializePaths(const std::string &adjustedFilename) {
+ char *path = NULL;
+ char full_path[PATH_MAX];
+ path = realpath(adjustedFilename.c_str(), full_path);
+
+ if (path == NULL) {
+ throw std::runtime_error(
+ "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
+ }
+ std::string pathString(path);
+ configuration_filename_ = pathString;
+ logger_->log_info("FlowController NiFi Configuration file %s",
+ pathString.c_str());
+
+ // Create the content repo directory if needed
+ struct stat contentDirStat;
+
+ if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat)
+ != -1&& S_ISDIR(contentDirStat.st_mode)) {
+ path = realpath(ResourceClaim::default_directory_path.c_str(), full_path);
+ logger_->log_info("FlowController content directory %s", full_path);
+ } else {
+ if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) {
+ logger_->log_error("FlowController content directory creation failed");
+ exit(1);
+ }
+ }
- stop(true);
- unload();
- if (NULL != _protocol)
- delete _protocol;
- if (NULL != _provenanceRepo)
- delete _provenanceRepo;
- if (NULL != _flowfileRepo)
- delete _flowfileRepo;
+ std::string clientAuthStr;
-}
+ if (!path) {
+ logger_->log_error(
+ "Could not locate path from provided configuration file name (%s). Exiting.",
+ full_path);
+ exit(1);
+ }
-void FlowControllerImpl::stop(bool force) {
+}
- if (_running) {
- // immediately indicate that we are not running
- _running = false;
+FlowController::~FlowController() {
+ stop(true);
+ unload();
+ if (NULL != protocol_)
+ delete protocol_;
- logger_->log_info("Stop Flow Controller");
- this->_timerScheduler.stop();
- this->_eventScheduler.stop();
- this->_flowfileRepo->stop();
- this->_provenanceRepo->stop();
- // Wait for sometime for thread stop
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- if (this->_root)
- this->_root->stopProcessing(&this->_timerScheduler,
- &this->_eventScheduler);
+}
- }
+void FlowController::stop(bool force) {
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ if (running_) {
+ // immediately indicate that we are not running
+ running_ = false;
+
+ logger_->log_info("Stop Flow Controller");
+ this->_timerScheduler.stop();
+ this->_eventScheduler.stop();
+ this->flow_file_repo_->stop();
+ this->provenance_repo_->stop();
+ // Wait for sometime for thread stop
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ if (this->root_)
+ this->root_->stopProcessing(&this->_timerScheduler,
+ &this->_eventScheduler);
+
+ }
}
/**
* This function will attempt to unload yaml and stop running Processors.
*
* If the latter attempt fails or does not complete within the prescribed
- * period, _running will be set to false and we will return.
+ * period, running_ will be set to false and we will return.
*
* @param timeToWaitMs Maximum time to wait before manually
* marking running as false.
*/
-void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) {
- if (_running) {
- // use the current time and increment with the provided argument.
- std::chrono::system_clock::time_point wait_time =
- std::chrono::system_clock::now()
- + std::chrono::milliseconds(timeToWaitMs);
-
- // create an asynchronous future.
- std::future<void> unload_task = std::async(std::launch::async,
- [this]() {unload();});
-
- if (std::future_status::ready == unload_task.wait_until(wait_time)) {
- _running = false;
- }
-
- }
-}
-
-
-void FlowControllerImpl::unload() {
- if (_running) {
- stop(true);
- }
- if (_initialized) {
- logger_->log_info("Unload Flow Controller");
- if (_root)
- delete _root;
- _root = NULL;
- _initialized = false;
- _name = "";
- }
-
- return;
-}
-
-Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) {
- Processor *processor = NULL;
- if (name == GenerateFlowFile::ProcessorName) {
- processor = new GenerateFlowFile(name, uuid);
- } else if (name == LogAttribute::ProcessorName) {
- processor = new LogAttribute(name, uuid);
- } else if (name == RealTimeDataCollector::ProcessorName) {
- processor = new RealTimeDataCollector(name, uuid);
- } else if (name == GetFile::ProcessorName) {
- processor = new GetFile(name, uuid);
- } else if (name == PutFile::ProcessorName) {
- processor = new PutFile(name, uuid);
- } else if (name == TailFile::ProcessorName) {
- processor = new TailFile(name, uuid);
- } else if (name == ListenSyslog::ProcessorName) {
- processor = new ListenSyslog(name, uuid);
- } else if (name == ListenHTTP::ProcessorName) {
- processor = new ListenHTTP(name, uuid);
- } else if (name == ExecuteProcess::ProcessorName) {
- processor = new ExecuteProcess(name, uuid);
- } else if (name == AppendHostInfo::ProcessorName) {
- processor = new AppendHostInfo(name, uuid);
- } else {
- logger_->log_error("No Processor defined for %s", name.c_str());
- return NULL;
- }
-
- //! initialize the processor
- processor->initialize();
-
- return processor;
-}
-
-ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name,
- uuid_t uuid) {
- return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
-}
-
-ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name,
- uuid_t uuid) {
- return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
-}
-
-Connection *FlowControllerImpl::createConnection(std::string name,
- uuid_t uuid) {
- return new Connection(name, uuid);
-}
-
-#ifdef YAML_SUPPORT
-void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
- uuid_t uuid;
- ProcessGroup *group = NULL;
-
- std::string flowName = rootFlowNode["name"].as<std::string>();
- std::string id = rootFlowNode["id"].as<std::string>();
-
- uuid_parse(id.c_str(), uuid);
-
- logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
- logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
- group = this->createRootProcessGroup(flowName, uuid);
- this->_root = group;
- this->_name = flowName;
-}
-
-void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
- ProcessGroup *parentGroup) {
- int64_t schedulingPeriod = -1;
- int64_t penalizationPeriod = -1;
- int64_t yieldPeriod = -1;
- int64_t runDurationNanos = -1;
- uuid_t uuid;
- Processor *processor = NULL;
-
- if (!parentGroup) {
- logger_->log_error("parseProcessNodeYaml: no parent group exists");
- return;
- }
-
- if (processorsNode) {
-
- if (processorsNode.IsSequence()) {
- // Evaluate sequence of processors
- int numProcessors = processorsNode.size();
-
- for (YAML::const_iterator iter = processorsNode.begin();
- iter != processorsNode.end(); ++iter) {
- ProcessorConfig procCfg;
- YAML::Node procNode = iter->as<YAML::Node>();
-
- procCfg.name = procNode["name"].as<std::string>();
- procCfg.id = procNode["id"].as<std::string>();
- logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
- procCfg.name.c_str(), procCfg.id.c_str());
- procCfg.javaClass = procNode["class"].as<std::string>();
- logger_->log_debug("parseProcessorNode: class => [%s]",
- procCfg.javaClass.c_str());
-
- uuid_parse(procCfg.id.c_str(), uuid);
-
- // Determine the processor name only from the Java class
- int lastOfIdx = procCfg.javaClass.find_last_of(".");
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- int nameLength = procCfg.javaClass.length() - lastOfIdx;
- std::string processorName = procCfg.javaClass.substr(
- lastOfIdx, nameLength);
- processor = this->createProcessor(processorName, uuid);
- }
-
- if (!processor) {
- logger_->log_error(
- "Could not create a processor %s with name %s",
- procCfg.name.c_str(), procCfg.id.c_str());
- throw std::invalid_argument(
- "Could not create processor " + procCfg.name);
- }
- processor->setName(procCfg.name);
-
- procCfg.maxConcurrentTasks =
- procNode["max concurrent tasks"].as<std::string>();
- logger_->log_debug(
- "parseProcessorNode: max concurrent tasks => [%s]",
- procCfg.maxConcurrentTasks.c_str());
- procCfg.schedulingStrategy = procNode["scheduling strategy"].as<
- std::string>();
- logger_->log_debug(
- "parseProcessorNode: scheduling strategy => [%s]",
- procCfg.schedulingStrategy.c_str());
- procCfg.schedulingPeriod = procNode["scheduling period"].as<
- std::string>();
- logger_->log_debug(
- "parseProcessorNode: scheduling period => [%s]",
- procCfg.schedulingPeriod.c_str());
- procCfg.penalizationPeriod = procNode["penalization period"].as<
- std::string>();
- logger_->log_debug(
- "parseProcessorNode: penalization period => [%s]",
- procCfg.penalizationPeriod.c_str());
- procCfg.yieldPeriod =
- procNode["yield period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: yield period => [%s]",
- procCfg.yieldPeriod.c_str());
- procCfg.yieldPeriod = procNode["run duration nanos"].as<
- std::string>();
- logger_->log_debug(
- "parseProcessorNode: run duration nanos => [%s]",
- procCfg.runDurationNanos.c_str());
-
- // handle auto-terminated relationships
- YAML::Node autoTerminatedSequence =
- procNode["auto-terminated relationships list"];
- std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence()
- && !autoTerminatedSequence.IsNull()
- && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter =
- autoTerminatedSequence.begin();
- relIter != autoTerminatedSequence.end();
- ++relIter) {
- std::string autoTerminatedRel =
- relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(
- autoTerminatedRel);
- }
- }
- procCfg.autoTerminatedRelationships =
- rawAutoTerminatedRelationshipValues;
-
- // handle processor properties
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, processor);
-
- // Take care of scheduling
- TimeUnit unit;
- if (Property::StringToTime(procCfg.schedulingPeriod,
- schedulingPeriod, unit)
- && Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
- schedulingPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: schedulingPeriod => [%d] ns",
- schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
- }
-
- if (Property::StringToTime(procCfg.penalizationPeriod,
- penalizationPeriod, unit)
- && Property::ConvertTimeUnitToMS(penalizationPeriod,
- unit, penalizationPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: penalizationPeriod => [%d] ms",
- penalizationPeriod);
- processor->setPenalizationPeriodMsec(penalizationPeriod);
- }
-
- if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod,
- unit)
- && Property::ConvertTimeUnitToMS(yieldPeriod, unit,
- yieldPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: yieldPeriod => [%d] ms",
- yieldPeriod);
- processor->setYieldPeriodMsec(yieldPeriod);
- }
-
- // Default to running
- processor->setScheduledState(RUNNING);
-
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
- processor->setSchedulingStrategy(TIMER_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy.c_str());
- } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- processor->setSchedulingStrategy(EVENT_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy.c_str());
- } else {
- processor->setSchedulingStrategy(CRON_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy.c_str());
-
- }
-
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(procCfg.maxConcurrentTasks,
- maxConcurrentTasks)) {
- logger_->log_debug(
- "parseProcessorNode: maxConcurrentTasks => [%d]",
- maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
-
- if (Property::StringToInt(procCfg.runDurationNanos,
- runDurationNanos)) {
- logger_->log_debug(
- "parseProcessorNode: runDurationNanos => [%d]",
- runDurationNanos);
- processor->setRunDurationNano(runDurationNanos);
- }
-
- std::set<Relationship> autoTerminatedRelationships;
- for (auto &&relString : procCfg.autoTerminatedRelationships) {
- Relationship relationship(relString, "");
- logger_->log_debug(
- "parseProcessorNode: autoTerminatedRelationship => [%s]",
- relString.c_str());
- autoTerminatedRelationships.insert(relationship);
- }
-
- processor->setAutoTerminatedRelationships(
- autoTerminatedRelationships);
-
- parentGroup->addProcessor(processor);
- }
- }
- } else {
- throw new std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
- }
-}
-
-void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
- ProcessGroup *parentGroup) {
- uuid_t uuid;
-
- if (!parentGroup) {
- logger_->log_error(
- "parseRemoteProcessGroupYaml: no parent group exists");
- return;
- }
-
- if (rpgNode) {
- if (rpgNode->IsSequence()) {
- for (YAML::const_iterator iter = rpgNode->begin();
- iter != rpgNode->end(); ++iter) {
- YAML::Node rpgNode = iter->as<YAML::Node>();
-
- auto name = rpgNode["name"].as<std::string>();
- auto id = rpgNode["id"].as<std::string>();
-
- logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
- name.c_str(), id.c_str());
-
- std::string url = rpgNode["url"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
- url.c_str());
-
- std::string timeout = rpgNode["timeout"].as<std::string>();
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: timeout => [%s]",
- timeout.c_str());
-
- std::string yieldPeriod =
- rpgNode["yield period"].as<std::string>();
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: yield period => [%s]",
- yieldPeriod.c_str());
-
- YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
- YAML::Node outputPorts =
- rpgNode["Output Ports"].as<YAML::Node>();
- ProcessGroup *group = NULL;
-
- uuid_parse(id.c_str(), uuid);
-
- int64_t timeoutValue = -1;
- int64_t yieldPeriodValue = -1;
-
- group = this->createRemoteProcessGroup(name.c_str(), uuid);
- group->setParent(parentGroup);
- parentGroup->addProcessGroup(group);
-
- TimeUnit unit;
-
- if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
- yieldPeriodValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
- yieldPeriodValue);
- group->setYieldPeriodMsec(yieldPeriodValue);
- }
-
- if (Property::StringToTime(timeout, timeoutValue, unit)
- && Property::ConvertTimeUnitToMS(timeoutValue, unit,
- timeoutValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
- timeoutValue);
- group->setTimeOut(timeoutValue);
- }
-
- group->setTransmitting(true);
- group->setURL(url);
-
- if (inputPorts && inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin();
- portIter != inputPorts.end(); ++portIter) {
- logger_->log_debug("Got a current port, iterating...");
-
- YAML::Node currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(&currPort, group, SEND);
- } // for node
- }
- if (outputPorts && outputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = outputPorts.begin();
- portIter != outputPorts.end(); ++portIter) {
- logger_->log_debug("Got a current port, iterating...");
-
- YAML::Node currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(&currPort, group, RECEIVE);
- } // for node
- }
-
- }
- }
- }
-}
-
-void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
- ProcessGroup *parent) {
- uuid_t uuid;
- Connection *connection = NULL;
-
- if (!parent) {
- logger_->log_error("parseProcessNode: no parent group was provided");
- return;
- }
-
- if (connectionsNode) {
-
- if (connectionsNode->IsSequence()) {
- for (YAML::const_iterator iter = connectionsNode->begin();
- iter != connectionsNode->end(); ++iter) {
-
- YAML::Node connectionNode = iter->as<YAML::Node>();
-
- std::string name = connectionNode["name"].as<std::string>();
- std::string id = connectionNode["id"].as<std::string>();
- std::string destId = connectionNode["destination id"].as<
- std::string>();
-
- uuid_parse(id.c_str(), uuid);
-
- logger_->log_debug(
- "Created connection with UUID %s and name %s", id.c_str(),
- name.c_str());
- connection = this->createConnection(name, uuid);
- auto rawRelationship =
- connectionNode["source relationship name"].as<
- std::string>();
- Relationship relationship(rawRelationship, "");
- logger_->log_debug("parseConnection: relationship => [%s]",
- rawRelationship.c_str());
- if (connection)
- connection->setRelationship(relationship);
- std::string connectionSrcProcId =
- connectionNode["source id"].as<std::string>();
- uuid_t srcUUID;
- uuid_parse(connectionSrcProcId.c_str(), srcUUID);
-
- Processor *srcProcessor = this->_root->findProcessor(
- srcUUID);
-
- if (!srcProcessor) {
- logger_->log_error(
- "Could not locate a source with id %s to create a connection",
- connectionSrcProcId.c_str());
- throw std::invalid_argument(
- "Could not locate a source with id %s to create a connection "
- + connectionSrcProcId);
- }
-
- uuid_t destUUID;
- uuid_parse(destId.c_str(), destUUID);
- Processor *destProcessor = this->_root->findProcessor(destUUID);
- // If we could not find name, try by UUID
- if (!destProcessor) {
- uuid_t destUuid;
- uuid_parse(destId.c_str(), destUuid);
- destProcessor = this->_root->findProcessor(destUuid);
- }
- if (destProcessor) {
- std::string destUuid = destProcessor->getUUIDStr();
- }
-
- uuid_t srcUuid;
- uuid_t destUuid;
- srcProcessor->getUUID(srcUuid);
- connection->setSourceProcessorUUID(srcUuid);
- destProcessor->getUUID(destUuid);
- connection->setDestinationProcessorUUID(destUuid);
-
- if (connection) {
- parent->addConnection(connection);
- }
- }
- }
-
- if (connection)
- parent->addConnection(connection);
-
- return;
- }
-}
-
-void FlowControllerImpl::parsePortYaml(YAML::Node *portNode,
- ProcessGroup *parent, TransferDirection direction) {
- uuid_t uuid;
- Processor *processor = NULL;
- RemoteProcessorGroupPort *port = NULL;
-
- if (!parent) {
- logger_->log_error("parseProcessNode: no parent group existed");
- return;
- }
-
- YAML::Node inputPortsObj = portNode->as<YAML::Node>();
-
- // generate the random UIID
- uuid_generate(uuid);
-
- auto portId = inputPortsObj["id"].as<std::string>();
- auto nameStr = inputPortsObj["name"].as<std::string>();
- uuid_parse(portId.c_str(), uuid);
-
- port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
-
- processor = (Processor *) port;
- port->setDirection(direction);
- port->setTimeOut(parent->getTimeOut());
- port->setTransmitting(true);
- processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
- processor->initialize();
-
- // handle port properties
- YAML::Node nodeVal = portNode->as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["Properties"];
-
- parsePropertiesNodeYaml(&propertiesNode, processor);
-
- // add processor to parent
- parent->addProcessor(processor);
- processor->setScheduledState(RUNNING);
- auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<
- std::string>();
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
- maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
+void FlowController::waitUnload(const uint64_t timeToWaitMs) {
+ if (running_) {
+ // use the current time and increment with the provided argument.
+ std::chrono::system_clock::time_point wait_time =
+ std::chrono::system_clock::now()
+ + std::chrono::milliseconds(timeToWaitMs);
+
+ // create an asynchronous future.
+ std::future<void> unload_task = std::async(std::launch::async,
+ [this]() {unload();});
+
+ if (std::future_status::ready == unload_task.wait_until(wait_time)) {
+ running_ = false;
+ }
+ }
}
-void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
- Processor *processor) {
- // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
- for (YAML::const_iterator propsIter = propertiesNode->begin();
- propsIter != propertiesNode->end(); ++propsIter) {
- std::string propertyName = propsIter->first.as<std::string>();
- YAML::Node propertyValueNode = propsIter->second;
- if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
- std::string rawValueString = propertyValueNode.as<std::string>();
- if (!processor->setProperty(propertyName, rawValueString)) {
- logger_->log_warn(
- "Received property %s with value %s but is not one of the properties for %s",
- propertyName.c_str(), rawValueString.c_str(),
- processor->getName().c_str());
- }
- }
- }
+void FlowController::unload() {
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ if (running_) {
+ stop(true);
+ }
+ if (initialized_) {
+ logger_->log_info("Unload Flow Controller");
+ root_ = nullptr;
+ initialized_ = false;
+ name_ = "";
+ }
+
+ return;
}
-#endif /* ifdef YAML_SUPPORT */
-void FlowControllerImpl::load() {
- if (_running) {
- stop(true);
- }
- if (!_initialized) {
- logger_->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
-
-#ifdef YAML_SUPPORT
- YAML::Node flow = YAML::LoadFile(_configurationFileName);
-
- YAML::Node flowControllerNode = flow["Flow Controller"];
- YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = flow["Connections"];
- YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+void FlowController::load() {
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ if (running_) {
+ stop(true);
+ }
+ if (!initialized_) {
+ logger_->log_info("Load Flow Controller from file %s",
+ configuration_filename_.c_str());
- // Create the root process group
- parseRootProcessGroupYaml(flowControllerNode);
- parseProcessorNodeYaml(processorsNode, this->_root);
- parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
- parseConnectionYaml(&connectionsNode, this->_root);
+ this->root_ = flow_configuration_->getRoot(configuration_filename_);
- // Load Flow File from Repo
- loadFlowRepo();
-#endif
+ // Load Flow File from Repo
+ loadFlowRepo();
- _initialized = true;
- }
+ initialized_ = true;
+ }
}
-void FlowControllerImpl::loadFlowRepo()
-{
- if (this->_flowfileRepo && this->_flowfileRepo->isEnable())
- {
- std::map<std::string, Connection *> connectionMap;
- this->_root->getConnections(&connectionMap);
- this->_flowfileRepo->loadFlowFileToConnections(&connectionMap);
- }
-}
-
-void FlowControllerImpl::reload(std::string yamlFile)
-{
- logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
+void FlowController::reload(std::string yamlFile) {
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ logger_->log_info("Starting to reload Flow Controller with yaml %s",
+ yamlFile.c_str());
+ stop(true);
+ unload();
+ std::string oldYamlFile = this->configuration_filename_;
+ this->configuration_filename_ = yamlFile;
+ load();
+ start();
+ if (this->root_ == nullptr) {  // new flow failed to load (no root group): roll back, as the removed `if (!this->_root)` did
+ this->configuration_filename_ = oldYamlFile;
+ logger_->log_info("Rollback Flow Controller to YAML %s",
+ oldYamlFile.c_str());
 stop(true);
 unload();
- std::string oldYamlFile = this->_configurationFileName;
- this->_configurationFileName = yamlFile;
 load();
 start();
- if (!this->_root)
- {
- this->_configurationFileName = oldYamlFile;
- logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str());
- stop(true);
- unload();
- load();
- start();
+ }
+}
+
+void FlowController::loadFlowRepo() {
+ if (this->flow_file_repo_) {
+ std::map<std::string, std::shared_ptr<Connection>> connectionMap;
+ if (this->root_ != nullptr) {
+ this->root_->getConnections(connectionMap);
}
+ auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
+ flow_file_repo_);
+ rep->loadFlowFileToConnections(connectionMap);
+ }
}
-bool FlowControllerImpl::start() {
- if (!_initialized) {
- logger_->log_error(
- "Can not start Flow Controller because it has not been initialized");
- return false;
- } else {
-
- if (!_running) {
- logger_->log_info("Starting Flow Controller");
- this->_timerScheduler.start();
- this->_eventScheduler.start();
- if (this->_root)
- this->_root->startProcessing(&this->_timerScheduler,
- &this->_eventScheduler);
- _running = true;
- this->_protocol->start();
- this->_provenanceRepo->start();
- this->_flowfileRepo->start();
- logger_->log_info("Started Flow Controller");
- }
- return true;
- }
+bool FlowController::start() {
+ std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+ if (!initialized_) {
+ logger_->log_error(
+ "Can not start Flow Controller because it has not been initialized");
+ return false;
+ } else {
+
+ if (!running_) {
+ logger_->log_info("Starting Flow Controller");
+ this->_timerScheduler.start();
+ this->_eventScheduler.start();
+ if (this->root_ != nullptr) {
+ this->root_->startProcessing(&this->_timerScheduler,
+ &this->_eventScheduler);
+ }
+ running_ = true;
+ this->protocol_->start();
+ this->provenance_repo_->start();
+ this->flow_file_repo_->start();
+ logger_->log_info("Started Flow Controller");
+ }
+ return true;
+ }
}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index a2f2323..7383574 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -27,253 +27,339 @@
#include <cstdio>
#include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
-
-FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim)
-: _size(0),
- _id(_localFlowSeqNumber.load()),
- _offset(0),
- _penaltyExpirationMs(0),
- _claim(claim),
- _isStoredToRepo(false),
- _markedDelete(false),
- _connection(NULL),
- _orginalConnection(NULL)
-{
- _entryDate = getTimeMillis();
- _lineageStartDate = _entryDate;
-
- char uuidStr[37];
-
- // Generate the global UUID for the flow record
- uuid_generate(_uuid);
- // Increase the local ID for the flow record
- ++_localFlowSeqNumber;
- uuid_unparse_lower(_uuid, uuidStr);
- _uuidStr = uuidStr;
-
- // Populate the default attributes
- addAttribute(FILENAME, std::to_string(getTimeNano()));
- addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
- addAttribute(UUID, uuidStr);
- // Populate the attributes from the input
- std::map<std::string, std::string>::iterator it;
- for (it = attributes.begin(); it!= attributes.end(); it++)
- {
- addAttribute(it->first, it->second);
- }
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+#include "core/Repository.h"
- _snapshot = false;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
- if (_claim)
- // Increase the flow file record owned count for the resource claim
- _claim->increaseFlowFileRecordOwnedCount();
- logger_ = Logger::getLogger();
-}
+// Static counter from which FlowFileRecord constructors draw their local ids.
+std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_{0};
-FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
-: _size(0),
- _id(_localFlowSeqNumber.load()),
- _offset(0),
- _penaltyExpirationMs(0),
- _claim(NULL),
- _isStoredToRepo(false),
- _markedDelete(false),
- _connection(NULL),
- _orginalConnection(NULL)
-{
- _entryDate = event->getFlowFileEntryDate();
- _lineageStartDate = event->getlineageStartDate();
- _size = event->getFileSize();
- _offset = event->getFileOffset();
- _lineageIdentifiers = event->getLineageIdentifiers();
- _attributes = event->getAttributes();
- _snapshot = false;
- _uuidStr = event->getFlowFileUuid();
- uuid_parse(_uuidStr.c_str(), _uuid);
-
- if (_size > 0)
- {
- _claim = new ResourceClaim();
- }
+/**
+ * Creates a new flow file record backed by the given repository.
+ *
+ * @param flow_repository repository this record is persisted to / read from
+ * @param attributes      extra attributes, added after the default FILENAME,
+ *                        PATH and UUID attributes via FlowFile::addAttribute
+ * @param claim           content claim, may be null; when present its owned
+ *                        count is incremented
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::map<std::string, std::string> attributes,
+    std::shared_ptr<ResourceClaim> claim)
+    : FlowFile(),
+      flow_repository_(flow_repository) {
+  // Read the current local id and advance the counter in a single atomic
+  // step; a separate load() and ++ lets concurrent constructions share an id.
+  id_ = local_flow_seq_number_++;
+  claim_ = claim;
+  // Populate the default attributes
+  addKeyedAttribute(FILENAME, std::to_string(getTimeNano()));
+  addKeyedAttribute(PATH, DEFAULT_FLOWFILE_PATH);
+  addKeyedAttribute(UUID, getUUIDStr());
+  // Populate the attributes from the input
+  for (const auto &attribute : attributes) {
+    FlowFile::addAttribute(attribute.first, attribute.second);
+  }
+
+  snapshot_ = false;
- if (_claim)
- {
- _claim->setContentFullPath(event->getContentFullPath());
- // Increase the flow file record owned count for the resource claim
- _claim->increaseFlowFileRecordOwnedCount();
+  if (claim_ != nullptr) {
+    // Increase the flow file record owned count for the resource claim
+    claim_->increaseFlowFileRecordOwnedCount();
+  }
+  logger_ = logging::Logger::getLogger();
+}
+
+/**
+ * Rebuilds a record from an existing flow file and associates it with a
+ * connection (presumably used when restoring from the repository — confirm
+ * against callers).
+ *
+ * @param flow_repository repository backing this record
+ * @param event           flow file whose dates, lineage identifiers, UUID,
+ *                        attributes, size, offset and content path are copied
+ * @param uuidConnection  UUID of the connection the record belongs to
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection)
+    : FlowFile(),
+      // snapshot_ is a flag: a "" initializer converts a non-null pointer to
+      // true and wrongly marked every rebuilt record as a snapshot.
+      snapshot_(false),
+      flow_repository_(flow_repository) {
+  entry_date_ = event->getEntryDate();
+  lineage_start_date_ = event->getlineageStartDate();
+  lineage_Identifiers_ = event->getlineageIdentifiers();
+  uuid_str_ = event->getUUIDStr();
+  attributes_ = event->getAttributes();
+  size_ = event->getSize();
+  offset_ = event->getOffset();
+  event->getUUID(uuid_);
+  uuid_connection_ = uuidConnection;
+  if (event->getResourceClaim()) {
+    content_full_fath_ = event->getResourceClaim()->getContentFullPath();
 }
- logger_ = Logger::getLogger();
- ++_localFlowSeqNumber;
+  // The destructor logs through logger_, so it must be initialized here too.
+  logger_ = logging::Logger::getLogger();
 }
+/**
+ * Creates a record bound to the given repository with an empty connection
+ * UUID.
+ *
+ * NOTE(review): `event` is not read by this constructor, so nothing is copied
+ * from it — confirm this is intended.
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event)
+    : FlowFile(),
+      uuid_connection_(""),
+      // snapshot_ is a flag: a "" initializer converts a non-null pointer to
+      // true and wrongly marked the record as a snapshot.
+      snapshot_(false),
+      flow_repository_(flow_repository) {
+  // The destructor logs through logger_, so it must be initialized here too.
+  logger_ = logging::Logger::getLogger();
-FlowFileRecord::~FlowFileRecord()
-{
- if (!_snapshot)
- logger_->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str());
- else
- logger_->log_debug("Delete SnapShot FlowFile UUID %s", _uuidStr.c_str());
- if (_claim)
- {
- // Decrease the flow file record owned count for the resource claim
- _claim->decreaseFlowFileRecordOwnedCount();
- if (_claim->getFlowFileRecordOwnedCount() <= 0)
- {
- logger_->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
- std::string value;
- if (!FlowControllerFactory::getFlowController()->getFlowFileRepository() ||
- !FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() ||
- !this->_isStoredToRepo ||
- !FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr, value))
- {
- // if it is persistent to DB already while it is in the queue, we keep the content
- std::remove(_claim->getContentFullPath().c_str());
- }
- delete _claim;
- }
- }
 }
-bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
-{
- const char *keyStr = FlowAttributeKey(key);
- if (keyStr)
- {
- std::string keyString = keyStr;
- return addAttribute(keyString, value);
- }
- else
- {
- return false;
- }
+/**
+ * Releases this record's hold on its resource claim. When no record owns the
+ * claim any more, the content file is removed unless this record was stored
+ * and the repository still holds an entry for its UUID.
+ */
+FlowFileRecord::~FlowFileRecord() {
+  if (!snapshot_)
+    logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str());
+  else
+    logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str());
+  if (claim_) {
+    // Decrease the flow file record owned count for the resource claim
+    claim_->decreaseFlowFileRecordOwnedCount();
+    if (claim_->getFlowFileRecordOwnedCount() <= 0) {
+      logger_->log_debug("Delete Resource Claim %s",
+                         claim_->getContentFullPath().c_str());
+      std::string value;
+      // Without a repository there is nothing that could still reference the
+      // content, so treat that like "not stored". The null check also avoids
+      // dereferencing a missing repository inside a destructor.
+      if (!this->stored || flow_repository_ == nullptr
+          || !flow_repository_->Get(uuid_str_, value)) {
+        std::remove(claim_->getContentFullPath().c_str());
+      }
+    }
+  }
 }
-bool FlowFileRecord::addAttribute(std::string key, std::string value)
-{
- std::map<std::string, std::string>::iterator it = _attributes.find(key);
- if (it != _attributes.end())
- {
- // attribute already there in the map
- return false;
- }
- else
- {
- _attributes[key] = value;
- return true;
- }
+/**
+ * Adds an attribute named by a FlowAttribute key (e.g. FILENAME, PATH, UUID).
+ *
+ * @return false if FlowAttributeKey yields no name for the key; otherwise the
+ *         result of FlowFile::addAttribute
+ */
+bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) {
+  const char *name = FlowAttributeKey(key);
+  if (name == nullptr) {
+    return false;
+  }
+  const std::string keyString(name);
+  return FlowFile::addAttribute(keyString, value);
 }
-bool FlowFileRecord::removeAttribute(FlowAttribute key)
-{
- const char *keyStr = FlowAttributeKey(key);
- if (keyStr)
- {
- std::string keyString = keyStr;
- return removeAttribute(keyString);
- }
- else
- {
- return false;
- }
+/**
+ * Removes the attribute named by a FlowAttribute key.
+ *
+ * @return false if FlowAttributeKey yields no name for the key; otherwise the
+ *         result of FlowFile::removeAttribute
+ */
+bool FlowFileRecord::removeKeyedAttribute(FlowAttribute key) {
+  const char *name = FlowAttributeKey(key);
+  if (name == nullptr) {
+    return false;
+  }
+  std::string keyString(name);
+  return FlowFile::removeAttribute(keyString);
 }
-bool FlowFileRecord::removeAttribute(std::string key)
-{
- std::map<std::string, std::string>::iterator it = _attributes.find(key);
- if (it != _attributes.end())
- {
- _attributes.erase(key);
- return true;
- }
- else
- {
- return false;
- }
+/**
+ * Updates the value of the attribute named by a FlowAttribute key.
+ *
+ * @return false if FlowAttributeKey yields no name for the key; otherwise the
+ *         result of FlowFile::updateAttribute
+ */
+bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key,
+                                          std::string value) {
+  const char *name = FlowAttributeKey(key);
+  if (name == nullptr) {
+    return false;
+  }
+  std::string keyString(name);
+  return FlowFile::updateAttribute(keyString, value);
 }
-bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
-{
- const char *keyStr = FlowAttributeKey(key);
- if (keyStr)
- {
- std::string keyString = keyStr;
- return updateAttribute(keyString, value);
- }
- else
- {
- return false;
- }
+/**
+ * Looks up the attribute named by a FlowAttribute key.
+ *
+ * @param value receives the attribute value via FlowFile::getAttribute
+ * @return false if FlowAttributeKey yields no name for the key; otherwise the
+ *         result of FlowFile::getAttribute
+ */
+bool FlowFileRecord::getKeyedAttribute(FlowAttribute key, std::string &value) {
+  const char *name = FlowAttributeKey(key);
+  if (name == nullptr) {
+    return false;
+  }
+  std::string keyString(name);
+  return FlowFile::getAttribute(keyString, value);
 }
-bool FlowFileRecord::updateAttribute(std::string key, std::string value)
-{
- std::map<std::string, std::string>::iterator it = _attributes.find(key);
- if (it != _attributes.end())
- {
- _attributes[key] = value;
- return true;
- }
- else
- {
- return false;
- }
+/**
+ * Copy-assigns the core::FlowFile state, then the record-specific snapshot
+ * flag, content path and connection UUID. flow_repository_ is not assigned.
+ */
+FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) {
+  core::FlowFile::operator=(other);
+  snapshot_ = other.snapshot_;
+  content_full_fath_ = other.content_full_fath_;
+  uuid_connection_ = other.uuid_connection_;
+  return *this;
 }
-bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
-{
- const char *keyStr = FlowAttributeKey(key);
- if (keyStr)
- {
- std::string keyString = keyStr;
- return getAttribute(keyString, value);
- }
- else
- {
- return false;
- }
+/**
+ * Loads this record from the flow file repository.
+ *
+ * @param key repository key (Serialize() stores records under uuid_str_)
+ * @return true if the entry exists and decodes successfully
+ */
+bool FlowFileRecord::DeSerialize(std::string key) {
+  std::string value;
+  bool ret;
+
+  ret = flow_repository_->Get(key, value);
+
+  if (!ret) {
+    logger_->log_error("NiFi FlowFile Store event %s can not found",
+                       key.c_str());
+    return false;
+  } else
+    // %d expects an int; value.length() is a size_t.
+    logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(),
+                       static_cast<int>(value.length()));
+
+  io::DataStream stream(reinterpret_cast<const uint8_t *>(value.data()),
+                        value.length());
+
+  ret = DeSerialize(stream);
+
+  if (ret) {
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %s success",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  } else {
+    // The connection UUID is a C string, so it needs %s (was %d: undefined
+    // behavior in a printf-style formatter).
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %s fail",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  }
+
+  return ret;
 }
-bool FlowFileRecord::getAttribute(std::string key, std::string &value)
-{
- std::map<std::string, std::string>::iterator it = _attributes.find(key);
- if (it != _attributes.end())
- {
- value = it->second;
- return true;
- }
- else
- {
- return false;
- }
+/**
+ * Encodes this record and stores it in the flow file repository under
+ * uuid_str_.
+ *
+ * Field order: event time, entry date, lineage start date (each checked as an
+ * 8-byte write), flow file UUID, connection UUID, attribute count (4 bytes),
+ * each attribute key and value, content path, size and offset (8 bytes each).
+ *
+ * @return true if every field was encoded and the repository Put succeeded
+ */
+bool FlowFileRecord::Serialize() {
+  io::DataStream outStream;
+  int ret;
+
+  ret = write(this->event_time_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->entry_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->lineage_start_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_str_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_connection_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  // write flow attributes
+  uint32_t numAttributes = this->attributes_.size();
+  ret = write(numAttributes, &outStream);
+  if (ret != 4) {
+    return false;
+  }
+
+  // Iterate by reference: copying every key/value pair is unnecessary.
+  for (const auto &itAttribute : attributes_) {
+    ret = writeUTF(itAttribute.first, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+    ret = writeUTF(itAttribute.second, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+  }
+
+  ret = writeUTF(this->content_full_fath_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = write(this->size_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->offset_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  // Persist to the repository. Both branches return, so no trailing
+  // statement is needed (the former "return true" here was unreachable).
+  if (flow_repository_->Put(uuid_str_,
+                            const_cast<uint8_t*>(outStream.getBuffer()),
+                            outStream.getSize())) {
+    logger_->log_debug("NiFi FlowFile Store event %s size %d success",
+                       uuid_str_.c_str(), outStream.getSize());
+    return true;
+  }
+  logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+                     uuid_str_.c_str(), outStream.getSize());
+  return false;
 }
-void FlowFileRecord::duplicate(FlowFileRecord *original)
-{
- uuid_copy(this->_uuid, original->_uuid);
- this->_attributes = original->_attributes;
- this->_entryDate = original->_entryDate;
- this->_id = original->_id;
- this->_lastQueueDate = original->_lastQueueDate;
- this->_lineageStartDate = original->_lineageStartDate;
- this->_offset = original->_offset;
- this->_penaltyExpirationMs = original->_penaltyExpirationMs;
- this->_size = original->_size;
- this->_lineageIdentifiers = original->_lineageIdentifiers;
- this->_orginalConnection = original->_orginalConnection;
- this->_uuidStr = original->_uuidStr;
- this->_connection = original->_connection;
- this->_markedDelete = original->_markedDelete;
-
- this->_claim = original->_claim;
- if (this->_claim)
- this->_claim->increaseFlowFileRecordOwnedCount();
-
- this->_snapshot = true;
+/**
+ * Decodes a record in the Serialize() layout from a raw buffer.
+ *
+ * Decoded attributes are written into attributes_ key by key; entries that
+ * already exist under other keys are left in place.
+ *
+ * @param buffer     encoded record bytes
+ * @param bufferSize number of bytes in buffer
+ * @return false as soon as any field fails to decode, true otherwise
+ */
+bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+  io::DataStream inStream(buffer, bufferSize);
+
+  // Fixed-width header: three 8-byte timestamps, read in order.
+  if (read(this->event_time_, &inStream) != 8
+      || read(this->entry_date_, &inStream) != 8
+      || read(this->lineage_start_date_, &inStream) != 8) {
+    return false;
+  }
+
+  if (readUTF(this->uuid_str_, &inStream) <= 0
+      || readUTF(this->uuid_connection_, &inStream) <= 0) {
+    return false;
+  }
+
+  // read flow attributes
+  uint32_t numAttributes = 0;
+  if (read(numAttributes, &inStream) != 4) {
+    return false;
+  }
+  for (uint32_t index = 0; index < numAttributes; ++index) {
+    std::string key;
+    if (readUTF(key, &inStream, true) <= 0) {
+      return false;
+    }
+    std::string value;
+    if (readUTF(value, &inStream, true) <= 0) {
+      return false;
+    }
+    this->attributes_[key] = value;
+  }
+
+  // Trailer: content path, then 8-byte size and offset.
+  return readUTF(this->content_full_fath_, &inStream) > 0
+      && read(this->size_, &inStream) == 8
+      && read(this->offset_, &inStream) == 8;
 }
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRepository.cpp b/libminifi/src/FlowFileRepository.cpp
deleted file mode 100644
index 3388738..0000000
--- a/libminifi/src/FlowFileRepository.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * @file FlowFileRepository.cpp
- * FlowFile implemenatation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-//! DeSerialize
-bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo,
- std::string key) {
- std::string value;
- bool ret;
-
- ret = repo->Get(key, value);
-
- if (!ret) {
- logger_->log_error("NiFi FlowFile Store event %s can not found",
- key.c_str());
- return false;
- } else
- logger_->log_debug("NiFi FlowFile Read event %s length %d",
- key.c_str(), value.length());
-
-
- DataStream stream((const uint8_t*)value.data(),value.length());
-
- ret = DeSerialize(stream);
-
- if (ret) {
- logger_->log_debug(
- "NiFi FlowFile retrieve uuid %s size %d connection %s success",
- _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
- } else {
- logger_->log_debug(
- "NiFi FlowFile retrieve uuid %s size %d connection %d fail",
- _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
- }
-
- return ret;
-}
-
-bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) {
-
- DataStream outStream;
-
- int ret;
-
- ret = write(this->_eventTime,&outStream);
- if (ret != 8) {
-
- return false;
- }
-
- ret = write(this->_entryDate,&outStream);
- if (ret != 8) {
- return false;
- }
-
- ret = write(this->_lineageStartDate,&outStream);
- if (ret != 8) {
-
- return false;
- }
-
- ret = writeUTF(this->_uuid,&outStream);
- if (ret <= 0) {
-
- return false;
- }
-
- ret = writeUTF(this->_uuidConnection,&outStream);
- if (ret <= 0) {
-
- return false;
- }
-
- // write flow attributes
- uint32_t numAttributes = this->_attributes.size();
- ret = write(numAttributes,&outStream);
- if (ret != 4) {
-
- return false;
- }
-
- for (auto itAttribute : _attributes) {
- ret = writeUTF(itAttribute.first,&outStream, true);
- if (ret <= 0) {
-
- return false;
- }
- ret = writeUTF(itAttribute.second,&outStream, true);
- if (ret <= 0) {
-
- return false;
- }
- }
-
- ret = writeUTF(this->_contentFullPath,&outStream);
- if (ret <= 0) {
-
- return false;
- }
-
- ret = write(this->_size,&outStream);
- if (ret != 8) {
-
- return false;
- }
-
- ret = write(this->_offset,&outStream);
- if (ret != 8) {
-
- return false;
- }
-
- // Persistent to the DB
-
- if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
- logger_->log_debug("NiFi FlowFile Store event %s size %d success",
- _uuid.c_str(), outStream.getSize());
- return true;
- } else {
- logger_->log_error("NiFi FlowFile Store event %s size %d fail",
- _uuid.c_str(), outStream.getSize());
- return false;
- }
-
- // cleanup
-
- return true;
-}
-
-bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
- int ret;
-
- DataStream outStream(buffer,bufferSize);
-
- ret = read(this->_eventTime,&outStream);
- if (ret != 8) {
- return false;
- }
-
- ret = read(this->_entryDate,&outStream);
- if (ret != 8) {
- return false;
- }
-
- ret = read(this->_lineageStartDate,&outStream);
- if (ret != 8) {
- return false;
- }
-
- ret = readUTF(this->_uuid,&outStream);
- if (ret <= 0) {
- return false;
- }
-
- ret = readUTF(this->_uuidConnection,&outStream);
- if (ret <= 0) {
- return false;
- }
-
- // read flow attributes
- uint32_t numAttributes = 0;
- ret = read(numAttributes,&outStream);
- if (ret != 4) {
- return false;
- }
-
- for (uint32_t i = 0; i < numAttributes; i++) {
- std::string key;
- ret = readUTF(key,&outStream, true);
- if (ret <= 0) {
- return false;
- }
- std::string value;
- ret = readUTF(value,&outStream, true);
- if (ret <= 0) {
- return false;
- }
- this->_attributes[key] = value;
- }
-
- ret = readUTF(this->_contentFullPath,&outStream);
- if (ret <= 0) {
- return false;
- }
-
- ret = read(this->_size,&outStream);
- if (ret != 8) {
- return false;
- }
-
- ret = read(this->_offset,&outStream);
- if (ret != 8) {
- return false;
- }
-
- return true;
-}
-
-void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap)
-{
-#ifdef LEVELDB_SUPPORT
- if (!_enable)
- return;
-
- std::vector<std::string> purgeList;
- leveldb::Iterator* it = _db->NewIterator(
- leveldb::ReadOptions());
-
- for (it->SeekToFirst(); it->Valid(); it->Next())
- {
- FlowFileEventRecord eventRead;
- std::string key = it->key().ToString();
- if (eventRead.DeSerialize((uint8_t *) it->value().data(),
- (int) it->value().size()))
- {
- auto search = connectionMap->find(eventRead.getConnectionUuid());
- if (search != connectionMap->end())
- {
- // we find the connection for the persistent flowfile, create the flowfile and enqueue that
- FlowFileRecord *record = new FlowFileRecord(&eventRead);
- // set store to repo to true so that we do need to persistent again in enqueue
- record->setStoredToRepository(true);
- search->second->put(record);
- }
- else
- {
- if (eventRead.getContentFullPath().length() > 0)
- {
- std::remove(eventRead.getContentFullPath().c_str());
- }
- purgeList.push_back(key);
- }
- }
- else
- {
- purgeList.push_back(key);
- }
- }
-
- delete it;
- std::vector<std::string>::iterator itPurge;
- for (itPurge = purgeList.begin(); itPurge != purgeList.end();
- itPurge++)
- {
- std::string eventId = *itPurge;
- logger_->log_info("Repository Repo %s Purge %s",
- RepositoryTypeStr[_type],
- eventId.c_str());
- Delete(eventId);
- }
-#endif
-
- return;
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GenerateFlowFile.cpp b/libminifi/src/GenerateFlowFile.cpp
deleted file mode 100644
index 12d7f70..0000000
--- a/libminifi/src/GenerateFlowFile.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * @file GenerateFlowFile.cpp
- * GenerateFlowFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include "utils/StringUtils.h"
-
-#include "GenerateFlowFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
-const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
-Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
-Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
- "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
-Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
-
-void GenerateFlowFile::initialize()
-{
- //! Set the supported properties
- std::set<Property> properties;
- properties.insert(FileSize);
- properties.insert(BatchSize);
- properties.insert(DataFormat);
- properties.insert(UniqueFlowFiles);
- setSupportedProperties(properties);
- //! Set the supported relationships
- std::set<Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
-}
-
-void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
- int64_t batchSize = 1;
- bool uniqueFlowFile = true;
- int64_t fileSize = 1024;
-
- std::string value;
- if (context->getProperty(FileSize.getName(), value))
- {
- Property::StringToInt(value, fileSize);
- }
- if (context->getProperty(BatchSize.getName(), value))
- {
- Property::StringToInt(value, batchSize);
- }
- if (context->getProperty(UniqueFlowFiles.getName(), value))
- {
- StringUtils::StringToBool(value, uniqueFlowFile);
- }
-
- if (!uniqueFlowFile)
- {
- char *data;
- data = new char[fileSize];
- if (!data)
- return;
- uint64_t dataSize = fileSize;
- GenerateFlowFile::WriteCallback callback(data, dataSize);
- char *current = data;
- for (int i = 0; i < fileSize; i+= sizeof(int))
- {
- int randValue = random();
- *((int *) current) = randValue;
- current += sizeof(int);
- }
- for (int i = 0; i < batchSize; i++)
- {
- // For each batch
- FlowFileRecord *flowFile = session->create();
- if (!flowFile)
- return;
- if (fileSize > 0)
- session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
- }
- delete[] data;
- }
- else
- {
- if (!_data)
- {
- // We have not create the unique data yet
- _data = new char[fileSize];
- _dataSize = fileSize;
- char *current = _data;
- for (int i = 0; i < fileSize; i+= sizeof(int))
- {
- int randValue = random();
- *((int *) current) = randValue;
- // *((int *) current) = (0xFFFFFFFF & i);
- current += sizeof(int);
- }
- }
- GenerateFlowFile::WriteCallback callback(_data, _dataSize);
- for (int i = 0; i < batchSize; i++)
- {
- // For each batch
- FlowFileRecord *flowFile = session->create();
- if (!flowFile)
- return;
- if (fileSize > 0)
- session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
deleted file mode 100644
index 40dd387..0000000
--- a/libminifi/src/GetFile.cpp
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * @file GetFile.cpp
- * GetFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-#if (__GNUC__ >= 4)
- #if (__GNUC_MINOR__ < 9)
- #include <regex.h>
- #endif
-#endif
-#include "utils/StringUtils.h"
-#include <regex>
-#include "utils/TimeUtil.h"
-#include "GetFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string GetFile::ProcessorName("GetFile");
-Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
-Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
-Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
-Property GetFile::KeepSourceFile("Keep Source File",
- "If true, the file is not deleted after it has been copied to the Content Repository", "false");
-Property GetFile::MaxAge("Maximum File Age",
- "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MinAge("Minimum File Age",
- "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
-Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
-Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
-Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
-Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*");
-Relationship GetFile::Success("success", "All files are routed to success");
-
-void GetFile::initialize()
-{
- //! Set the supported properties
- std::set<Property> properties;
- properties.insert(BatchSize);
- properties.insert(Directory);
- properties.insert(IgnoreHiddenFile);
- properties.insert(KeepSourceFile);
- properties.insert(MaxAge);
- properties.insert(MinAge);
- properties.insert(MaxSize);
- properties.insert(MinSize);
- properties.insert(PollInterval);
- properties.insert(Recurse);
- properties.insert(FileFilter);
- setSupportedProperties(properties);
- //! Set the supported relationships
- std::set<Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
-}
-
-void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
- std::string value;
-
- logger_->log_info("onTrigger GetFile");
- if (context->getProperty(Directory.getName(), value))
- {
- _directory = value;
- }
- if (context->getProperty(BatchSize.getName(), value))
- {
- Property::StringToInt(value, _batchSize);
- }
- if (context->getProperty(IgnoreHiddenFile.getName(), value))
- {
- StringUtils::StringToBool(value, _ignoreHiddenFile);
- }
- if (context->getProperty(KeepSourceFile.getName(), value))
- {
- StringUtils::StringToBool(value, _keepSourceFile);
- }
-
- logger_->log_info("onTrigger GetFile");
- if (context->getProperty(MaxAge.getName(), value))
- {
- TimeUnit unit;
- if (Property::StringToTime(value, _maxAge, unit) &&
- Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
- {
-
- }
- }
- if (context->getProperty(MinAge.getName(), value))
- {
- TimeUnit unit;
- if (Property::StringToTime(value, _minAge, unit) &&
- Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
- {
-
- }
- }
- if (context->getProperty(MaxSize.getName(), value))
- {
- Property::StringToInt(value, _maxSize);
- }
- if (context->getProperty(MinSize.getName(), value))
- {
- Property::StringToInt(value, _minSize);
- }
- if (context->getProperty(PollInterval.getName(), value))
- {
- TimeUnit unit;
- if (Property::StringToTime(value, _pollInterval, unit) &&
- Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval))
- {
-
- }
- }
- if (context->getProperty(Recurse.getName(), value))
- {
- StringUtils::StringToBool(value, _recursive);
- }
-
- if (context->getProperty(FileFilter.getName(), value))
- {
- _fileFilter = value;
- }
-
- // Perform directory list
- logger_->log_info("Is listing empty %i",isListingEmpty());
- if (isListingEmpty())
- {
- if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
- {
- performListing(_directory);
- }
- }
- logger_->log_info("Is listing empty %i",isListingEmpty());
-
- if (!isListingEmpty())
- {
- try
- {
- std::queue<std::string> list;
- pollListing(list, _batchSize);
- while (!list.empty())
- {
-
- std::string fileName = list.front();
- list.pop();
- logger_->log_info("GetFile process %s", fileName.c_str());
- FlowFileRecord *flowFile = session->create();
- if (!flowFile)
- return;
- std::size_t found = fileName.find_last_of("/\\");
- std::string path = fileName.substr(0,found);
- std::string name = fileName.substr(found+1);
- flowFile->updateAttribute(FILENAME, name);
- flowFile->updateAttribute(PATH, path);
- flowFile->addAttribute(ABSOLUTE_PATH, fileName);
- session->import(fileName, flowFile, _keepSourceFile);
- session->transfer(flowFile, Success);
- }
- }
- catch (std::exception &exception)
- {
- logger_->log_debug("GetFile Caught Exception %s", exception.what());
- throw;
- }
- catch (...)
- {
- throw;
- }
- }
-
-}
-
-bool GetFile::isListingEmpty()
-{
- std::lock_guard<std::mutex> lock(_mtx);
-
- return _dirList.empty();
-}
-
-void GetFile::putListing(std::string fileName)
-{
- std::lock_guard<std::mutex> lock(_mtx);
-
- _dirList.push(fileName);
-}
-
-void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
-{
- std::lock_guard<std::mutex> lock(_mtx);
-
- while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize))
- {
- std::string fileName = _dirList.front();
- _dirList.pop();
- list.push(fileName);
- }
-
- return;
-}
-
-bool GetFile::acceptFile(std::string fullName, std::string name)
-{
- struct stat statbuf;
-
- if (stat(fullName.c_str(), &statbuf) == 0)
- {
- if (_minSize > 0 && statbuf.st_size <_minSize)
- return false;
-
- if (_maxSize > 0 && statbuf.st_size > _maxSize)
- return false;
-
- uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
- uint64_t fileAge = getTimeMillis() - modifiedTime;
- if (_minAge > 0 && fileAge < _minAge)
- return false;
- if (_maxAge > 0 && fileAge > _maxAge)
- return false;
-
- if (_ignoreHiddenFile && fullName.c_str()[0] == '.')
- return false;
-
- if (access(fullName.c_str(), R_OK) != 0)
- return false;
-
- if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
- return false;
-
-
- #ifdef __GNUC__
- #if (__GNUC__ >= 4)
- #if (__GNUC_MINOR__ < 9)
- regex_t regex;
- int ret = regcomp(®ex, _fileFilter.c_str(),0);
- if (ret)
- return false;
- ret = regexec(®ex,name.c_str(),(size_t)0,NULL,0);
- regfree(®ex);
- if (ret)
- return false;
- #else
- try{
- std::regex re(_fileFilter);
-
- if (!std::regex_match(name, re)) {
- return false;
- }
- } catch (std::regex_error e) {
- logger_->log_error("Invalid File Filter regex: %s.", e.what());
- return false;
- }
- #endif
- #endif
- #else
- logger_->log_info("Cannot support regex filtering");
- #endif
- return true;
- }
-
- return false;
-}
-
-void GetFile::performListing(std::string dir)
-{
- logger_->log_info("Performing file listing against %s",dir.c_str());
- DIR *d;
- d = opendir(dir.c_str());
- if (!d)
- return;
- // only perform a listing while we are not empty
- logger_->log_info("Performing file listing against %s",dir.c_str());
- while (isRunning())
- {
- struct dirent *entry;
- entry = readdir(d);
- if (!entry)
- break;
- std::string d_name = entry->d_name;
- if ((entry->d_type & DT_DIR))
- {
- // if this is a directory
- if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0)
- {
- std::string path = dir + "/" + d_name;
- performListing(path);
- }
- }
- else
- {
- std::string fileName = dir + "/" + d_name;
- if (acceptFile(fileName, d_name))
- {
- // check whether we can take this file
- putListing(fileName);
- }
- }
- }
- closedir(d);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenHTTP.cpp b/libminifi/src/ListenHTTP.cpp
deleted file mode 100644
index 89ce1d2..0000000
--- a/libminifi/src/ListenHTTP.cpp
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * @file ListenHTTP.cpp
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <uuid/uuid.h>
-
-#include <CivetServer.h>
-
-#include "ListenHTTP.h"
-
-#include "utils/TimeUtil.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-// Processor type name.
-const std::string ListenHTTP::ProcessorName("ListenHTTP");
-
-// Supported properties. Constructor arguments are (name, description, default value);
-// onSchedule() uses BasePath's value as the fallback when that property is not set.
-// NOTE(review): the "SSL2" default for SSL Minimum Version selects the oldest protocol
-// level onSchedule() can configure -- confirm this default is intended.
-Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", ".*");
-Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", "");
-
-// The processor's only relationship; handlePost() transfers each successfully received request's flow file here.
-Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-/**
- * Declares the properties this processor understands and its single relationship.
- */
-void ListenHTTP::initialize()
-{
-  _logger->log_info("Initializing ListenHTTP");
-
-  //! Every configurable property, in one list
-  std::set<Property> supportedProperties {
-    BasePath,
-    Port,
-    AuthorizedDNPattern,
-    SSLCertificate,
-    SSLCertificateAuthority,
-    SSLVerifyPeer,
-    SSLMinimumVersion,
-    HeadersAsAttributesRegex
-  };
-  setSupportedProperties(supportedProperties);
-
-  //! Received data has exactly one destination
-  std::set<Relationship> supportedRelationships { Success };
-  setSupportedRelationships(supportedRelationships);
-}
-
-// Maps an "SSL Minimum Version" property value onto the numeric level used for
-// CivetWeb's ssl_protocol_version option (SSL2=0, SSL3=1, TLS1.0=2, TLS1.1=3, TLS1.2=4).
-// Returns -1 when the value is not one of those names.
-static int sslProtocolLevel(const std::string &version)
-{
-  static const char *const kVersionNames[] = { "SSL2", "SSL3", "TLS1.0", "TLS1.1", "TLS1.2" };
-  const int count = static_cast<int>(sizeof(kVersionNames) / sizeof(kVersionNames[0]));
-  for (int level = 0; level < count; ++level)
-  {
-    if (version == kVersionNames[level])
-    {
-      return level;
-    }
-  }
-  return -1;
-}
-
-/**
- * Configures and starts the embedded CivetWeb server.
- *
- * Reads the Base Path, Listening Port, Authorized DN Pattern, TLS/SSL and header-regex
- * properties, then (re)creates the server and the request Handler. TLS options are read
- * only when an SSL Certificate is configured. If the Listening Port is missing or empty,
- * an error is logged and no server is started.
- *
- * @param context        source of property values; also passed to the Handler
- * @param sessionFactory used by the Handler to open a session per request
- */
-void ListenHTTP::onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory)
-{
-  std::string basePath;
-
-  if (!context->getProperty(BasePath.getName(), basePath))
-  {
-    _logger->log_info("%s attribute is missing, so default value of %s will be used",
-        BasePath.getName().c_str(),
-        BasePath.getValue().c_str());
-    basePath = BasePath.getValue();
-  }
-
-  // The handler is registered under a path starting with '/'; prepend it only when it is
-  // missing, so a configured "/contentListener" does not become "//contentListener".
-  if (basePath.empty() || basePath[0] != '/')
-  {
-    basePath.insert(0, "/");
-  }
-
-  std::string listeningPort;
-
-  // the property's default is "", which is not a usable port either
-  if (!context->getProperty(Port.getName(), listeningPort) || listeningPort.empty())
-  {
-    _logger->log_error("%s attribute is missing or invalid",
-        Port.getName().c_str());
-    return;
-  }
-
-  std::string authDNPattern;
-
-  if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty())
-  {
-    _logger->log_info("ListenHTTP using %s: %s",
-        AuthorizedDNPattern.getName().c_str(),
-        authDNPattern.c_str());
-  }
-
-  std::string sslCertFile;
-
-  if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty())
-  {
-    _logger->log_info("ListenHTTP using %s: %s",
-        SSLCertificate.getName().c_str(),
-        sslCertFile.c_str());
-  }
-
-  // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
-  std::string sslCertAuthorityFile;
-  std::string sslVerifyPeer;
-  std::string sslMinVer;
-
-  if (!sslCertFile.empty())
-  {
-    if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile)
-        && !sslCertAuthorityFile.empty())
-    {
-      _logger->log_info("ListenHTTP using %s: %s",
-          SSLCertificateAuthority.getName().c_str(),
-          sslCertAuthorityFile.c_str());
-    }
-
-    if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer))
-    {
-      if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0)
-      {
-        _logger->log_info("ListenHTTP will not verify peers");
-      }
-      else
-      {
-        _logger->log_info("ListenHTTP will verify peers");
-      }
-    }
-    else
-    {
-      _logger->log_info("ListenHTTP will not verify peers");
-    }
-
-    if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer))
-    {
-      _logger->log_info("ListenHTTP using %s: %s",
-          SSLMinimumVersion.getName().c_str(),
-          sslMinVer.c_str());
-    }
-  }
-
-  std::string headersAsAttributesPattern;
-
-  if (context->getProperty(HeadersAsAttributesRegex.getName(),headersAsAttributesPattern)
-      && !headersAsAttributesPattern.empty())
-  {
-    _logger->log_info("ListenHTTP using %s: %s",
-        HeadersAsAttributesRegex.getName().c_str(),
-        headersAsAttributesPattern.c_str());
-  }
-
-  auto numThreads = getMaxConcurrentTasks();
-
-  _logger->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads",
-      listeningPort.c_str(),
-      basePath.c_str(),
-      numThreads);
-
-  // CivetServer takes its configuration as a flat list of alternating key/value strings
-  std::vector<std::string> options;
-  auto addOption = [&options](const std::string &key, const std::string &value)
-  {
-    options.push_back(key);
-    options.push_back(value);
-  };
-
-  addOption("enable_keep_alive", "yes");
-  addOption("keep_alive_timeout_ms", "15000");
-  addOption("num_threads", std::to_string(numThreads));
-
-  if (sslCertFile.empty())
-  {
-    addOption("listening_ports", listeningPort);
-  }
-  else
-  {
-    // a trailing "s" marks the port as TLS-enabled for CivetWeb
-    addOption("listening_ports", listeningPort + "s");
-    addOption("ssl_certificate", sslCertFile);
-
-    if (!sslCertAuthorityFile.empty())
-    {
-      addOption("ssl_ca_file", sslCertAuthorityFile);
-    }
-
-    // anything other than an empty value or "no" turns peer verification on
-    const bool verifyPeer = !(sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0);
-    addOption("ssl_verify_peer", verifyPeer ? "yes" : "no");
-
-    int protocolLevel = sslProtocolLevel(sslMinVer);
-    if (protocolLevel < 0)
-    {
-      // unset or unrecognized values keep the historical fallback to TLS1.2, but are reported
-      if (!sslMinVer.empty())
-      {
-        _logger->log_warn("ListenHTTP unrecognized %s value %s, using TLS1.2",
-            SSLMinimumVersion.getName().c_str(),
-            sslMinVer.c_str());
-      }
-      protocolLevel = 4;
-    }
-    addOption("ssl_protocol_version", std::to_string(protocolLevel));
-  }
-
-  // Release any server left over from a previous schedule before creating the new one;
-  // otherwise the new server is constructed while the old one still holds the port.
-  // The server goes first because it holds a pointer to the handler.
-  _server.reset();
-  _handler.reset();
-
-  _server.reset(new CivetServer(options));
-  _handler.reset(new Handler(context,
-      sessionFactory,
-      std::move(authDNPattern),
-      std::move(headersAsAttributesPattern)));
-  _server->addHandler(basePath, _handler.get());
-}
-
-/**
- * Intentionally does nothing.
- *
- * Flow files are produced by Handler::handlePost() when requests arrive on the CivetWeb
- * server, not here. Incoming flow files are deliberately not dequeued: taking one with
- * session->get() and then neither transferring nor removing it would leave it without
- * a destination.
- */
-void ListenHTTP::onTrigger(ProcessContext * /*context*/, ProcessSession * /*session*/)
-{
-}
-
-/**
- * Stores what every request needs: the processor context, the factory used to open one
- * session per request, and the regexes for client-DN authorization and for selecting
- * headers to copy into attributes.
- */
-ListenHTTP::Handler::Handler(ProcessContext *context,
-                             ProcessSessionFactory *sessionFactory,
-                             std::string &&authDNPattern,
-                             std::string &&headersAsAttributesPattern)
-    : _processContext(context),
-      _processSessionFactory(sessionFactory),
-      _authDNRegex(std::move(authDNPattern)),
-      _headersAsAttributesRegex(std::move(headersAsAttributesPattern))
-{
-}
-
-/**
- * Answers the client with an empty-bodied "500 Internal Server Error".
- */
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn)
-{
-  static const char kInternalServerError[] =
-      "HTTP/1.1 500 Internal Server Error\r\n"
-      "Content-Type: text/html\r\n"
-      "Content-Length: 0\r\n\r\n";
-  mg_printf(conn, "%s", kInternalServerError);
-}
-
-/**
- * Handles one POST request.
- *
- * TLS clients whose certificate subject does not match the Authorized DN Pattern get a
- * 403. Otherwise the request body is written into a new flow file, and the "filename"
- * header plus every header whose name matches the configured regex are copied into
- * attributes. The flow file is then transferred to success and the session committed,
- * and the client gets a 200. On failure the session is rolled back and the client
- * gets a 500.
- *
- * @return always true (the request was answered by this handler)
- */
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn)
-{
-  // The server runs several worker threads (num_threads), so this may execute
-  // concurrently: use a local logger instead of re-assigning the shared _logger member.
-  auto logger = Logger::getLogger();
-
-  auto req_info = mg_get_request_info(conn);
-  logger->log_info("ListenHTTP handling POST request of length %lld",
-      static_cast<long long>(req_info->content_length));
-
-  // If this is a two-way TLS connection, authorize the peer against the configured pattern
-  if (req_info->is_ssl && req_info->client_cert != nullptr)
-  {
-    if (!std::regex_match(req_info->client_cert->subject, _authDNRegex))
-    {
-      mg_printf(conn,
-          "HTTP/1.1 403 Forbidden\r\n"
-          "Content-Type: text/html\r\n"
-          "Content-Length: 0\r\n\r\n");
-      logger->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
-      return true;
-    }
-  }
-
-  // A 1xx response must not be sent to an HTTP/1.0 client (RFC 7231, section 6.2).
-  // HTTP/1.1 clients must tolerate an unsolicited 100 Continue, so it is still sent
-  // eagerly to them to minimize client delay.
-  if (req_info->http_version != nullptr && strcmp(req_info->http_version, "1.1") == 0)
-  {
-    mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-  }
-
-  auto session = _processSessionFactory->createSession();
-  ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = session->create();
-
-  if (!flowFile)
-  {
-    sendErrorResponse(conn);
-    return true;
-  }
-
-  try
-  {
-    session->write(flowFile, &callback);
-
-    // Add filename from "filename" header value (and pattern headers)
-    for (int i = 0; i < req_info->num_headers; i++)
-    {
-      const auto &header = req_info->http_headers[i];
-      const bool isFilename = strcmp("filename", header.name) == 0;
-
-      if (isFilename || std::regex_match(header.name, _headersAsAttributesRegex))
-      {
-        // fall back to addAttribute when updateAttribute reports failure
-        // (presumably: the attribute does not exist yet)
-        if (!flowFile->updateAttribute(header.name, header.value))
-        {
-          flowFile->addAttribute(header.name, header.value);
-        }
-      }
-    }
-
-    session->transfer(flowFile, Success);
-    session->commit();
-  }
-  // Exceptions are not rethrown: this runs inside a callback invoked from the CivetWeb C
-  // library, and unwinding through it may terminate the process. The client has
-  // already been told that the request failed.
-  catch (const std::exception &exception)
-  {
-    logger->log_error("ListenHTTP Caught Exception %s", exception.what());
-    sendErrorResponse(conn);
-    session->rollback();
-    return true;
-  }
-  catch (...)
-  {
-    logger->log_error("ListenHTTP Caught Exception Processor::onTrigger");
-    sendErrorResponse(conn);
-    session->rollback();
-    return true;
-  }
-
-  mg_printf(conn,
-      "HTTP/1.1 200 OK\r\n"
-      "Content-Type: text/html\r\n"
-      "Content-Length: 0\r\n\r\n");
-
-  return true;
-}
-
-/**
- * Binds the callback to the connection whose request body process() will read.
- * Only the pointers are stored, so both must stay valid until process() has run.
- */
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-    : _conn(conn),
-      _reqInfo(reqInfo)
-{
-  _logger = Logger::getLogger();
-}
-
-/**
- * Copies the HTTP request body from the connection into `stream` in 16 KiB pieces.
- *
- * With a declared content length, up to that many bytes are read; fewer are read if
- * mg_read() stops returning data first. With no declared length (content_length < 0),
- * the body is read until mg_read() returns <= 0. Previously such bodies were skipped
- * entirely, which produced empty flow files.
- * NOTE(review): relies on the bundled CivetWeb's mg_read() handling bodies of unknown
- * length (e.g. chunked uploads) -- confirm for the CivetWeb version in use.
- *
- * @param stream destination for the body bytes
- */
-void ListenHTTP::WriteCallback::process(std::ofstream *stream)
-{
-  const long long totalLength = _reqInfo->content_length;
-  const bool lengthKnown = totalLength >= 0;
-  long long bytesRead = 0;
-  char buf[16384];
-
-  while (!lengthKnown || bytesRead < totalLength)
-  {
-    size_t wanted = sizeof(buf);
-    if (lengthKnown && totalLength - bytesRead < static_cast<long long>(sizeof(buf)))
-    {
-      wanted = static_cast<size_t>(totalLength - bytesRead);
-    }
-
-    // Read a buffer of data from client; <= 0 means no more data (or an error)
-    const int received = mg_read(_conn, buf, wanted);
-    if (received <= 0)
-    {
-      break;
-    }
-
-    // Transfer buffer data to the output stream
-    stream->write(buf, received);
-    bytesRead += received;
-  }
-}