You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:14 UTC

[09/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index eca84be..e472a9a 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -30,765 +30,271 @@
 #include <unistd.h>
 #include <future>
 #include "FlowController.h"
-#include "ProcessContext.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
+#include "core/core.h"
+#include "core/repository/FlowFileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+#define DEFAULT_CONFIG_NAME "conf/flow.yml"
+
+FlowController::FlowController(
+    std::shared_ptr<core::Repository> provenance_repo,
+    std::shared_ptr<core::Repository> flow_file_repo,
+    std::unique_ptr<core::FlowConfiguration> flow_configuration,
+    const std::string name, bool headless_mode)
+    : CoreComponent(core::getClassName<FlowController>()),
+      root_(nullptr),
+      max_timer_driven_threads_(0),
+      max_event_driven_threads_(0),
+      running_(false),
+      initialized_(false),
+      provenance_repo_(provenance_repo),
+      flow_file_repo_(flow_file_repo),
+      protocol_(0),
+      _timerScheduler(provenance_repo_),
+      _eventScheduler(provenance_repo_),
+      flow_configuration_(std::move(flow_configuration)) {
+  if (provenance_repo == nullptr)
+    throw std::runtime_error("Provenance Repo should not be null");
+  if (flow_file_repo == nullptr)
+    throw std::runtime_error("Flow Repo should not be null");
+
+  uuid_generate(uuid_);
+  setUUID(uuid_);
+
+  // Setup the default values
+  if (flow_configuration_ != nullptr) {
+    configuration_filename_ = flow_configuration_->getConfigurationPath();
+  }
+  max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+  max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+  running_ = false;
+  initialized_ = false;
+  root_ = NULL;
+
+  protocol_ = new FlowControlProtocol(this);
+
+  // NiFi config properties
+  configure_ = Configure::getConfigure();
+
+  if (!headless_mode) {
+    std::string rawConfigFileString;
+    configure_->get(Configure::nifi_flow_configuration_file,
+                    rawConfigFileString);
+
+    if (!rawConfigFileString.empty()) {
+      configuration_filename_ = rawConfigFileString;
+    }
+
+    std::string adjustedFilename;
+    if (!configuration_filename_.empty()) {
+      // perform a naive determination if this is a relative path
+      if (configuration_filename_.c_str()[0] != '/') {
+        adjustedFilename = adjustedFilename + configure_->getHome() + "/"
+            + configuration_filename_;
+      } else {
+        adjustedFilename = configuration_filename_;
+      }
+    }
+
+    initializePaths(adjustedFilename);
+  }
 
-FlowController *FlowControllerFactory::_flowController(NULL);
-
-FlowControllerImpl::FlowControllerImpl(std::string name)  {
-	uuid_generate(_uuid);
-
-	_name = name;
-	// Setup the default values
-	_configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
-	_maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-	_maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
-	_running = false;
-	_initialized = false;
-	_root = NULL;
-	logger_ = Logger::getLogger();
-	_protocol = new FlowControlProtocol(this);
-
-	// NiFi config properties
-	configure_ = Configure::getConfigure();
-
-	std::string rawConfigFileString;
-	configure_->get(Configure::nifi_flow_configuration_file,
-			rawConfigFileString);
-
-	if (!rawConfigFileString.empty()) {
-		_configurationFileName = rawConfigFileString;
-	}
-
-	char *path = NULL;
-	char full_path[PATH_MAX];
-
-	std::string adjustedFilename;
-	if (!_configurationFileName.empty()) {
-		// perform a naive determination if this is a relative path
-		if (_configurationFileName.c_str()[0] != '/') {
-			adjustedFilename = adjustedFilename + configure_->getHome() + "/"
-					+ _configurationFileName;
-		} else {
-			adjustedFilename = _configurationFileName;
-		}
-	}
-
-	path = realpath(adjustedFilename.c_str(), full_path);
-
-	std::string pathString(path);
-	_configurationFileName = pathString;
-	logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
-
-	// Create the content repo directory if needed
-	struct stat contentDirStat;
-
-	if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode))
-	{
-		path = realpath(ResourceClaim::default_directory_path.c_str(), full_path);
-		logger_->log_info("FlowController content directory %s", full_path);
-	}
-	else
-	{
-	   if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1)
-	   {
-		   logger_->log_error("FlowController content directory creation failed");
-		   exit(1);
-	   }
-	}
-
-	
-	std::string clientAuthStr;
-
-	if (!path) {
-		logger_->log_error(
-				"Could not locate path from provided configuration file name (%s).  Exiting.",
-				full_path);
-		exit(1);
-	}
-
-
-	// Create repos for flow record and provenance
-	_flowfileRepo = new FlowFileRepository();
-	_flowfileRepo->initialize();
-	_provenanceRepo = new ProvenanceRepository();
-	_provenanceRepo->initialize();
 }
 
-FlowControllerImpl::~FlowControllerImpl() {
+void FlowController::initializePaths(const std::string &adjustedFilename) {
+  char *path = NULL;
+  char full_path[PATH_MAX];
+  path = realpath(adjustedFilename.c_str(), full_path);
+
+  if (path == NULL) {
+    throw std::runtime_error(
+        "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
+  }
+  std::string pathString(path);
+  configuration_filename_ = pathString;
+  logger_->log_info("FlowController NiFi Configuration file %s",
+                    pathString.c_str());
+
+  // Create the content repo directory if needed
+  struct stat contentDirStat;
+
+  if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat)
+      != -1&& S_ISDIR(contentDirStat.st_mode)) {
+    path = realpath(ResourceClaim::default_directory_path.c_str(), full_path);
+    logger_->log_info("FlowController content directory %s", full_path);
+  } else {
+    if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) {
+      logger_->log_error("FlowController content directory creation failed");
+      exit(1);
+    }
+  }
 
-	stop(true);
-	unload();
-	if (NULL != _protocol)
-		delete _protocol;
-	if (NULL != _provenanceRepo)
-		delete _provenanceRepo;
-	if (NULL != _flowfileRepo)
-		delete _flowfileRepo;
+  std::string clientAuthStr;
 
-}
+  if (!path) {
+    logger_->log_error(
+        "Could not locate path from provided configuration file name (%s).  Exiting.",
+        full_path);
+    exit(1);
+  }
 
-void FlowControllerImpl::stop(bool force) {
+}
 
-	if (_running) {
-		// immediately indicate that we are not running
-		_running = false;
+FlowController::~FlowController() {
+  stop(true);
+  unload();
+  if (NULL != protocol_)
+    delete protocol_;
 
-		logger_->log_info("Stop Flow Controller");
-		this->_timerScheduler.stop();
-		this->_eventScheduler.stop();
-		this->_flowfileRepo->stop();
-		this->_provenanceRepo->stop();
-		// Wait for sometime for thread stop
-		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-		if (this->_root)
-			this->_root->stopProcessing(&this->_timerScheduler,
-					&this->_eventScheduler);
+}
 
-	}
+void FlowController::stop(bool force) {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (running_) {
+    // immediately indicate that we are not running
+    running_ = false;
+
+    logger_->log_info("Stop Flow Controller");
+    this->_timerScheduler.stop();
+    this->_eventScheduler.stop();
+    this->flow_file_repo_->stop();
+    this->provenance_repo_->stop();
+    // Wait for sometime for thread stop
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    if (this->root_)
+      this->root_->stopProcessing(&this->_timerScheduler,
+                                  &this->_eventScheduler);
+
+  }
 }
 
 /**
  * This function will attempt to unload yaml and stop running Processors.
  *
  * If the latter attempt fails or does not complete within the prescribed
- * period, _running will be set to false and we will return.
+ * period, running_ will be set to false and we will return.
  *
  * @param timeToWaitMs Maximum time to wait before manually
  * marking running as false.
  */
-void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) {
-	if (_running) {
-		// use the current time and increment with the provided argument.
-		std::chrono::system_clock::time_point wait_time =
-				std::chrono::system_clock::now()
-						+ std::chrono::milliseconds(timeToWaitMs);
-
-		// create an asynchronous future.
-		std::future<void> unload_task = std::async(std::launch::async,
-				[this]() {unload();});
-
-		if (std::future_status::ready == unload_task.wait_until(wait_time)) {
-			_running = false;
-		}
-
-	}
-}
-
-
-void FlowControllerImpl::unload() {
-	if (_running) {
-		stop(true);
-	}
-	if (_initialized) {
-		logger_->log_info("Unload Flow Controller");
-		if (_root)
-			delete _root;
-		_root = NULL;
-		_initialized = false;
-		_name = "";
-	}
-
-	return;
-}
-
-Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) {
-	Processor *processor = NULL;
-	if (name == GenerateFlowFile::ProcessorName) {
-		processor = new GenerateFlowFile(name, uuid);
-	} else if (name == LogAttribute::ProcessorName) {
-		processor = new LogAttribute(name, uuid);
-	} else if (name == RealTimeDataCollector::ProcessorName) {
-		processor = new RealTimeDataCollector(name, uuid);
-	} else if (name == GetFile::ProcessorName) {
-		processor = new GetFile(name, uuid);
-	} else if (name == PutFile::ProcessorName) {
-		processor = new PutFile(name, uuid);
-	} else if (name == TailFile::ProcessorName) {
-		processor = new TailFile(name, uuid);
-	} else if (name == ListenSyslog::ProcessorName) {
-		processor = new ListenSyslog(name, uuid);
-	} else if (name == ListenHTTP::ProcessorName) {
-        processor = new ListenHTTP(name, uuid);
-	} else if (name == ExecuteProcess::ProcessorName) {
-		processor = new ExecuteProcess(name, uuid);
-	} else if (name == AppendHostInfo::ProcessorName) {
-		processor = new AppendHostInfo(name, uuid);
-	} else {
-		logger_->log_error("No Processor defined for %s", name.c_str());
-		return NULL;
-	}
-
-	//! initialize the processor
-	processor->initialize();
-
-	return processor;
-}
-
-ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name,
-		uuid_t uuid) {
-	return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
-}
-
-ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name,
-		uuid_t uuid) {
-	return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
-}
-
-Connection *FlowControllerImpl::createConnection(std::string name,
-		uuid_t uuid) {
-	return new Connection(name, uuid);
-}
-
-#ifdef YAML_SUPPORT
-void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-	uuid_t uuid;
-	ProcessGroup *group = NULL;
-
-	std::string flowName = rootFlowNode["name"].as<std::string>();
-	std::string id = rootFlowNode["id"].as<std::string>();
-
-	uuid_parse(id.c_str(), uuid);
-
-	logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
-	logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
-	group = this->createRootProcessGroup(flowName, uuid);
-	this->_root = group;
-	this->_name = flowName;
-}
-
-void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
-		ProcessGroup *parentGroup) {
-	int64_t schedulingPeriod = -1;
-	int64_t penalizationPeriod = -1;
-	int64_t yieldPeriod = -1;
-	int64_t runDurationNanos = -1;
-	uuid_t uuid;
-	Processor *processor = NULL;
-
-	if (!parentGroup) {
-		logger_->log_error("parseProcessNodeYaml: no parent group exists");
-		return;
-	}
-
-	if (processorsNode) {
-
-		if (processorsNode.IsSequence()) {
-			// Evaluate sequence of processors
-			int numProcessors = processorsNode.size();
-
-			for (YAML::const_iterator iter = processorsNode.begin();
-					iter != processorsNode.end(); ++iter) {
-				ProcessorConfig procCfg;
-				YAML::Node procNode = iter->as<YAML::Node>();
-
-				procCfg.name = procNode["name"].as<std::string>();
-				procCfg.id = procNode["id"].as<std::string>();
-				logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
-						procCfg.name.c_str(), procCfg.id.c_str());
-				procCfg.javaClass = procNode["class"].as<std::string>();
-				logger_->log_debug("parseProcessorNode: class => [%s]",
-						procCfg.javaClass.c_str());
-
-				uuid_parse(procCfg.id.c_str(), uuid);
-
-				// Determine the processor name only from the Java class
-				int lastOfIdx = procCfg.javaClass.find_last_of(".");
-				if (lastOfIdx != std::string::npos) {
-					lastOfIdx++; // if a value is found, increment to move beyond the .
-					int nameLength = procCfg.javaClass.length() - lastOfIdx;
-					std::string processorName = procCfg.javaClass.substr(
-							lastOfIdx, nameLength);
-					processor = this->createProcessor(processorName, uuid);
-				}
-
-				if (!processor) {
-					logger_->log_error(
-							"Could not create a processor %s with name %s",
-							procCfg.name.c_str(), procCfg.id.c_str());
-					throw std::invalid_argument(
-							"Could not create processor " + procCfg.name);
-				}
-				processor->setName(procCfg.name);
-
-				procCfg.maxConcurrentTasks =
-						procNode["max concurrent tasks"].as<std::string>();
-				logger_->log_debug(
-						"parseProcessorNode: max concurrent tasks => [%s]",
-						procCfg.maxConcurrentTasks.c_str());
-				procCfg.schedulingStrategy = procNode["scheduling strategy"].as<
-						std::string>();
-				logger_->log_debug(
-						"parseProcessorNode: scheduling strategy => [%s]",
-						procCfg.schedulingStrategy.c_str());
-				procCfg.schedulingPeriod = procNode["scheduling period"].as<
-						std::string>();
-				logger_->log_debug(
-						"parseProcessorNode: scheduling period => [%s]",
-						procCfg.schedulingPeriod.c_str());
-				procCfg.penalizationPeriod = procNode["penalization period"].as<
-						std::string>();
-				logger_->log_debug(
-						"parseProcessorNode: penalization period => [%s]",
-						procCfg.penalizationPeriod.c_str());
-				procCfg.yieldPeriod =
-						procNode["yield period"].as<std::string>();
-				logger_->log_debug("parseProcessorNode: yield period => [%s]",
-						procCfg.yieldPeriod.c_str());
-				procCfg.yieldPeriod = procNode["run duration nanos"].as<
-						std::string>();
-				logger_->log_debug(
-						"parseProcessorNode: run duration nanos => [%s]",
-						procCfg.runDurationNanos.c_str());
-
-				// handle auto-terminated relationships
-				YAML::Node autoTerminatedSequence =
-						procNode["auto-terminated relationships list"];
-				std::vector<std::string> rawAutoTerminatedRelationshipValues;
-				if (autoTerminatedSequence.IsSequence()
-						&& !autoTerminatedSequence.IsNull()
-						&& autoTerminatedSequence.size() > 0) {
-					for (YAML::const_iterator relIter =
-							autoTerminatedSequence.begin();
-							relIter != autoTerminatedSequence.end();
-							++relIter) {
-						std::string autoTerminatedRel =
-								relIter->as<std::string>();
-						rawAutoTerminatedRelationshipValues.push_back(
-								autoTerminatedRel);
-					}
-				}
-				procCfg.autoTerminatedRelationships =
-						rawAutoTerminatedRelationshipValues;
-
-				// handle processor properties
-				YAML::Node propertiesNode = procNode["Properties"];
-				parsePropertiesNodeYaml(&propertiesNode, processor);
-
-				// Take care of scheduling
-				TimeUnit unit;
-				if (Property::StringToTime(procCfg.schedulingPeriod,
-						schedulingPeriod, unit)
-						&& Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
-								schedulingPeriod)) {
-					logger_->log_debug(
-							"convert: parseProcessorNode: schedulingPeriod => [%d] ns",
-							schedulingPeriod);
-					processor->setSchedulingPeriodNano(schedulingPeriod);
-				}
-
-				if (Property::StringToTime(procCfg.penalizationPeriod,
-						penalizationPeriod, unit)
-						&& Property::ConvertTimeUnitToMS(penalizationPeriod,
-								unit, penalizationPeriod)) {
-					logger_->log_debug(
-							"convert: parseProcessorNode: penalizationPeriod => [%d] ms",
-							penalizationPeriod);
-					processor->setPenalizationPeriodMsec(penalizationPeriod);
-				}
-
-				if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod,
-						unit)
-						&& Property::ConvertTimeUnitToMS(yieldPeriod, unit,
-								yieldPeriod)) {
-					logger_->log_debug(
-							"convert: parseProcessorNode: yieldPeriod => [%d] ms",
-							yieldPeriod);
-					processor->setYieldPeriodMsec(yieldPeriod);
-				}
-
-				// Default to running
-				processor->setScheduledState(RUNNING);
-
-				if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
-					processor->setSchedulingStrategy(TIMER_DRIVEN);
-					logger_->log_debug("setting scheduling strategy as %s",
-							procCfg.schedulingStrategy.c_str());
-				} else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-					processor->setSchedulingStrategy(EVENT_DRIVEN);
-					logger_->log_debug("setting scheduling strategy as %s",
-							procCfg.schedulingStrategy.c_str());
-				} else {
-					processor->setSchedulingStrategy(CRON_DRIVEN);
-					logger_->log_debug("setting scheduling strategy as %s",
-							procCfg.schedulingStrategy.c_str());
-
-				}
-
-				int64_t maxConcurrentTasks;
-				if (Property::StringToInt(procCfg.maxConcurrentTasks,
-						maxConcurrentTasks)) {
-					logger_->log_debug(
-							"parseProcessorNode: maxConcurrentTasks => [%d]",
-							maxConcurrentTasks);
-					processor->setMaxConcurrentTasks(maxConcurrentTasks);
-				}
-
-				if (Property::StringToInt(procCfg.runDurationNanos,
-						runDurationNanos)) {
-					logger_->log_debug(
-							"parseProcessorNode: runDurationNanos => [%d]",
-							runDurationNanos);
-					processor->setRunDurationNano(runDurationNanos);
-				}
-
-				std::set<Relationship> autoTerminatedRelationships;
-				for (auto &&relString : procCfg.autoTerminatedRelationships) {
-					Relationship relationship(relString, "");
-					logger_->log_debug(
-							"parseProcessorNode: autoTerminatedRelationship  => [%s]",
-							relString.c_str());
-					autoTerminatedRelationships.insert(relationship);
-				}
-
-				processor->setAutoTerminatedRelationships(
-						autoTerminatedRelationships);
-
-				parentGroup->addProcessor(processor);
-			}
-		}
-	} else {
-		throw new std::invalid_argument(
-				"Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
-	}
-}
-
-void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
-		ProcessGroup *parentGroup) {
-	uuid_t uuid;
-
-	if (!parentGroup) {
-		logger_->log_error(
-				"parseRemoteProcessGroupYaml: no parent group exists");
-		return;
-	}
-
-	if (rpgNode) {
-		if (rpgNode->IsSequence()) {
-			for (YAML::const_iterator iter = rpgNode->begin();
-					iter != rpgNode->end(); ++iter) {
-				YAML::Node rpgNode = iter->as<YAML::Node>();
-
-				auto name = rpgNode["name"].as<std::string>();
-				auto id = rpgNode["id"].as<std::string>();
-
-				logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
-						name.c_str(), id.c_str());
-
-				std::string url = rpgNode["url"].as<std::string>();
-				logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
-						url.c_str());
-
-				std::string timeout = rpgNode["timeout"].as<std::string>();
-				logger_->log_debug(
-						"parseRemoteProcessGroupYaml: timeout => [%s]",
-						timeout.c_str());
-
-				std::string yieldPeriod =
-						rpgNode["yield period"].as<std::string>();
-				logger_->log_debug(
-						"parseRemoteProcessGroupYaml: yield period => [%s]",
-						yieldPeriod.c_str());
-
-				YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
-				YAML::Node outputPorts =
-						rpgNode["Output Ports"].as<YAML::Node>();
-				ProcessGroup *group = NULL;
-
-				uuid_parse(id.c_str(), uuid);
-
-				int64_t timeoutValue = -1;
-				int64_t yieldPeriodValue = -1;
-
-				group = this->createRemoteProcessGroup(name.c_str(), uuid);
-				group->setParent(parentGroup);
-				parentGroup->addProcessGroup(group);
-
-				TimeUnit unit;
-
-				if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-						&& Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
-								yieldPeriodValue) && group) {
-					logger_->log_debug(
-							"parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
-							yieldPeriodValue);
-					group->setYieldPeriodMsec(yieldPeriodValue);
-				}
-
-				if (Property::StringToTime(timeout, timeoutValue, unit)
-						&& Property::ConvertTimeUnitToMS(timeoutValue, unit,
-								timeoutValue) && group) {
-					logger_->log_debug(
-							"parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
-							timeoutValue);
-					group->setTimeOut(timeoutValue);
-				}
-
-				group->setTransmitting(true);
-				group->setURL(url);
-
-				if (inputPorts && inputPorts.IsSequence()) {
-					for (YAML::const_iterator portIter = inputPorts.begin();
-							portIter != inputPorts.end(); ++portIter) {
-						logger_->log_debug("Got a current port, iterating...");
-
-						YAML::Node currPort = portIter->as<YAML::Node>();
-
-						this->parsePortYaml(&currPort, group, SEND);
-					} // for node
-				}
-				if (outputPorts && outputPorts.IsSequence()) {
-					for (YAML::const_iterator portIter = outputPorts.begin();
-							portIter != outputPorts.end(); ++portIter) {
-						logger_->log_debug("Got a current port, iterating...");
-
-						YAML::Node currPort = portIter->as<YAML::Node>();
-
-						this->parsePortYaml(&currPort, group, RECEIVE);
-					} // for node
-				}
-
-			}
-		}
-	}
-}
-
-void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
-		ProcessGroup *parent) {
-	uuid_t uuid;
-	Connection *connection = NULL;
-
-	if (!parent) {
-		logger_->log_error("parseProcessNode: no parent group was provided");
-		return;
-	}
-
-	if (connectionsNode) {
-
-		if (connectionsNode->IsSequence()) {
-			for (YAML::const_iterator iter = connectionsNode->begin();
-					iter != connectionsNode->end(); ++iter) {
-
-				YAML::Node connectionNode = iter->as<YAML::Node>();
-
-				std::string name = connectionNode["name"].as<std::string>();
-				std::string id = connectionNode["id"].as<std::string>();
-				std::string destId = connectionNode["destination id"].as<
-						std::string>();
-
-				uuid_parse(id.c_str(), uuid);
-
-				logger_->log_debug(
-						"Created connection with UUID %s and name %s", id.c_str(),
-						name.c_str());
-				connection = this->createConnection(name, uuid);
-				auto rawRelationship =
-						connectionNode["source relationship name"].as<
-								std::string>();
-				Relationship relationship(rawRelationship, "");
-				logger_->log_debug("parseConnection: relationship => [%s]",
-						rawRelationship.c_str());
-				if (connection)
-					connection->setRelationship(relationship);
-				std::string connectionSrcProcId =
-						connectionNode["source id"].as<std::string>();
-				uuid_t srcUUID;
-				uuid_parse(connectionSrcProcId.c_str(), srcUUID);
-
-				Processor *srcProcessor = this->_root->findProcessor(
-						srcUUID);
-
-				if (!srcProcessor) {
-					logger_->log_error(
-							"Could not locate a source with id %s to create a connection",
-							connectionSrcProcId.c_str());
-					throw std::invalid_argument(
-							"Could not locate a source with id %s to create a connection "
-									+ connectionSrcProcId);
-				}
-
-				uuid_t destUUID;
-				uuid_parse(destId.c_str(), destUUID);
-				Processor *destProcessor = this->_root->findProcessor(destUUID);
-				// If we could not find name, try by UUID
-				if (!destProcessor) {
-					uuid_t destUuid;
-					uuid_parse(destId.c_str(), destUuid);
-					destProcessor = this->_root->findProcessor(destUuid);
-				}
-				if (destProcessor) {
-					std::string destUuid = destProcessor->getUUIDStr();
-				}
-
-				uuid_t srcUuid;
-				uuid_t destUuid;
-				srcProcessor->getUUID(srcUuid);
-				connection->setSourceProcessorUUID(srcUuid);
-				destProcessor->getUUID(destUuid);
-				connection->setDestinationProcessorUUID(destUuid);
-
-				if (connection) {
-					parent->addConnection(connection);
-				}
-			}
-		}
-
-		if (connection)
-			parent->addConnection(connection);
-
-		return;
-	}
-}
-
-void FlowControllerImpl::parsePortYaml(YAML::Node *portNode,
-		ProcessGroup *parent, TransferDirection direction) {
-	uuid_t uuid;
-	Processor *processor = NULL;
-	RemoteProcessorGroupPort *port = NULL;
-
-	if (!parent) {
-		logger_->log_error("parseProcessNode: no parent group existed");
-		return;
-	}
-
-	YAML::Node inputPortsObj = portNode->as<YAML::Node>();
-
-	// generate the random UIID
-	uuid_generate(uuid);
-
-	auto portId = inputPortsObj["id"].as<std::string>();
-	auto nameStr = inputPortsObj["name"].as<std::string>();
-	uuid_parse(portId.c_str(), uuid);
-
-	port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
-
-	processor = (Processor *) port;
-	port->setDirection(direction);
-	port->setTimeOut(parent->getTimeOut());
-	port->setTransmitting(true);
-	processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-	processor->initialize();
-
-	// handle port properties
-	YAML::Node nodeVal = portNode->as<YAML::Node>();
-	YAML::Node propertiesNode = nodeVal["Properties"];
-
-	parsePropertiesNodeYaml(&propertiesNode, processor);
-
-	// add processor to parent
-	parent->addProcessor(processor);
-	processor->setScheduledState(RUNNING);
-	auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<
-			std::string>();
-	int64_t maxConcurrentTasks;
-	if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-		processor->setMaxConcurrentTasks(maxConcurrentTasks);
-	}
-	logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
-			maxConcurrentTasks);
-	processor->setMaxConcurrentTasks(maxConcurrentTasks);
+void FlowController::waitUnload(const uint64_t timeToWaitMs) {
+  if (running_) {
+    // use the current time and increment with the provided argument.
+    std::chrono::system_clock::time_point wait_time =
+        std::chrono::system_clock::now()
+            + std::chrono::milliseconds(timeToWaitMs);
+
+    // create an asynchronous future.
+    std::future<void> unload_task = std::async(std::launch::async,
+                                               [this]() {unload();});
+
+    if (std::future_status::ready == unload_task.wait_until(wait_time)) {
+      running_ = false;
+    }
 
+  }
 }
 
-void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
-		Processor *processor) {
-	// Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
-	for (YAML::const_iterator propsIter = propertiesNode->begin();
-			propsIter != propertiesNode->end(); ++propsIter) {
-		std::string propertyName = propsIter->first.as<std::string>();
-		YAML::Node propertyValueNode = propsIter->second;
-		if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
-			std::string rawValueString = propertyValueNode.as<std::string>();
-			if (!processor->setProperty(propertyName, rawValueString)) {
-				logger_->log_warn(
-						"Received property %s with value %s but is not one of the properties for %s",
-						propertyName.c_str(), rawValueString.c_str(),
-						processor->getName().c_str());
-			}
-		}
-	}
+void FlowController::unload() {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (running_) {
+    stop(true);
+  }
+  if (initialized_) {
+    logger_->log_info("Unload Flow Controller");
+    root_ = nullptr;
+    initialized_ = false;
+    name_ = "";
+  }
+
+  return;
 }
-#endif /* ifdef YAML_SUPPORT */
 
-void FlowControllerImpl::load() {
-    if (_running) {
-        stop(true);
-    }
-    if (!_initialized) {
-        logger_->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
-
-#ifdef YAML_SUPPORT
-		YAML::Node flow = YAML::LoadFile(_configurationFileName);
-
-		YAML::Node flowControllerNode = flow["Flow Controller"];
-		YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
-		YAML::Node connectionsNode = flow["Connections"];
-		YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+void FlowController::load() {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (running_) {
+    stop(true);
+  }
+  if (!initialized_) {
+    logger_->log_info("Load Flow Controller from file %s",
+                      configuration_filename_.c_str());
 
-		// Create the root process group
-		parseRootProcessGroupYaml(flowControllerNode);
-		parseProcessorNodeYaml(processorsNode, this->_root);
-		parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
-		parseConnectionYaml(&connectionsNode, this->_root);
+    this->root_ = flow_configuration_->getRoot(configuration_filename_);
 
-		// Load Flow File from Repo
-		loadFlowRepo();
-#endif
+    // Load Flow File from Repo
+    loadFlowRepo();
 
-		_initialized = true;
-    }
+    initialized_ = true;
+  }
 }
 
-void FlowControllerImpl::loadFlowRepo()
-{
-	if (this->_flowfileRepo && this->_flowfileRepo->isEnable())
-	{
-		std::map<std::string, Connection *> connectionMap;
-		this->_root->getConnections(&connectionMap);
-		this->_flowfileRepo->loadFlowFileToConnections(&connectionMap);
-	}
-}
-
-void FlowControllerImpl::reload(std::string yamlFile)
-{
-    logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
+void FlowController::reload(std::string yamlFile) {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  logger_->log_info("Starting to reload Flow Controller with yaml %s",
+                    yamlFile.c_str());
+  stop(true);
+  unload();
+  std::string oldYamlFile = this->configuration_filename_;
+  this->configuration_filename_ = yamlFile;
+  load();
+  start();
+  if (this->root_ != nullptr) {
+    this->configuration_filename_ = oldYamlFile;
+    logger_->log_info("Rollback Flow Controller to YAML %s",
+                      oldYamlFile.c_str());
     stop(true);
     unload();
-    std::string oldYamlFile = this->_configurationFileName;
-    this->_configurationFileName = yamlFile;
     load();
     start();
-       if (!this->_root)
-       {
-        this->_configurationFileName = oldYamlFile;
-        logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str());
-        stop(true);
-        unload();
-        load();
-        start();
+  }
+}
+
+void FlowController::loadFlowRepo() {
+  if (this->flow_file_repo_) {
+    std::map<std::string, std::shared_ptr<Connection>> connectionMap;
+    if (this->root_ != nullptr) {
+      this->root_->getConnections(connectionMap);
     }
+    auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
+        flow_file_repo_);
+    rep->loadFlowFileToConnections(connectionMap);
+  }
 }
 
-bool FlowControllerImpl::start() {
-	if (!_initialized) {
-		logger_->log_error(
-				"Can not start Flow Controller because it has not been initialized");
-		return false;
-	} else {
-
-		if (!_running) {
-			logger_->log_info("Starting Flow Controller");
-			this->_timerScheduler.start();
-			this->_eventScheduler.start();
-			if (this->_root)
-				this->_root->startProcessing(&this->_timerScheduler,
-						&this->_eventScheduler);
-			_running = true;
-			this->_protocol->start();
-			this->_provenanceRepo->start();
-			this->_flowfileRepo->start();
-			logger_->log_info("Started Flow Controller");
-		}
-		return true;
-	}
+bool FlowController::start() {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (!initialized_) {
+    logger_->log_error(
+        "Can not start Flow Controller because it has not been initialized");
+    return false;
+  } else {
+
+    if (!running_) {
+      logger_->log_info("Starting Flow Controller");
+      this->_timerScheduler.start();
+      this->_eventScheduler.start();
+      if (this->root_ != nullptr) {
+        this->root_->startProcessing(&this->_timerScheduler,
+                                     &this->_eventScheduler);
+      }
+      running_ = true;
+      this->protocol_->start();
+      this->provenance_repo_->start();
+      this->flow_file_repo_->start();
+      logger_->log_info("Started Flow Controller");
+    }
+    return true;
+  }
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index a2f2323..7383574 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -27,253 +27,339 @@
 #include <cstdio>
 
 #include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
-
-FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim)
-: _size(0),
-  _id(_localFlowSeqNumber.load()),
-  _offset(0),
-  _penaltyExpirationMs(0),
-  _claim(claim),
-  _isStoredToRepo(false),
-  _markedDelete(false),
-  _connection(NULL),
-  _orginalConnection(NULL)
-{
-	_entryDate = getTimeMillis();
-	_lineageStartDate = _entryDate;
-
-	char uuidStr[37];
-
-	// Generate the global UUID for the flow record
-	uuid_generate(_uuid);
-	// Increase the local ID for the flow record
-	++_localFlowSeqNumber;
-	uuid_unparse_lower(_uuid, uuidStr);
-	_uuidStr = uuidStr;
-
-	// Populate the default attributes
-    addAttribute(FILENAME, std::to_string(getTimeNano()));
-    addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
-    addAttribute(UUID, uuidStr);
-	// Populate the attributes from the input
-    std::map<std::string, std::string>::iterator it;
-    for (it = attributes.begin(); it!= attributes.end(); it++)
-    {
-    	addAttribute(it->first, it->second);
-    }
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+#include "core/Repository.h"
 
-    _snapshot = false;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
-	if (_claim)
-		// Increase the flow file record owned count for the resource claim
-		_claim->increaseFlowFileRecordOwnedCount();
-	logger_ = Logger::getLogger();
-}
+std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0);
 
-FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
-: _size(0),
-  _id(_localFlowSeqNumber.load()),
-  _offset(0),
-  _penaltyExpirationMs(0),
-  _claim(NULL),
-  _isStoredToRepo(false),
-  _markedDelete(false),
-  _connection(NULL),
-  _orginalConnection(NULL)
-{
-	_entryDate = event->getFlowFileEntryDate();
-	_lineageStartDate = event->getlineageStartDate();
-	_size = event->getFileSize();
-	_offset = event->getFileOffset();
-	_lineageIdentifiers = event->getLineageIdentifiers();
-	_attributes = event->getAttributes();
-    _snapshot = false;
-    _uuidStr = event->getFlowFileUuid();
-    uuid_parse(_uuidStr.c_str(), _uuid);
-
-    if (_size > 0)
-    {
-    	_claim = new ResourceClaim();
-    }
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::map<std::string, std::string> attributes,
+    std::shared_ptr<ResourceClaim> claim)
+    : FlowFile(),
+      flow_repository_(flow_repository) {
+
+  id_ = local_flow_seq_number_.load();
+  claim_ = claim;
+  // Increase the local ID for the flow record
+  ++local_flow_seq_number_;
+  // Populate the default attributes
+  addKeyedAttribute(FILENAME, std::to_string(getTimeNano()));
+  addKeyedAttribute(PATH, DEFAULT_FLOWFILE_PATH);
+  addKeyedAttribute(UUID, getUUIDStr());
+  // Populate the attributes from the input
+  std::map<std::string, std::string>::iterator it;
+  for (it = attributes.begin(); it != attributes.end(); it++) {
+    FlowFile::addAttribute(it->first, it->second);
+  }
+
+  snapshot_ = false;
 
-	if (_claim)
-	{
-		_claim->setContentFullPath(event->getContentFullPath());
-		// Increase the flow file record owned count for the resource claim
-		_claim->increaseFlowFileRecordOwnedCount();
+  if (claim_ != nullptr)
+    // Increase the flow file record owned count for the resource claim
+    claim_->increaseFlowFileRecordOwnedCount();
+  logger_ = logging::Logger::getLogger();
+}
+
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection)
+    : FlowFile(),
+      snapshot_(""),
+      flow_repository_(flow_repository) {
+	entry_date_ = event->getEntryDate();
+	lineage_start_date_ = event->getlineageStartDate();
+	lineage_Identifiers_ = event->getlineageIdentifiers();
+	uuid_str_ = event->getUUIDStr();
+	attributes_ = event->getAttributes();
+	size_ = event->getSize();
+	offset_ = event->getOffset();
+	event->getUUID(uuid_);
+	uuid_connection_ = uuidConnection;
+	if (event->getResourceClaim()) {
+	  content_full_fath_ = event->getResourceClaim()->getContentFullPath();
 	}
-	logger_ = Logger::getLogger();
-	++_localFlowSeqNumber;
 }
 
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event)
+    : FlowFile(),
+      uuid_connection_(""),
+      snapshot_(""),
+      flow_repository_(flow_repository) {
 
-FlowFileRecord::~FlowFileRecord()
-{
-	if (!_snapshot)
-		logger_->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str());
-	else
-		logger_->log_debug("Delete SnapShot FlowFile UUID %s", _uuidStr.c_str());
-	if (_claim)
-	{
-		// Decrease the flow file record owned count for the resource claim
-		_claim->decreaseFlowFileRecordOwnedCount();
-		if (_claim->getFlowFileRecordOwnedCount() <= 0)
-		{
-			logger_->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
-			std::string value;
-			if (!FlowControllerFactory::getFlowController()->getFlowFileRepository() ||
-					!FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() ||
-					!this->_isStoredToRepo ||
-					!FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr, value))
-			{
-				// if it is persistent to DB already while it is in the queue, we keep the content
-				std::remove(_claim->getContentFullPath().c_str());
-			}
-			delete _claim;
-		}
-	}
 }
 
-bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
-{
-	const char *keyStr = FlowAttributeKey(key);
-	if (keyStr)
-	{
-		std::string keyString = keyStr;
-		return addAttribute(keyString, value);
-	}
-	else
-	{
-		return false;
-	}
+FlowFileRecord::~FlowFileRecord() {
+  if (!snapshot_)
+    logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str());
+  else
+    logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str());
+  if (claim_) {
+    // Decrease the flow file record owned count for the resource claim
+    claim_->decreaseFlowFileRecordOwnedCount();
+    std::string value;
+    if (claim_->getFlowFileRecordOwnedCount() <= 0) {
+      logger_->log_debug("Delete Resource Claim %s",
+                         claim_->getContentFullPath().c_str());
+      if (!this->stored || !flow_repository_->Get(uuid_str_, value)) {
+        std::remove(claim_->getContentFullPath().c_str());
+      }
+    }
+  }
 }
 
-bool FlowFileRecord::addAttribute(std::string key, std::string value)
-{
-	std::map<std::string, std::string>::iterator it = _attributes.find(key);
-	if (it != _attributes.end())
-	{
-		// attribute already there in the map
-		return false;
-	}
-	else
-	{
-		_attributes[key] = value;
-		return true;
-	}
+bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) {
+  const char *keyStr = FlowAttributeKey(key);
+  if (keyStr) {
+    const std::string keyString = keyStr;
+    return FlowFile::addAttribute(keyString, value);
+  } else {
+    return false;
+  }
 }
 
-bool FlowFileRecord::removeAttribute(FlowAttribute key)
-{
-	const char *keyStr = FlowAttributeKey(key);
-	if (keyStr)
-	{
-		std::string keyString = keyStr;
-		return removeAttribute(keyString);
-	}
-	else
-	{
-		return false;
-	}
+bool FlowFileRecord::removeKeyedAttribute(FlowAttribute key) {
+  const char *keyStr = FlowAttributeKey(key);
+  if (keyStr) {
+    std::string keyString = keyStr;
+    return FlowFile::removeAttribute(keyString);
+  } else {
+    return false;
+  }
 }
 
-bool FlowFileRecord::removeAttribute(std::string key)
-{
-	std::map<std::string, std::string>::iterator it = _attributes.find(key);
-	if (it != _attributes.end())
-	{
-		_attributes.erase(key);
-		return true;
-	}
-	else
-	{
-		return false;
-	}
+bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key,
+                                          std::string value) {
+  const char *keyStr = FlowAttributeKey(key);
+  if (keyStr) {
+    std::string keyString = keyStr;
+    return FlowFile::updateAttribute(keyString, value);
+  } else {
+    return false;
+  }
 }
 
-bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
-{
-	const char *keyStr = FlowAttributeKey(key);
-	if (keyStr)
-	{
-		std::string keyString = keyStr;
-		return updateAttribute(keyString, value);
-	}
-	else
-	{
-		return false;
-	}
+bool FlowFileRecord::getKeyedAttribute(FlowAttribute key, std::string &value) {
+  const char *keyStr = FlowAttributeKey(key);
+  if (keyStr) {
+    std::string keyString = keyStr;
+    return FlowFile::getAttribute(keyString, value);
+  } else {
+    return false;
+  }
 }
 
-bool FlowFileRecord::updateAttribute(std::string key, std::string value)
-{
-	std::map<std::string, std::string>::iterator it = _attributes.find(key);
-	if (it != _attributes.end())
-	{
-		_attributes[key] = value;
-		return true;
-	}
-	else
-	{
-		return false;
-	}
+FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) {
+  core::FlowFile::operator=(other);
+  uuid_connection_ = other.uuid_connection_;
+  content_full_fath_ = other.content_full_fath_;
+  snapshot_ = other.snapshot_;
+  return *this;
 }
 
-bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
-{
-	const char *keyStr = FlowAttributeKey(key);
-	if (keyStr)
-	{
-		std::string keyString = keyStr;
-		return getAttribute(keyString, value);
-	}
-	else
-	{
-		return false;
-	}
+bool FlowFileRecord::DeSerialize(std::string key) {
+  std::string value;
+  bool ret;
+
+  ret = flow_repository_->Get(key, value);
+
+  if (!ret) {
+    logger_->log_error("NiFi FlowFile Store event %s can not found",
+                       key.c_str());
+    return false;
+  } else
+    logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(),
+                       value.length());
+
+  io::DataStream stream((const uint8_t*) value.data(), value.length());
+
+  ret = DeSerialize(stream);
+
+  if (ret) {
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %s success",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  } else {
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %d fail",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  }
+
+  return ret;
 }
 
-bool FlowFileRecord::getAttribute(std::string key, std::string &value)
-{
-	std::map<std::string, std::string>::iterator it = _attributes.find(key);
-	if (it != _attributes.end())
-	{
-		value = it->second;
-		return true;
-	}
-	else
-	{
-		return false;
-	}
+bool FlowFileRecord::Serialize() {
+
+  io::DataStream outStream;
+
+  int ret;
+
+  ret = write(this->event_time_, &outStream);
+  if (ret != 8) {
+
+    return false;
+  }
+
+  ret = write(this->entry_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->lineage_start_date_, &outStream);
+  if (ret != 8) {
+
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_str_, &outStream);
+  if (ret <= 0) {
+
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_connection_, &outStream);
+  if (ret <= 0) {
+
+    return false;
+  }
+  // write flow attributes
+  uint32_t numAttributes = this->attributes_.size();
+  ret = write(numAttributes, &outStream);
+  if (ret != 4) {
+
+    return false;
+  }
+
+  for (auto itAttribute : attributes_) {
+    ret = writeUTF(itAttribute.first, &outStream, true);
+    if (ret <= 0) {
+
+      return false;
+    }
+    ret = writeUTF(itAttribute.second, &outStream, true);
+    if (ret <= 0) {
+
+      return false;
+    }
+  }
+
+  ret = writeUTF(this->content_full_fath_, &outStream);
+  if (ret <= 0) {
+
+    return false;
+  }
+
+  ret = write(this->size_, &outStream);
+  if (ret != 8) {
+
+    return false;
+  }
+
+  ret = write(this->offset_, &outStream);
+  if (ret != 8) {
+
+    return false;
+  }
+
+  // Persistent to the DB
+  
+
+  if (flow_repository_->Put(uuid_str_,
+                            const_cast<uint8_t*>(outStream.getBuffer()),
+                            outStream.getSize())) {
+    logger_->log_debug("NiFi FlowFile Store event %s size %d success",
+                       uuid_str_.c_str(), outStream.getSize());
+    return true;
+  } else {
+    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+                       uuid_str_.c_str(), outStream.getSize());
+    return false;
+  }
+
+  // cleanup
+
+  return true;
 }
 
-void FlowFileRecord::duplicate(FlowFileRecord *original)
-{
-	uuid_copy(this->_uuid, original->_uuid);
-	this->_attributes = original->_attributes;
-	this->_entryDate = original->_entryDate;
-	this->_id = original->_id;
-	this->_lastQueueDate = original->_lastQueueDate;
-	this->_lineageStartDate = original->_lineageStartDate;
-	this->_offset = original->_offset;
-	this->_penaltyExpirationMs = original->_penaltyExpirationMs;
-	this->_size = original->_size;
-	this->_lineageIdentifiers = original->_lineageIdentifiers;
-	this->_orginalConnection = original->_orginalConnection;
-	this->_uuidStr = original->_uuidStr;
-	this->_connection = original->_connection;
-	this->_markedDelete = original->_markedDelete;
-
-	this->_claim = original->_claim;
-	if (this->_claim)
-		this->_claim->increaseFlowFileRecordOwnedCount();
-
-	this->_snapshot = true;
+bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+
+  int ret;
+
+  io::DataStream outStream(buffer, bufferSize);
+
+  ret = read(this->event_time_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = read(this->entry_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = read(this->lineage_start_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = readUTF(this->uuid_str_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = readUTF(this->uuid_connection_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  // read flow attributes
+  uint32_t numAttributes = 0;
+  ret = read(numAttributes, &outStream);
+  if (ret != 4) {
+    return false;
+  }
+
+  for (uint32_t i = 0; i < numAttributes; i++) {
+    std::string key;
+    ret = readUTF(key, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+    std::string value;
+    ret = readUTF(value, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+    this->attributes_[key] = value;
+  }
+
+  ret = readUTF(this->content_full_fath_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = read(this->size_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = read(this->offset_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  return true;
 }
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRepository.cpp b/libminifi/src/FlowFileRepository.cpp
deleted file mode 100644
index 3388738..0000000
--- a/libminifi/src/FlowFileRepository.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * @file FlowFileRepository.cpp
- * FlowFile implemenatation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-//! DeSerialize
-bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo,
-		std::string key) {
-	std::string value;
-	bool ret;
-
-	ret = repo->Get(key, value);
-
-	if (!ret) {
-		logger_->log_error("NiFi FlowFile Store event %s can not found",
-				key.c_str());
-		return false;
-	} else
-		logger_->log_debug("NiFi FlowFile Read event %s length %d",
-				key.c_str(), value.length());
-
-
-	DataStream stream((const uint8_t*)value.data(),value.length());
-
-	ret = DeSerialize(stream);
-
-	if (ret) {
-		logger_->log_debug(
-				"NiFi FlowFile retrieve uuid %s size %d connection %s success",
-				_uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
-	} else {
-		logger_->log_debug(
-				"NiFi FlowFile retrieve uuid %s size %d connection %d fail",
-				_uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
-	}
-
-	return ret;
-}
-
-bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) {
-
-	DataStream outStream;
-
-	int ret;
-
-	ret = write(this->_eventTime,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = write(this->_entryDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = write(this->_lineageStartDate,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_uuid,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_uuidConnection,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	// write flow attributes
-	uint32_t numAttributes = this->_attributes.size();
-	ret = write(numAttributes,&outStream);
-	if (ret != 4) {
-
-		return false;
-	}
-
-	for (auto itAttribute : _attributes) {
-		ret = writeUTF(itAttribute.first,&outStream, true);
-		if (ret <= 0) {
-
-			return false;
-		}
-		ret = writeUTF(itAttribute.second,&outStream, true);
-		if (ret <= 0) {
-
-			return false;
-		}
-	}
-
-	ret = writeUTF(this->_contentFullPath,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = write(this->_size,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = write(this->_offset,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	// Persistent to the DB
-
-	if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-		logger_->log_debug("NiFi FlowFile Store event %s size %d success",
-				_uuid.c_str(), outStream.getSize());
-		return true;
-	} else {
-		logger_->log_error("NiFi FlowFile Store event %s size %d fail",
-				_uuid.c_str(), outStream.getSize());
-		return false;
-	}
-
-	// cleanup
-
-	return true;
-}
-
-bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
-	int ret;
-
-	DataStream outStream(buffer,bufferSize);
-
-	ret = read(this->_eventTime,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_entryDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_lineageStartDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = readUTF(this->_uuid,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = readUTF(this->_uuidConnection,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	// read flow attributes
-	uint32_t numAttributes = 0;
-	ret = read(numAttributes,&outStream);
-	if (ret != 4) {
-		return false;
-	}
-
-	for (uint32_t i = 0; i < numAttributes; i++) {
-		std::string key;
-		ret = readUTF(key,&outStream, true);
-		if (ret <= 0) {
-			return false;
-		}
-		std::string value;
-		ret = readUTF(value,&outStream, true);
-		if (ret <= 0) {
-			return false;
-		}
-		this->_attributes[key] = value;
-	}
-
-	ret = readUTF(this->_contentFullPath,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = read(this->_size,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_offset,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	return true;
-}
-
-void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap)
-{
-#ifdef LEVELDB_SUPPORT
-	if (!_enable)
-		return;
-
-	std::vector<std::string> purgeList;
-	leveldb::Iterator* it = _db->NewIterator(
-						leveldb::ReadOptions());
-
-	for (it->SeekToFirst(); it->Valid(); it->Next())
-	{
-		FlowFileEventRecord eventRead;
-		std::string key = it->key().ToString();
-		if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-				(int) it->value().size()))
-		{
-			auto search = connectionMap->find(eventRead.getConnectionUuid());
-			if (search != connectionMap->end())
-			{
-				// we find the connection for the persistent flowfile, create the flowfile and enqueue that
-				FlowFileRecord *record = new FlowFileRecord(&eventRead);
-				// set store to repo to true so that we do need to persistent again in enqueue
-				record->setStoredToRepository(true);
-				search->second->put(record);
-			}
-			else
-			{
-				if (eventRead.getContentFullPath().length() > 0)
-				{
-					std::remove(eventRead.getContentFullPath().c_str());
-				}
-				purgeList.push_back(key);
-			}
-		}
-		else
-		{
-			purgeList.push_back(key);
-		}
-	}
-
-	delete it;
-	std::vector<std::string>::iterator itPurge;
-	for (itPurge = purgeList.begin(); itPurge != purgeList.end();
-						itPurge++)
-	{
-		std::string eventId = *itPurge;
-		logger_->log_info("Repository Repo %s Purge %s",
-										RepositoryTypeStr[_type],
-										eventId.c_str());
-		Delete(eventId);
-	}
-#endif
-
-	return;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GenerateFlowFile.cpp b/libminifi/src/GenerateFlowFile.cpp
deleted file mode 100644
index 12d7f70..0000000
--- a/libminifi/src/GenerateFlowFile.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * @file GenerateFlowFile.cpp
- * GenerateFlowFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include "utils/StringUtils.h"
-
-#include "GenerateFlowFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
-const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
-Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
-Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
-		"If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
-Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
-
-void GenerateFlowFile::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(FileSize);
-	properties.insert(BatchSize);
-	properties.insert(DataFormat);
-	properties.insert(UniqueFlowFiles);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
-}
-
-void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	int64_t batchSize = 1;
-	bool uniqueFlowFile = true;
-	int64_t fileSize = 1024;
-
-	std::string value;
-	if (context->getProperty(FileSize.getName(), value))
-	{
-		Property::StringToInt(value, fileSize);
-	}
-	if (context->getProperty(BatchSize.getName(), value))
-	{
-		Property::StringToInt(value, batchSize);
-	}
-	if (context->getProperty(UniqueFlowFiles.getName(), value))
-	{
-		StringUtils::StringToBool(value, uniqueFlowFile);
-	}
-
-	if (!uniqueFlowFile)
-	{
-		char *data;
-		data = new char[fileSize];
-		if (!data)
-			return;
-		uint64_t dataSize = fileSize;
-		GenerateFlowFile::WriteCallback callback(data, dataSize);
-		char *current = data;
-		for (int i = 0; i < fileSize; i+= sizeof(int))
-		{
-			int randValue = random();
-			*((int *) current) = randValue;
-			current += sizeof(int);
-		}
-		for (int i = 0; i < batchSize; i++)
-		{
-			// For each batch
-			FlowFileRecord *flowFile = session->create();
-			if (!flowFile)
-				return;
-			if (fileSize > 0)
-				session->write(flowFile, &callback);
-			session->transfer(flowFile, Success);
-		}
-		delete[] data;
-	}
-	else
-	{
-		if (!_data)
-		{
-			// We have not create the unique data yet
-			_data = new char[fileSize];
-			_dataSize = fileSize;
-			char *current = _data;
-			for (int i = 0; i < fileSize; i+= sizeof(int))
-			{
-				int randValue = random();
-				*((int *) current) = randValue;
-				// *((int *) current) = (0xFFFFFFFF & i);
-				current += sizeof(int);
-			}
-		}
-		GenerateFlowFile::WriteCallback callback(_data, _dataSize);
-		for (int i = 0; i < batchSize; i++)
-		{
-			// For each batch
-			FlowFileRecord *flowFile = session->create();
-			if (!flowFile)
-				return;
-			if (fileSize > 0)
-				session->write(flowFile, &callback);
-			session->transfer(flowFile, Success);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
deleted file mode 100644
index 40dd387..0000000
--- a/libminifi/src/GetFile.cpp
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * @file GetFile.cpp
- * GetFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-#if  (__GNUC__ >= 4) 
-	#if (__GNUC_MINOR__ < 9)
-		#include <regex.h>
-	#endif
-#endif
-#include "utils/StringUtils.h"
-#include <regex>
-#include "utils/TimeUtil.h"
-#include "GetFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string GetFile::ProcessorName("GetFile");
-Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
-Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
-Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
-Property GetFile::KeepSourceFile("Keep Source File",
-		"If true, the file is not deleted after it has been copied to the Content Repository", "false");
-Property GetFile::MaxAge("Maximum File Age",
-		"The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MinAge("Minimum File Age",
-		"The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
-Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
-Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
-Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
-Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*");
-Relationship GetFile::Success("success", "All files are routed to success");
-
-void GetFile::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(BatchSize);
-	properties.insert(Directory);
-	properties.insert(IgnoreHiddenFile);
-	properties.insert(KeepSourceFile);
-	properties.insert(MaxAge);
-	properties.insert(MinAge);
-	properties.insert(MaxSize);
-	properties.insert(MinSize);
-	properties.insert(PollInterval);
-	properties.insert(Recurse);
-	properties.insert(FileFilter);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
-}
-
-void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	std::string value;
-
-	logger_->log_info("onTrigger GetFile");
-	if (context->getProperty(Directory.getName(), value))
-	{
-		_directory = value;
-	}
-	if (context->getProperty(BatchSize.getName(), value))
-	{
-		Property::StringToInt(value, _batchSize);
-	}
-	if (context->getProperty(IgnoreHiddenFile.getName(), value))
-	{
-		StringUtils::StringToBool(value, _ignoreHiddenFile);
-	}
-	if (context->getProperty(KeepSourceFile.getName(), value))
-	{
-		StringUtils::StringToBool(value, _keepSourceFile);
-	}
-
-	logger_->log_info("onTrigger GetFile");
-	if (context->getProperty(MaxAge.getName(), value))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(value, _maxAge, unit) &&
-			Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
-		{
-
-		}
-	}
-	if (context->getProperty(MinAge.getName(), value))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(value, _minAge, unit) &&
-			Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
-		{
-
-		}
-	}
-	if (context->getProperty(MaxSize.getName(), value))
-	{
-		Property::StringToInt(value, _maxSize);
-	}
-	if (context->getProperty(MinSize.getName(), value))
-	{
-		Property::StringToInt(value, _minSize);
-	}
-	if (context->getProperty(PollInterval.getName(), value))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(value, _pollInterval, unit) &&
-			Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval))
-		{
-
-		}
-	}
-	if (context->getProperty(Recurse.getName(), value))
-	{
-		StringUtils::StringToBool(value, _recursive);
-	}
-
-	if (context->getProperty(FileFilter.getName(), value))
-	{
-		_fileFilter = value;
-	}
-
-	// Perform directory list
-	logger_->log_info("Is listing empty %i",isListingEmpty());
-	if (isListingEmpty())
-	{
-		if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
-		{
-			performListing(_directory);
-		}
-	}
-	logger_->log_info("Is listing empty %i",isListingEmpty());
-
-	if (!isListingEmpty())
-	{
-		try
-		{
-			std::queue<std::string> list;
-			pollListing(list, _batchSize);
-			while (!list.empty())
-			{
-
-				std::string fileName = list.front();
-				list.pop();
-				logger_->log_info("GetFile process %s", fileName.c_str());
-				FlowFileRecord *flowFile = session->create();
-				if (!flowFile)
-					return;
-				std::size_t found = fileName.find_last_of("/\\");
-				std::string path = fileName.substr(0,found);
-				std::string name = fileName.substr(found+1);
-				flowFile->updateAttribute(FILENAME, name);
-				flowFile->updateAttribute(PATH, path);
-				flowFile->addAttribute(ABSOLUTE_PATH, fileName);
-				session->import(fileName, flowFile, _keepSourceFile);
-				session->transfer(flowFile, Success);
-			}
-		}
-		catch (std::exception &exception)
-		{
-			logger_->log_debug("GetFile Caught Exception %s", exception.what());
-			throw;
-		}
-		catch (...)
-		{
-			throw;
-		}
-	}
-
-}
-
-bool GetFile::isListingEmpty()
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	return _dirList.empty();
-}
-
-void GetFile::putListing(std::string fileName)
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_dirList.push(fileName);
-}
-
-void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize))
-	{
-		std::string fileName = _dirList.front();
-		_dirList.pop();
-		list.push(fileName);
-	}
-
-	return;
-}
-
-bool GetFile::acceptFile(std::string fullName, std::string name)
-{
-	struct stat statbuf;
-
-	if (stat(fullName.c_str(), &statbuf) == 0)
-	{
-		if (_minSize > 0 && statbuf.st_size <_minSize)
-			return false;
-
-		if (_maxSize > 0 && statbuf.st_size > _maxSize)
-			return false;
-
-		uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-		uint64_t fileAge = getTimeMillis() - modifiedTime;
-		if (_minAge > 0 && fileAge < _minAge)
-			return false;
-		if (_maxAge > 0 && fileAge > _maxAge)
-			return false;
-
-		if (_ignoreHiddenFile && fullName.c_str()[0] == '.')
-			return false;
-
-		if (access(fullName.c_str(), R_OK) != 0)
-			return false;
-
-		if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
-			return false;
-
-
-	#ifdef __GNUC__ 
-		#if (__GNUC__ >= 4)
-			#if (__GNUC_MINOR__ < 9)
-				regex_t regex;
-				int ret = regcomp(&regex, _fileFilter.c_str(),0);
-				if (ret)
-					return false;
-				ret = regexec(&regex,name.c_str(),(size_t)0,NULL,0);
-				regfree(&regex);
-				if (ret)
-					return false;	
-			#else
-				try{
-					std::regex re(_fileFilter);
-	
-					if (!std::regex_match(name, re)) {
-						return false;
-   					}
-				} catch (std::regex_error e) {
-					logger_->log_error("Invalid File Filter regex: %s.", e.what());
-					return false;
-				}
-			#endif
-		#endif
-		#else
-			logger_->log_info("Cannot support regex filtering");
-		#endif
-		return true;
-	}
-
-	return false;
-}
-
-void GetFile::performListing(std::string dir)
-{
-	logger_->log_info("Performing file listing against %s",dir.c_str());
-	DIR *d;
-	d = opendir(dir.c_str());
-	if (!d)
-		return;
-	// only perform a listing while we are not empty
-	logger_->log_info("Performing file listing against %s",dir.c_str());
-	while (isRunning())
-	{
-		struct dirent *entry;
-		entry = readdir(d);
-		if (!entry)
-			break;
-		std::string d_name = entry->d_name;
-		if ((entry->d_type & DT_DIR))
-		{
-			// if this is a directory
-			if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0)
-			{
-				std::string path = dir + "/" + d_name;
-				performListing(path);
-			}
-		}
-		else
-		{
-			std::string fileName = dir + "/" + d_name;
-			if (acceptFile(fileName, d_name))
-			{
-				// check whether we can take this file
-				putListing(fileName);
-			}
-		}
-	}
-	closedir(d);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenHTTP.cpp b/libminifi/src/ListenHTTP.cpp
deleted file mode 100644
index 89ce1d2..0000000
--- a/libminifi/src/ListenHTTP.cpp
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * @file ListenHTTP.cpp
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <uuid/uuid.h>
-
-#include <CivetServer.h>
-
-#include "ListenHTTP.h"
-
-#include "utils/TimeUtil.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-const std::string ListenHTTP::ProcessorName("ListenHTTP");
-
-Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", ".*");
-Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", "");
-
-Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-void ListenHTTP::initialize()
-{
-	_logger->log_info("Initializing ListenHTTP");
-
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(BasePath);
-	properties.insert(Port);
-	properties.insert(AuthorizedDNPattern);
-	properties.insert(SSLCertificate);
-	properties.insert(SSLCertificateAuthority);
-	properties.insert(SSLVerifyPeer);
-	properties.insert(SSLMinimumVersion);
-	properties.insert(HeadersAsAttributesRegex);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
-}
-
-void ListenHTTP::onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory)
-{
-
-	std::string basePath;
-
-	if (!context->getProperty(BasePath.getName(), basePath))
-	{
-		_logger->log_info("%s attribute is missing, so default value of %s will be used",
-				BasePath.getName().c_str(),
-				BasePath.getValue().c_str());
-		basePath = BasePath.getValue();
-	}
-
-	basePath.insert(0, "/");
-
-	std::string listeningPort;
-
-	if (!context->getProperty(Port.getName(), listeningPort))
-	{
-		_logger->log_error("%s attribute is missing or invalid",
-				Port.getName().c_str());
-		return;
-	}
-
-	std::string authDNPattern;
-
-	if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty())
-	{
-		_logger->log_info("ListenHTTP using %s: %s",
-				AuthorizedDNPattern.getName().c_str(),
-				authDNPattern.c_str());
-	}
-
-	std::string sslCertFile;
-
-	if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty())
-	{
-		_logger->log_info("ListenHTTP using %s: %s",
-				SSLCertificate.getName().c_str(),
-				sslCertFile.c_str());
-	}
-
-	// Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
-	std::string sslCertAuthorityFile;
-	std::string sslVerifyPeer;
-	std::string sslMinVer;
-
-	if (!sslCertFile.empty())
-	{
-		if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile)
-				&& !sslCertAuthorityFile.empty())
-		{
-			_logger->log_info("ListenHTTP using %s: %s",
-					SSLCertificateAuthority.getName().c_str(),
-					sslCertAuthorityFile.c_str());
-		}
-
-		if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer))
-		{
-			if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0)
-			{
-				_logger->log_info("ListenHTTP will not verify peers");
-			}
-			else
-			{
-				_logger->log_info("ListenHTTP will verify peers");
-			}
-		}
-		else
-		{
-			_logger->log_info("ListenHTTP will not verify peers");
-		}
-
-		if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer))
-		{
-			_logger->log_info("ListenHTTP using %s: %s",
-					SSLMinimumVersion.getName().c_str(),
-					sslMinVer.c_str());
-		}
-	}
-
-	std::string headersAsAttributesPattern;
-
-	if (context->getProperty(HeadersAsAttributesRegex.getName(),headersAsAttributesPattern)
-			&& !headersAsAttributesPattern.empty())
-	{
-		_logger->log_info("ListenHTTP using %s: %s",
-				HeadersAsAttributesRegex.getName().c_str(),
-				headersAsAttributesPattern.c_str());
-	}
-
-	auto numThreads = getMaxConcurrentTasks();
-
-	_logger->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads",
-			listeningPort.c_str(),
-			basePath.c_str(),
-			numThreads);
-
-	// Initialize web server
-	std::vector<std::string> options;
-	options.push_back("enable_keep_alive");
-	options.push_back("yes");
-	options.push_back("keep_alive_timeout_ms");
-	options.push_back("15000");
-	options.push_back("num_threads");
-	options.push_back(std::to_string(numThreads));
-
-	if (sslCertFile.empty())
-	{
-		options.push_back("listening_ports");
-		options.push_back(listeningPort);
-	}
-	else
-	{
-		listeningPort += "s";
-		options.push_back("listening_ports");
-		options.push_back(listeningPort);
-
-		options.push_back("ssl_certificate");
-		options.push_back(sslCertFile);
-
-		if (!sslCertAuthorityFile.empty())
-		{
-			options.push_back("ssl_ca_file");
-			options.push_back(sslCertAuthorityFile);
-		}
-
-		if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0)
-		{
-			options.push_back("ssl_verify_peer");
-			options.push_back("no");
-		}
-		else
-		{
-			options.push_back("ssl_verify_peer");
-			options.push_back("yes");
-		}
-
-		if (sslMinVer.compare("SSL2") == 0)
-		{
-			options.push_back("ssl_protocol_version");
-			options.push_back(std::to_string(0));
-		}
-		else if (sslMinVer.compare("SSL3") == 0)
-		{
-			options.push_back("ssl_protocol_version");
-			options.push_back(std::to_string(1));
-		}
-		else if (sslMinVer.compare("TLS1.0") == 0)
-		{
-			options.push_back("ssl_protocol_version");
-			options.push_back(std::to_string(2));
-		}
-		else if (sslMinVer.compare("TLS1.1") == 0)
-		{
-			options.push_back("ssl_protocol_version");
-			options.push_back(std::to_string(3));
-		}
-		else
-		{
-			options.push_back("ssl_protocol_version");
-			options.push_back(std::to_string(4));
-		}
-	}
-
-	_server.reset(new CivetServer(options));
-	_handler.reset(new Handler(context,
-			sessionFactory,
-			std::move(authDNPattern),
-			std::move(headersAsAttributesPattern)));
-	_server->addHandler(basePath, _handler.get());
-}
-
-void ListenHTTP::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-
-	FlowFileRecord *flowFile = session->get();
-
-	// Do nothing if there are no incoming files
-	if (!flowFile)
-	{
-		return;
-	}
-}
-
-ListenHTTP::Handler::Handler(ProcessContext *context,
-		ProcessSessionFactory *sessionFactory,
-		std::string &&authDNPattern,
-		std::string &&headersAsAttributesPattern)
-: _authDNRegex(std::move(authDNPattern))
-, _headersAsAttributesRegex(std::move(headersAsAttributesPattern))
-{
-	_processContext = context;
-	_processSessionFactory = sessionFactory;
-}
-
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn)
-{
-	mg_printf(conn,
-			"HTTP/1.1 500 Internal Server Error\r\n"
-			"Content-Type: text/html\r\n"
-			"Content-Length: 0\r\n\r\n");
-}
-
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn)
-{
-	_logger = Logger::getLogger();
-
-	auto req_info = mg_get_request_info(conn);
-	_logger->log_info("ListenHTTP handling POST request of length %d", req_info->content_length);
-
-	// If this is a two-way TLS connection, authorize the peer against the configured pattern
-	if (req_info->is_ssl && req_info->client_cert != nullptr)
-	{
-		if (!std::regex_match(req_info->client_cert->subject, _authDNRegex))
-		{
-			mg_printf(conn,
-					"HTTP/1.1 403 Forbidden\r\n"
-					"Content-Type: text/html\r\n"
-					"Content-Length: 0\r\n\r\n");
-			_logger->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
-			return true;
-		}
-	}
-
-	// Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
-	mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-
-	auto session = _processSessionFactory->createSession();
-	ListenHTTP::WriteCallback callback(conn, req_info);
-	auto flowFile = session->create();
-
-	if (!flowFile)
-	{
-		sendErrorResponse(conn);
-		return true;
-	}
-
-	try
-	{
-		session->write(flowFile, &callback);
-
-		// Add filename from "filename" header value (and pattern headers)
-		for (int i = 0; i < req_info->num_headers; i++)
-		{
-			auto header = &req_info->http_headers[i];
-
-			if (strcmp("filename", header->name) == 0)
-			{
-				if (!flowFile->updateAttribute("filename", header->value))
-				{
-					flowFile->addAttribute("filename", header->value);
-				}
-			}
-			else if (std::regex_match(header->name, _headersAsAttributesRegex))
-			{
-				if (!flowFile->updateAttribute(header->name, header->value))
-				{
-					flowFile->addAttribute(header->name, header->value);
-				}
-			}
-		}
-
-		session->transfer(flowFile, Success);
-		session->commit();
-	}
-	catch (std::exception &exception)
-	{
-		_logger->log_debug("ListenHTTP Caught Exception %s", exception.what());
-		sendErrorResponse(conn);
-		session->rollback();
-		throw;
-	}
-	catch (...)
-	{
-		_logger->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
-		sendErrorResponse(conn);
-		session->rollback();
-		throw;
-	}
-
-	mg_printf(conn,
-			"HTTP/1.1 200 OK\r\n"
-			"Content-Type: text/html\r\n"
-			"Content-Length: 0\r\n\r\n");
-
-	return true;
-}
-
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-{
-	_logger = Logger::getLogger();
-	_conn = conn;
-	_reqInfo = reqInfo;
-}
-
-void ListenHTTP::WriteCallback::process(std::ofstream *stream)
-{
-	long long rlen;
-	long long nlen = 0;
-	long long tlen = _reqInfo->content_length;
-	char buf[16384];
-
-	while (nlen < tlen)
-	{
-		rlen = tlen - nlen;
-
-		if (rlen > sizeof(buf))
-		{
-			rlen = sizeof(buf);
-		}
-
-		// Read a buffer of data from client
-		rlen = mg_read(_conn, &buf[0], (size_t)rlen);
-
-		if (rlen <= 0)
-		{
-			break;
-		}
-
-		// Transfer buffer data to the output stream
-		stream->write(&buf[0], rlen);
-
-		nlen += rlen;
-	}
-}