You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/09 15:15:45 UTC

nifi-minifi-cpp git commit: MINIFI-231: Add Flow Persistent, Using id instead of name to load the flow from YAML

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 60f02126e -> c4fea0cd9


MINIFI-231: Add Flow Persistent, Using id instead of name to load the flow from YAML

This closes #62.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c4fea0cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c4fea0cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c4fea0cd

Branch: refs/heads/master
Commit: c4fea0cd94f9ea2ae94303ff5b574c0c00d6745a
Parents: 60f0212
Author: Bin Qiu <be...@gmail.com>
Authored: Wed Mar 1 20:45:16 2017 -0800
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Mar 9 10:12:39 2017 -0500

----------------------------------------------------------------------
 README.md                                  |   8 +-
 conf/minifi.properties                     |   5 +
 libminifi/include/Configure.h              |   7 +-
 libminifi/include/Connection.h             |   7 +-
 libminifi/include/FlowController.h         |  14 +-
 libminifi/include/FlowFileRecord.h         |  18 +-
 libminifi/include/FlowFileRepository.h     | 204 ++++++++++++++++
 libminifi/include/ProcessGroup.h           |   2 +
 libminifi/include/ProcessSession.h         |   7 +-
 libminifi/include/Provenance.h             | 164 +------------
 libminifi/include/Repository.h             | 294 ++++++++++++++++++++++++
 libminifi/include/ResourceClaim.h          |   5 +
 libminifi/src/Configure.cpp                |   7 +-
 libminifi/src/Connection.cpp               |  42 ++++
 libminifi/src/FlowController.cpp           |  89 ++++---
 libminifi/src/FlowFileRecord.cpp           |  53 ++++-
 libminifi/src/FlowFileRepository.cpp       | 280 ++++++++++++++++++++++
 libminifi/src/ProcessGroup.cpp             |  12 +
 libminifi/src/ProcessSession.cpp           |  48 +++-
 libminifi/src/Provenance.cpp               |  72 +-----
 libminifi/src/Repository.cpp               | 138 +++++++++++
 libminifi/src/ResourceClaim.cpp            |   2 +-
 libminifi/src/io/ClientSocket.cpp          |   2 +-
 libminifi/test/unit/ProcessorTests.cpp     |  44 +++-
 libminifi/test/unit/ProvenanceTestHelper.h |  61 ++++-
 25 files changed, 1281 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 64f2377..b254b32 100644
--- a/README.md
+++ b/README.md
@@ -224,10 +224,12 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
 
 
     Flow Controller:
+        id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
         name: MiNiFi Flow
 
     Processors:
         - name: GetFile
+          id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
           class: org.apache.nifi.processors.standard.GetFile
           max concurrent tasks: 1
           scheduling strategy: TIMER_DRIVEN
@@ -242,15 +244,17 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
 
     Connections:
         - name: TransferFilesToRPG
-          source name: GetFile
+          id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+          source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
           source relationship name: success
-          destination name: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+          destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
           max work queue size: 0
           max work queue data size: 1 MB
           flowfile expiration: 60 sec
 
     Remote Processing Groups:
         - name: NiFi Flow
+          id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
           url: http://localhost:8080/nifi
           timeout: 30 secs
           yield period: 10 sec

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 62114dc..cfa2858 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -23,6 +23,11 @@ nifi.bored.yield.duration=10 millis
 nifi.provenance.repository.directory.default=./provenance_repository
 nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
+# FlowFileRepository #
+nifi.flowfile.repository.enable=true
+nifi.flowfile.repository.directory.default=./flowfile_repository
+nifi.flowfile.repository.max.storage.time=10 MIN
+nifi.flowfile.repository.max.storage.size=1 MB
 
 # Security Related Properties #
 # Enable tls ssl

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h
index 9bb7a0e..6f0d198 100644
--- a/libminifi/include/Configure.h
+++ b/libminifi/include/Configure.h
@@ -27,7 +27,6 @@
 #include <errno.h>
 #include <iostream>
 #include <fstream>
-
 #include "Logger.h"
 
 class Configure {
@@ -46,12 +45,18 @@ public:
 	static const char *nifi_administrative_yield_duration;
 	static const char *nifi_bored_yield_duration;
 	static const char *nifi_graceful_shutdown_seconds;
+	static const char *nifi_log_level;
 	static const char *nifi_server_name;
 	static const char *nifi_server_port;
 	static const char *nifi_server_report_interval;
 	static const char *nifi_provenance_repository_max_storage_time;
 	static const char *nifi_provenance_repository_max_storage_size;
 	static const char *nifi_provenance_repository_directory_default;
+	static const char *nifi_provenance_repository_enable;
+	static const char *nifi_flowfile_repository_max_storage_time;
+	static const char *nifi_flowfile_repository_max_storage_size;
+	static const char *nifi_flowfile_repository_directory_default;
+	static const char *nifi_flowfile_repository_enable;
 	static const char *nifi_remote_input_secure;
 	static const char *nifi_security_need_ClientAuth;
 	static const char *nifi_security_client_certificate;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 0868560..5af0d2f 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -84,6 +84,10 @@ public:
 		else
 			return false;
 	}
+	//! Get UUID Str
+	std::string getUUIDStr() {
+		return _uuidStr;
+	}
 	//! Set Connection Source Processor
 	void setSourceProcessor(Processor *source) {
 		_srcProcessor = source;
@@ -180,7 +184,8 @@ protected:
 	std::atomic<uint64_t> _maxQueueDataSize;
 	//! Flow File Expiration Duration in= MilliSeconds
 	std::atomic<uint64_t> _expiredDuration;
-
+	//! UUID string
+	std::string _uuidStr;
 
 private:
 	//! Mutex for protection

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d60f022..f63029e 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -47,6 +47,7 @@
 #include "FlowControlProtocol.h"
 #include "RemoteProcessorGroupPort.h"
 #include "Provenance.h"
+#include "FlowFileRepository.h"
 #include "GetFile.h"
 #include "PutFile.h"
 #include "TailFile.h"
@@ -64,6 +65,7 @@
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
 
 struct ProcessorConfig {
+	std::string id;
 	std::string name;
 	std::string javaClass;
 	std::string maxConcurrentTasks;
@@ -129,6 +131,10 @@ public:
 	virtual ProvenanceRepository *getProvenanceRepository() {
 		return this->_provenanceRepo;
 	}
+	//! Get the flowfile repository
+	virtual FlowFileRepository *getFlowFileRepository() {
+		return this->_flowfileRepo;
+	}
 	//! Load flow xml from disk, after that, create the root process group and its children, initialize the flows
 	virtual void load() = 0;
 
@@ -172,7 +178,6 @@ public:
 		_protocol->setSerialNumber(number);
 	}
 
-
 protected:
   
 	//! A global unique identifier
@@ -197,6 +202,8 @@ protected:
 	std::atomic<bool> _initialized;
 	//! Provenance Repo
 	ProvenanceRepository *_provenanceRepo;
+	//! FlowFile Repo
+	FlowFileRepository *_flowfileRepo;
 	//! Flow Engines
 	//! Flow Timer Scheduler
 	TimerDrivenSchedulingAgent _timerScheduler;
@@ -212,7 +219,7 @@ protected:
 
 	FlowController() :
 			_root(0), _maxTimerDrivenThreads(0), _maxEventDrivenThreads(0), _running(
-					false), _initialized(false), _provenanceRepo(0), _protocol(
+					false), _initialized(false), _provenanceRepo(0), _flowfileRepo(0), _protocol(
 					0), logger_(Logger::getLogger()){
 	}
 
@@ -246,6 +253,8 @@ public:
 	void unload();
 	//! Load new xml
 	void reload(std::string yamlFile);
+	//! Load Flow File from persistent Flow Repo
+	void loadFlowRepo();
 	//! update property value
 	void updatePropertyValue(std::string processorName,
 			std::string propertyName, std::string propertyValue) {
@@ -283,6 +292,7 @@ private:
 	//! Logger
 	std::shared_ptr<Logger> logger_;
 	Configure *configure_;
+
 	//! Process Processor Node YAML
 	void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent);
 	//! Process Port YAML

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index a3b87ee..ded0623 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -37,6 +37,7 @@
 
 class ProcessSession;
 class Connection;
+class FlowFileEventRecord;
 
 #define DEFAULT_FLOWFILE_PATH "."
 
@@ -107,7 +108,11 @@ public:
 	/*!
 	 * Create a new flow record
 	 */
-	FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
+	explicit FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
+	/*!
+	 * Create a new flow record from repo flow event
+	 */
+	explicit FlowFileRecord(FlowFileEventRecord *event);
 	//! Destructor
 	virtual ~FlowFileRecord();
 	//! addAttribute key is enum
@@ -175,6 +180,15 @@ public:
 	{
 		return _lineageIdentifiers;
 	}
+	//! Check whether it is stored to DB already
+	bool isStoredToRepository()
+	{
+		return _isStoredToRepo;
+	}
+	void setStoredToRepository(bool value)
+	{
+		_isStoredToRepo = value;
+	}
 
 protected:
 
@@ -202,6 +216,8 @@ protected:
 	std::string _uuidStr;
 	//! UUID string for all parents
 	std::set<std::string> _lineageIdentifiers;
+	//! whether it is stored to DB
+	bool _isStoredToRepo;
 	//! duplicate the original flow file
 	void duplicate(FlowFileRecord *original);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRepository.h b/libminifi/include/FlowFileRepository.h
new file mode 100644
index 0000000..50d2c41
--- /dev/null
+++ b/libminifi/include/FlowFileRepository.h
@@ -0,0 +1,204 @@
+/**
+ * @file FlowFileRepository 
+ * Flow file repository class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOWFILE_REPOSITORY_H__
+#define __FLOWFILE_REPOSITORY_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "Logger.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+#include "Repository.h"
+
+#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
+#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+
+//! FlowFile Repository: a Repository of type FLOWFILE constructed with the default directory, entry life time, storage size and purge period macros defined above
+class FlowFileRepository : public Repository
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new flowfile repository using the compiled-in defaults (no arguments are taken)
+	 */
+	FlowFileRepository()
+	 : Repository(Repository::FLOWFILE, FLOWFILE_REPOSITORY_DIRECTORY,
+			MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD)
+	{
+	}
+	//! Destructor (empty body; nothing is released here)
+	virtual ~FlowFileRepository() {
+	}
+	//! Load Repo to Connections; connectionMap is keyed by a string, presumably the connection UUID string -- confirm against FlowFileRepository.cpp
+	void loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap);
+
+protected:
+
+private:
+
+	// Prevent default copy constructor and assignment operation (declared private, not defined here)
+	// Only support pass by reference or pointer
+	FlowFileRepository(const FlowFileRepository &parent);
+	FlowFileRepository &operator=(const FlowFileRepository &parent);
+};
+
+//! FlowFile Event Record: holds the fields copied from a FlowFileRecord plus a connection UUID, with Serialize and DeSerialize entry points that take a FlowFileRepository
+class FlowFileEventRecord : protected Serializable
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new flow file event record; dates, size and offset start at 0 and the event time is set from getTimeMillis()
+	 */
+	FlowFileEventRecord()
+	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
+	{
+		_eventTime = getTimeMillis();
+		logger_ = Logger::getLogger();
+	}
+
+	//! Destructor (empty body)
+	virtual ~FlowFileEventRecord() {
+	}
+	//! Get Attributes (returned by value, so the caller receives a copy of the map)
+	std::map<std::string, std::string> getAttributes() {
+		return _attributes;
+	}
+	//! Get Size
+	uint64_t getFileSize() {
+		return _size;
+	}
+	//! Get Offset
+	uint64_t getFileOffset() {
+		return _offset;
+	}
+	//! Get Entry Date
+	uint64_t getFlowFileEntryDate() {
+		return _entryDate;
+	}
+	//! Get Lineage Start Date
+	uint64_t getlineageStartDate() {
+		return _lineageStartDate;
+	}
+	//! Get Event Time (set once in the constructor)
+	uint64_t getEventTime() {
+		return _eventTime;
+	}
+	//! Get FlowFileUuid
+	std::string getFlowFileUuid()
+	{
+		return _uuid;
+	}
+	//! Get ConnectionUuid
+	std::string getConnectionUuid()
+	{
+		return _uuidConnection;
+	}
+	//! Get content full path (left empty by fromFlowFile when the flow file had no resource claim)
+	std::string getContentFullPath()
+	{
+		return _contentFullPath;
+	}
+	//! Get LineageIdentifiers (returned by value)
+	std::set<std::string> getLineageIdentifiers()
+	{
+		return _lineageIdentifiers;
+	}
+	//! fromFlowFile: copy dates, lineage identifiers, UUID, attributes, size and offset from flow and record uuidConnection; flow is dereferenced without a null check
+	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
+	{
+		_entryDate = flow->getEntryDate();
+		_lineageStartDate = flow->getlineageStartDate();
+		_lineageIdentifiers = flow->getlineageIdentifiers();
+		_uuid = flow->getUUIDStr();
+		_attributes = flow->getAttributes();
+		_size = flow->getSize();
+		_offset = flow->getOffset();
+		_uuidConnection = uuidConnection;
+		if (flow->getResourceClaim())
+		{
+			_contentFullPath = flow->getResourceClaim()->getContentFullPath();
+		}
+	}
+	//! Serialize and persist to the repository (defined outside this header)
+	bool Serialize(FlowFileRepository *repo);
+	//! DeSerialize from a raw buffer of bufferSize bytes (defined outside this header)
+	bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+	//! DeSerialize from a DataStream by forwarding its buffer and size to the raw buffer overload
+	bool DeSerialize(DataStream &stream)
+	{
+		return DeSerialize(stream.getBuffer(),stream.getSize());
+	}
+	//! DeSerialize given a repository and a key; presumably loads the entry stored under key in repo -- confirm against FlowFileRepository.cpp
+	bool DeSerialize(FlowFileRepository *repo, std::string key);
+
+protected:
+
+	//! Date at which the event was created
+	uint64_t _eventTime;
+	//! Date at which the flow file entered the flow
+	uint64_t _entryDate;
+	//! Date at which the origin of this flow file entered the flow
+	uint64_t _lineageStartDate;
+	//! Size in bytes of the data corresponding to this flow file
+	uint64_t _size;
+	//! flow file UUID string
+	std::string _uuid;
+	//! connection UUID string passed to fromFlowFile
+	std::string _uuidConnection;
+	//! Offset to the content
+	uint64_t _offset;
+	//! Full path to the content, taken from the resource claim of the flow file
+	std::string _contentFullPath;
+	//! Attributes key/values pairs for the flow record
+	std::map<std::string, std::string> _attributes;
+	//! UUID string for all parents
+	std::set<std::string> _lineageIdentifiers;
+
+private:
+
+	//! Logger
+	std::shared_ptr<Logger> logger_;
+	
+	// Prevent default copy constructor and assignment operation (declared private, not defined here)
+	// Only support pass by reference or pointer
+	FlowFileEventRecord(const FlowFileEventRecord &parent);
+	FlowFileEventRecord &operator=(const FlowFileEventRecord &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
index dbff7b0..dfec6c5 100644
--- a/libminifi/include/ProcessGroup.h
+++ b/libminifi/include/ProcessGroup.h
@@ -147,6 +147,8 @@ public:
 	void removeConnection(Connection *connection);
 	//! update property value
 	void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
+	//! get connections under the process group
+	void getConnections(std::map<std::string, Connection*> *connectionMap);
 
 protected:
 	//! A global unique identifier

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessSession.h b/libminifi/include/ProcessSession.h
index 6f01506..4e26758 100644
--- a/libminifi/include/ProcessSession.h
+++ b/libminifi/include/ProcessSession.h
@@ -44,12 +44,7 @@ public:
 	/*!
 	 * Create a new process session
 	 */
-	ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) {
-		logger_ = Logger::getLogger();
-		logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
-		_provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
-				_processContext->getProcessor()->getName());
-	}
+	ProcessSession(ProcessContext *processContext = NULL);
 	//! Destructor
 	virtual ~ProcessSession() {
 		if (_provenanceReport)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h
index 52489fc..3ba9792 100644
--- a/libminifi/include/Provenance.h
+++ b/libminifi/include/Provenance.h
@@ -32,10 +32,6 @@
 #include <thread>
 #include <vector>
 
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/slice.h"
-#include "leveldb/status.h"
 #include "Configure.h"
 #include "Connection.h"
 #include "FlowFileRecord.h"
@@ -44,9 +40,7 @@
 #include "ResourceClaim.h"
 #include "io/Serializable.h"
 #include "utils/TimeUtil.h"
-
-// Provenance Event Record Serialization Seg Size
-#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+#include "Repository.h"
 
 class ProvenanceRepository;
 
@@ -569,107 +563,23 @@ private:
 #define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
 
 //! Provenance Repository
-class ProvenanceRepository
+class ProvenanceRepository : public Repository
 {
 public:
 	//! Constructor
 	/*!
 	 * Create a new provenance repository
 	 */
-	ProvenanceRepository() {
-		logger_ = Logger::getLogger();
-		configure_ = Configure::getConfigure();
-		_directory = PROVENANCE_DIRECTORY;
-		_maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME;
-		_purgePeriod = PROVENANCE_PURGE_PERIOD;
-		_maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE;
-		_db = NULL;
-		_thread = NULL;
-		_running = false;
-		_repoFull = false;
+	ProvenanceRepository()
+	 : Repository(Repository::PROVENANCE, PROVENANCE_DIRECTORY,
+			MAX_PROVENANCE_ENTRY_LIFE_TIME, MAX_PROVENANCE_STORAGE_SIZE, PROVENANCE_PURGE_PERIOD)
+	{
 	}
 
 	//! Destructor
 	virtual ~ProvenanceRepository() {
-		stop();
-		if (this->_thread)
-			delete this->_thread;
-		destroy();
 	}
 
-	//! initialize
-	virtual bool initialize()
-	{
-		std::string value;
-		if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
-		{
-			_directory = value;
-		}
-		logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
-		if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
-		{
-			Property::StringToInt(value, _maxPartitionBytes);
-		}
-		logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
-		if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
-		{
-			TimeUnit unit;
-			if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
-						Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
-			{
-			}
-		}
-		logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
-		leveldb::Options options;
-		options.create_if_missing = true;
-		leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
-		if (status.ok())
-		{
-			logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
-		}
-		else
-		{
-			logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
-			return false;
-		}
-
-		// start the monitor thread
-		start();
-		return true;
-	}
-	//! Put
-	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
-	{
-	  
-		// persistent to the DB
-		leveldb::Slice value((const char *) buf, bufLen);
-		leveldb::Status status;
-		status = _db->Put(leveldb::WriteOptions(), key, value);
-		if (status.ok())
-			return true;
-		else
-			return false;
-	}
-	//! Delete
-	virtual bool Delete(std::string key)
-	{
-		leveldb::Status status;
-		status = _db->Delete(leveldb::WriteOptions(), key);
-		if (status.ok())
-			return true;
-		else
-			return false;
-	}
-	//! Get
-	virtual bool Get(std::string key, std::string &value)
-	{
-		leveldb::Status status;
-		status = _db->Get(leveldb::ReadOptions(), key, &value);
-		if (status.ok())
-			return true;
-		else
-			return false;
-	}
 	//! Persistent event
 	void registerEvent(ProvenanceEventRecord *event)
 	{
@@ -680,77 +590,15 @@ public:
 	{
 		Delete(event->getEventId());
 	}
-	//! destroy
-	void destroy()
-	{
-		if (_db)
-		{
-			delete _db;
-			_db = NULL;
-		}
-	}
-	//! Run function for the thread
-	static void run(ProvenanceRepository *repo);
-	//! Start the repository monitor thread
-	virtual void start();
-	//! Stop the repository monitor thread
-	virtual void stop();
-	//! whether the repo is full
-	virtual bool isFull()
-	{
-		return _repoFull;
-	}
 
 protected:
 
 private:
 
-	//! Mutex for protection
-	std::mutex _mtx;
-	//! repository directory
-	std::string _directory;
-	//! Logger
-	std::shared_ptr<Logger> logger_;
-	//! Configure
-	//! max db entry life time
-	Configure *configure_;
-	int64_t _maxPartitionMillis;
-	//! max db size
-	int64_t _maxPartitionBytes;
-	//! purge period
-	uint64_t _purgePeriod;
-	//! level DB database
-	leveldb::DB* _db;
-	//! thread
-	std::thread *_thread;
-	//! whether it is running
-	bool _running;
-	//! whether stop accepting provenace event
-	std::atomic<bool> _repoFull;
-	//! size of the directory
-	static uint64_t _repoSize;
-	//! call back for directory size
-	static int repoSum(const char *fpath, const struct stat *sb, int typeflag)
-	{
-	    _repoSize += sb->st_size;
-	    return 0;
-	}
-	//! repoSize
-	uint64_t repoSize()
-	{
-		_repoSize = 0;
-		if (ftw(_directory.c_str(), repoSum, 1) != 0)
-			_repoSize = 0;
-
-		return _repoSize;
-	}
 	// Prevent default copy constructor and assignment operation
 	// Only support pass by reference or pointer
 	ProvenanceRepository(const ProvenanceRepository &parent);
 	ProvenanceRepository &operator=(const ProvenanceRepository &parent);
 };
 
-
-
-
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Repository.h b/libminifi/include/Repository.h
new file mode 100644
index 0000000..8b1c3ec
--- /dev/null
+++ b/libminifi/include/Repository.h
@@ -0,0 +1,294 @@
+/**
+ * @file Repository 
+ * Repository class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REPOSITORY_H__
+#define __REPOSITORY_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "Logger.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+
+//! Repository: base class holding a LevelDB handle, a monitor thread pointer and the directory, size and time settings used by the provenance and flowfile repositories
+class Repository
+{
+public:
+	enum RepositoryType {
+		//! Provenance Repo Type
+		PROVENANCE,
+		//! FlowFile Repo Type
+		FLOWFILE,
+		MAX_REPO_TYPE
+	};
+	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
+	//! Constructor
+	/*!
+	 * Create a new repository of the given type; the arguments are defaults that initialize() may replace with values read from Configure
+	 */
+	Repository(RepositoryType type, std::string directory, 
+		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
+		_type = type;
+		_directory = directory;
+		_maxPartitionMillis = maxPartitionMillis;
+		_maxPartitionBytes = maxPartitionBytes;
+		_purgePeriod = purgePeriod;
+		logger_ = Logger::getLogger();
+		configure_ = Configure::getConfigure();
+		_db = NULL;
+		_thread = NULL;
+		_running = false;
+		_repoFull = false;
+		_enable = true;
+	}
+
+	//! Destructor: calls stop(), deletes the thread object if one exists, then deletes the database handle via destroy()
+	virtual ~Repository() {
+		stop();
+		if (this->_thread)
+			delete this->_thread;
+		destroy();
+	}
+
+	//! initialize: for this repo type read the enable flag, directory, max size and max time from Configure, then open the LevelDB database; returns false when the repo is disabled or the open fails (the enable flag falls back to true when the property is missing or not parsed)
+	virtual bool initialize()
+	{
+		std::string value;
+
+		if (_type == PROVENANCE)
+		{
+			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
+					&& StringUtils::StringToBool(value, _enable))) {
+				_enable = true;
+			}
+			if (!_enable)
+				return false;
+			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
+			{
+				_directory = value;
+			}
+			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
+			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
+			{
+				Property::StringToInt(value, _maxPartitionBytes);
+			}
+			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
+			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
+			{
+				TimeUnit unit;
+				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
+							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
+				{
+				}
+			}
+			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
+			leveldb::Options options;
+			options.create_if_missing = true;
+			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
+			if (status.ok())
+			{
+				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
+			}
+			else
+			{
+				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
+				return false;
+			}
+		}
+
+		if (_type == FLOWFILE)
+		{
+			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
+					&& StringUtils::StringToBool(value, _enable))) {
+				_enable = true;
+			}
+			if (!_enable)
+				return false;
+			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
+			{
+				_directory = value;
+			}
+			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
+			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
+			{
+				Property::StringToInt(value, _maxPartitionBytes);
+			}
+			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
+			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
+			{
+				TimeUnit unit;
+				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
+							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
+				{
+				}
+			}
+			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
+			leveldb::Options options;
+			options.create_if_missing = true;
+			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
+			if (status.ok())
+			{
+				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
+			}
+			else
+			{
+				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
+				return false;
+			}
+		}
+
+		return true;
+	}
+	//! Put: store bufLen bytes from buf under key; returns false when the repo is disabled or the write fails. NOTE(review): _db is dereferenced without a null check, so initialize() must have succeeded first
+	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
+	{
+		if (!_enable)
+			return false;
+			
+		// persist to the DB
+		leveldb::Slice value((const char *) buf, bufLen);
+		leveldb::Status status;
+		status = _db->Put(leveldb::WriteOptions(), key, value);
+		if (status.ok())
+			return true;
+		else
+			return false;
+	}
+	//! Delete: remove the entry stored under key; returns false when the repo is disabled or the delete fails. NOTE(review): same unchecked _db dereference as Put
+	virtual bool Delete(std::string key)
+	{
+		if (!_enable)
+			return false;
+		leveldb::Status status;
+		status = _db->Delete(leveldb::WriteOptions(), key);
+		if (status.ok())
+			return true;
+		else
+			return false;
+	}
+	//! Get: read the entry stored under key into value; returns false when the repo is disabled or the read does not report ok. NOTE(review): same unchecked _db dereference as Put
+	virtual bool Get(std::string key, std::string &value)
+	{
+		if (!_enable)
+			return false;
+		leveldb::Status status;
+		status = _db->Get(leveldb::ReadOptions(), key, &value);
+		if (status.ok())
+			return true;
+		else
+			return false;
+	}
+	//! Run function for the thread
+	static void run(Repository *repo);
+	//! Start the repository monitor thread
+	virtual void start();
+	//! Stop the repository monitor thread
+	virtual void stop();
+	//! whether the repo is full
+	virtual bool isFull()
+	{
+		return _repoFull;
+	}
+	//! whether the repo is enabled
+	virtual bool isEnable()
+	{
+		return _enable;
+	}
+
+protected:
+	//! Repo Type
+	RepositoryType _type;
+	//! Mutex for protection
+	std::mutex _mtx;
+	//! repository directory
+	std::string _directory;
+	//! Logger
+	std::shared_ptr<Logger> logger_;
+	//! Configure instance (configure_) that initialize() reads the repository properties from
+	//! max db entry life time in ms is _maxPartitionMillis, declared right after configure_
+	Configure *configure_;
+	int64_t _maxPartitionMillis;
+	//! max db size
+	int64_t _maxPartitionBytes;
+	//! purge period
+	uint64_t _purgePeriod;
+	//! level DB database
+	leveldb::DB* _db;
+	//! thread
+	std::thread *_thread;
+	//! whether the monitoring thread is running for the repo while it was enabled
+	bool _running;
+	//! whether it is enabled by minifi property for the repo
+	bool _enable;
+	//! whether the repo is full, i.e. should stop accepting events
+	std::atomic<bool> _repoFull;
+	//! repoSize
+	uint64_t repoSize();
+	//! size of the directory, one slot per repository type
+	static uint64_t _repoSize[MAX_REPO_TYPE];
+	//! call back for directory size: adds the st_size of one entry to the PROVENANCE slot; presumably handed to ftw() by repoSize() -- confirm against Repository.cpp
+	static int repoSumProvenance(const char *fpath, const struct stat *sb, int typeflag)
+	{
+		_repoSize[PROVENANCE] += sb->st_size;
+		return 0;
+	}
+	//! call back for directory size: adds the st_size of one entry to the FLOWFILE slot; presumably handed to ftw() by repoSize() -- confirm against Repository.cpp
+	static int repoSumFlowFile(const char *fpath, const struct stat *sb, int typeflag)
+	{
+		_repoSize[FLOWFILE] += sb->st_size;
+		return 0;
+	}
+
+private:
+	//! destroy: delete the LevelDB handle if present and reset it to NULL
+	void destroy()
+	{
+		if (_db)
+		{
+			delete _db;
+			_db = NULL;
+		}
+	}
+	// Prevent default copy constructor and assignment operation (declared private, not defined here)
+	// Only support pass by reference or pointer
+	Repository(const Repository &parent);
+	Repository &operator=(const Repository &parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 1dd2704..7ca79a3 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -67,6 +67,11 @@ public:
 	{
 		return _contentFullPath;
 	}
+	//! Set the content full path
+	void setContentFullPath(std::string path)
+	{
+		_contentFullPath = path;
+	}
 
 protected:
 	//! A global unique identifier

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 94e33a1..6f5c08d 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -25,12 +25,18 @@ const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.f
 const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
 const char *Configure::nifi_graceful_shutdown_seconds  = "nifi.graceful.shutdown.seconds";
+const char *Configure::nifi_log_level = "nifi.log.level";
 const char *Configure::nifi_server_name = "nifi.server.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
 const char *Configure::nifi_server_report_interval= "nifi.server.report.interval";
 const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
 const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
 const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
+const char *Configure::nifi_provenance_repository_enable = "nifi.provenance.repository.enable";
+const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
+const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
+const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+const char *Configure::nifi_flowfile_repository_enable = "nifi.flowfile.repository.enable";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
 const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
 const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
@@ -38,7 +44,6 @@ const char *Configure::nifi_security_client_private_key = "nifi.security.client.
 const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
 const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
 
-
 //! Get the config value
 bool Configure::get(std::string key, std::string &value)
 {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 791fa63..42dbfe4 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -29,6 +29,8 @@
 
 #include "Connection.h"
 #include "Processor.h"
+#include "FlowFileRepository.h"
+#include "FlowController.h"
 
 Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
 : _name(name)
@@ -53,6 +55,10 @@ Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t des
 
 	logger_ = Logger::getLogger();
 
+	char uuidStr[37];
+	uuid_unparse_lower(_uuid, uuidStr);
+	_uuidStr = uuidStr;
+
 	logger_->log_info("Connection %s created", _name.c_str());
 }
 
@@ -93,6 +99,21 @@ void Connection::put(FlowFileRecord *flow)
 				flow->getUUIDStr().c_str(), _name.c_str());
 	}
 
+
+	if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+			FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() &&
+			!flow->isStoredToRepository())
+	{
+		// Save to the flowfile repo
+		FlowFileEventRecord event;
+		event.fromFlowFile(flow, this->_uuidStr);
+		if (event.Serialize(
+				FlowControllerFactory::getFlowController()->getFlowFileRepository()))
+		{
+			flow->setStoredToRepository(true);
+		}
+	}
+
 	// Notify receiving processor that work may be available
 	if(_destProcessor)
 	{
@@ -117,6 +138,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
 			{
 				// Flow record expired
 				expiredFlowRecords.insert(item);
+				if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+						FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+				{
+					// delete from the flowfile repo
+					FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+					item->setStoredToRepository(false);
+				}
 			}
 			else
 			{
@@ -131,6 +159,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
 				item->setOriginalConnection(this);
 				logger_->log_debug("Dequeue flow file UUID %s from connection %s",
 						item->getUUIDStr().c_str(), _name.c_str());
+				if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+						FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+				{
+					// delete from the flowfile repo
+					FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+					item->setStoredToRepository(false);
+				}
 				return item;
 			}
 		}
@@ -147,6 +182,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
 			item->setOriginalConnection(this);
 			logger_->log_debug("Dequeue flow file UUID %s from connection %s",
 					item->getUUIDStr().c_str(), _name.c_str());
+			if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+					FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+			{
+				// delete from the flowfile repo
+				FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+				item->setStoredToRepository(false);
+			}
 			return item;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 4f5faf8..28e35b7 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -109,6 +109,8 @@ FlowControllerImpl::FlowControllerImpl(std::string name)  {
 
 
 	// Create repos for flow record and provenance
+	_flowfileRepo = new FlowFileRepository();
+	_flowfileRepo->initialize();
 	_provenanceRepo = new ProvenanceRepository();
 	_provenanceRepo->initialize();
 }
@@ -121,6 +123,8 @@ FlowControllerImpl::~FlowControllerImpl() {
 		delete _protocol;
 	if (NULL != _provenanceRepo)
 		delete _provenanceRepo;
+	if (NULL != _flowfileRepo)
+		delete _flowfileRepo;
 
 }
 
@@ -133,6 +137,8 @@ void FlowControllerImpl::stop(bool force) {
 		logger_->log_info("Stop Flow Controller");
 		this->_timerScheduler.stop();
 		this->_eventScheduler.stop();
+		this->_flowfileRepo->stop();
+		this->_provenanceRepo->stop();
 		// Wait for sometime for thread stop
 		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 		if (this->_root)
@@ -238,14 +244,12 @@ void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
 	uuid_t uuid;
 	ProcessGroup *group = NULL;
 
-	// generate the random UIID
-	uuid_generate(uuid);
-
 	std::string flowName = rootFlowNode["name"].as<std::string>();
+	std::string id = rootFlowNode["id"].as<std::string>();
+
+	uuid_parse(id.c_str(), uuid);
 
-	char uuidStr[37];
-	uuid_unparse_lower(_uuid, uuidStr);
-	logger_->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+	logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
 	logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
 	group = this->createRootProcessGroup(flowName, uuid);
 	this->_root = group;
@@ -278,17 +282,14 @@ void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
 				YAML::Node procNode = iter->as<YAML::Node>();
 
 				procCfg.name = procNode["name"].as<std::string>();
-				logger_->log_debug("parseProcessorNode: name => [%s]",
-						procCfg.name.c_str());
+				procCfg.id = procNode["id"].as<std::string>();
+				logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
+						procCfg.name.c_str(), procCfg.id.c_str());
 				procCfg.javaClass = procNode["class"].as<std::string>();
 				logger_->log_debug("parseProcessorNode: class => [%s]",
 						procCfg.javaClass.c_str());
 
-				char uuidStr[37];
-				uuid_unparse_lower(_uuid, uuidStr);
-
-				// generate the random UUID
-				uuid_generate(uuid);
+				uuid_parse(procCfg.id.c_str(), uuid);
 
 				// Determine the processor name only from the Java class
 				int lastOfIdx = procCfg.javaClass.find_last_of(".");
@@ -303,7 +304,7 @@ void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
 				if (!processor) {
 					logger_->log_error(
 							"Could not create a processor %s with name %s",
-							procCfg.name.c_str(), uuidStr);
+							procCfg.name.c_str(), procCfg.id.c_str());
 					throw std::invalid_argument(
 							"Could not create processor " + procCfg.name);
 				}
@@ -468,8 +469,10 @@ void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
 				YAML::Node rpgNode = iter->as<YAML::Node>();
 
 				auto name = rpgNode["name"].as<std::string>();
-				logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s]",
-						name.c_str());
+				auto id = rpgNode["id"].as<std::string>();
+
+				logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
+						name.c_str(), id.c_str());
 
 				std::string url = rpgNode["url"].as<std::string>();
 				logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
@@ -491,11 +494,7 @@ void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
 						rpgNode["Output Ports"].as<YAML::Node>();
 				ProcessGroup *group = NULL;
 
-				// generate the random UUID
-				uuid_generate(uuid);
-
-				char uuidStr[37];
-				uuid_unparse_lower(_uuid, uuidStr);
+				uuid_parse(id.c_str(), uuid);
 
 				int64_t timeoutValue = -1;
 				int64_t yieldPeriodValue = -1;
@@ -568,20 +567,18 @@ void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
 		if (connectionsNode->IsSequence()) {
 			for (YAML::const_iterator iter = connectionsNode->begin();
 					iter != connectionsNode->end(); ++iter) {
-				// generate the random UUID
-				uuid_generate(uuid);
 
 				YAML::Node connectionNode = iter->as<YAML::Node>();
 
 				std::string name = connectionNode["name"].as<std::string>();
-				std::string destName = connectionNode["destination name"].as<
+				std::string id = connectionNode["id"].as<std::string>();
+				std::string destId = connectionNode["destination id"].as<
 						std::string>();
 
-				char uuidStr[37];
-				uuid_unparse_lower(_uuid, uuidStr);
+				uuid_parse(id.c_str(), uuid);
 
 				logger_->log_debug(
-						"Created connection with UUID %s and name %s", uuidStr,
+						"Created connection with UUID %s and name %s", id.c_str(),
 						name.c_str());
 				connection = this->createConnection(name, uuid);
 				auto rawRelationship =
@@ -592,26 +589,30 @@ void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
 						rawRelationship.c_str());
 				if (connection)
 					connection->setRelationship(relationship);
-				std::string connectionSrcProcName =
-						connectionNode["source name"].as<std::string>();
+				std::string connectionSrcProcId =
+						connectionNode["source id"].as<std::string>();
+				uuid_t srcUUID;
+				uuid_parse(connectionSrcProcId.c_str(), srcUUID);
 
 				Processor *srcProcessor = this->_root->findProcessor(
-						connectionSrcProcName);
+						srcUUID);
 
 				if (!srcProcessor) {
 					logger_->log_error(
-							"Could not locate a source with name %s to create a connection",
-							connectionSrcProcName.c_str());
+							"Could not locate a source with id %s to create a connection",
+							connectionSrcProcId.c_str());
 					throw std::invalid_argument(
-							"Could not locate a source with name %s to create a connection "
-									+ connectionSrcProcName);
+							"Could not locate a source with id %s to create a connection "
+									+ connectionSrcProcId);
 				}
 
-				Processor *destProcessor = this->_root->findProcessor(destName);
+				uuid_t destUUID;
+				uuid_parse(destId.c_str(), destUUID);
+				Processor *destProcessor = this->_root->findProcessor(destUUID);
 				// If we could not find name, try by UUID
 				if (!destProcessor) {
 					uuid_t destUuid;
-					uuid_parse(destName.c_str(), destUuid);
+					uuid_parse(destId.c_str(), destUuid);
 					destProcessor = this->_root->findProcessor(destUuid);
 				}
 				if (destProcessor) {
@@ -728,11 +729,23 @@ void FlowControllerImpl::load() {
 		parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
 		parseConnectionYaml(&connectionsNode, this->_root);
 
-		_initialized = true;
+		// Load Flow File from Repo
+		loadFlowRepo();
 
+		_initialized = true;
     }
 }
 
+void FlowControllerImpl::loadFlowRepo()
+{
+	if (this->_flowfileRepo && this->_flowfileRepo->isEnable())
+	{
+		std::map<std::string, Connection *> connectionMap;
+		this->_root->getConnections(&connectionMap);
+		this->_flowfileRepo->loadFlowFileToConnections(&connectionMap);
+	}
+}
+
 void FlowControllerImpl::reload(std::string yamlFile)
 {
     logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
@@ -769,6 +782,8 @@ bool FlowControllerImpl::start() {
 						&this->_eventScheduler);
 			_running = true;
 			this->_protocol->start();
+			this->_provenanceRepo->start();
+			this->_flowfileRepo->start();
 			logger_->log_info("Started Flow Controller");
 		}
 		return true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 362602e..a2f2323 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -27,9 +27,10 @@
 #include <cstdio>
 
 #include "FlowFileRecord.h"
-
-#include "Logger.h"
 #include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "FlowFileRepository.h"
 
 std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
 
@@ -39,6 +40,7 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
   _offset(0),
   _penaltyExpirationMs(0),
   _claim(claim),
+  _isStoredToRepo(false),
   _markedDelete(false),
   _connection(NULL),
   _orginalConnection(NULL)
@@ -74,6 +76,43 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
 	logger_ = Logger::getLogger();
 }
 
+FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
+: _size(0),
+  _id(_localFlowSeqNumber.load()),
+  _offset(0),
+  _penaltyExpirationMs(0),
+  _claim(NULL),
+  _isStoredToRepo(false),
+  _markedDelete(false),
+  _connection(NULL),
+  _orginalConnection(NULL)
+{
+	_entryDate = event->getFlowFileEntryDate();
+	_lineageStartDate = event->getlineageStartDate();
+	_size = event->getFileSize();
+	_offset = event->getFileOffset();
+	_lineageIdentifiers = event->getLineageIdentifiers();
+	_attributes = event->getAttributes();
+    _snapshot = false;
+    _uuidStr = event->getFlowFileUuid();
+    uuid_parse(_uuidStr.c_str(), _uuid);
+
+    if (_size > 0)
+    {
+    	_claim = new ResourceClaim();
+    }
+
+	if (_claim)
+	{
+		_claim->setContentFullPath(event->getContentFullPath());
+		// Increase the flow file record owned count for the resource claim
+		_claim->increaseFlowFileRecordOwnedCount();
+	}
+	logger_ = Logger::getLogger();
+	++_localFlowSeqNumber;
+}
+
+
 FlowFileRecord::~FlowFileRecord()
 {
 	if (!_snapshot)
@@ -87,7 +126,15 @@ FlowFileRecord::~FlowFileRecord()
 		if (_claim->getFlowFileRecordOwnedCount() <= 0)
 		{
 			logger_->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
-			std::remove(_claim->getContentFullPath().c_str());
+			std::string value;
+			if (!FlowControllerFactory::getFlowController()->getFlowFileRepository() ||
+					!FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() ||
+					!this->_isStoredToRepo ||
+					!FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr, value))
+			{
+				// only remove the content when the flowfile is not persisted in the DB; if it is still persisted while it is in the queue, we keep the content
+				std::remove(_claim->getContentFullPath().c_str());
+			}
 			delete _claim;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRepository.cpp b/libminifi/src/FlowFileRepository.cpp
new file mode 100644
index 0000000..ade8dce
--- /dev/null
+++ b/libminifi/src/FlowFileRepository.cpp
@@ -0,0 +1,280 @@
+/**
+ * @file FlowFileRepository.cpp
+ * FlowFile repository implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "FlowFileRepository.h"
+
+//! DeSerialize: fetch the value stored under key via repo->Get and decode it into this event; returns false when the lookup or the decoding fails
+bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo,
+		std::string key) {
+	std::string value;
+	bool ret;
+	// Look up the serialized record by its key
+	ret = repo->Get(key, value);
+
+	if (!ret) {
+		logger_->log_error("NiFi FlowFile Store event %s can not found",
+				key.c_str());
+		return false;
+	} else
+		logger_->log_debug("NiFi FlowFile Read event %s length %d",
+				key.c_str(), value.length());
+
+	// Wrap the fetched bytes in a stream for the field-by-field decoder
+	DataStream stream((const uint8_t*)value.data(),value.length());
+
+	ret = DeSerialize(stream);
+
+	if (ret) {
+		logger_->log_debug(
+				"NiFi FlowFile retrieve uuid %s size %d connection %s success",
+				_uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
+	} else {
+		logger_->log_debug(
+				"NiFi FlowFile retrieve uuid %s size %d connection %s fail",
+				_uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
+	}
+
+	return ret;
+}
+
+bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) {
+
+	DataStream outStream;
+
+	int ret;
+
+	ret = write(this->_eventTime,&outStream);
+	if (ret != 8) {
+
+		return false;
+	}
+
+	ret = write(this->_entryDate,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	ret = write(this->_lineageStartDate,&outStream);
+	if (ret != 8) {
+
+		return false;
+	}
+
+	ret = writeUTF(this->_uuid,&outStream);
+	if (ret <= 0) {
+
+		return false;
+	}
+
+	ret = writeUTF(this->_uuidConnection,&outStream);
+	if (ret <= 0) {
+
+		return false;
+	}
+
+	// write flow attributes
+	uint32_t numAttributes = this->_attributes.size();
+	ret = write(numAttributes,&outStream);
+	if (ret != 4) {
+
+		return false;
+	}
+
+	for (auto itAttribute : _attributes) {
+		ret = writeUTF(itAttribute.first,&outStream, true);
+		if (ret <= 0) {
+
+			return false;
+		}
+		ret = writeUTF(itAttribute.second,&outStream, true);
+		if (ret <= 0) {
+
+			return false;
+		}
+	}
+
+	ret = writeUTF(this->_contentFullPath,&outStream);
+	if (ret <= 0) {
+
+		return false;
+	}
+
+	ret = write(this->_size,&outStream);
+	if (ret != 8) {
+
+		return false;
+	}
+
+	ret = write(this->_offset,&outStream);
+	if (ret != 8) {
+
+		return false;
+	}
+
+	// Persist to the DB
+
+	if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+		logger_->log_debug("NiFi FlowFile Store event %s size %d success",
+				_uuid.c_str(), outStream.getSize());
+		return true;
+	} else {
+		logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+				_uuid.c_str(), outStream.getSize());
+		return false;
+	}
+
+	// cleanup
+
+	return true;
+}
+
+bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+
+	int ret;
+
+	DataStream outStream(buffer,bufferSize);
+
+	ret = read(this->_eventTime,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	ret = read(this->_entryDate,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	ret = read(this->_lineageStartDate,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	ret = readUTF(this->_uuid,&outStream);
+	if (ret <= 0) {
+		return false;
+	}
+
+	ret = readUTF(this->_uuidConnection,&outStream);
+	if (ret <= 0) {
+		return false;
+	}
+
+	// read flow attributes
+	uint32_t numAttributes = 0;
+	ret = read(numAttributes,&outStream);
+	if (ret != 4) {
+		return false;
+	}
+
+	for (uint32_t i = 0; i < numAttributes; i++) {
+		std::string key;
+		ret = readUTF(key,&outStream, true);
+		if (ret <= 0) {
+			return false;
+		}
+		std::string value;
+		ret = readUTF(value,&outStream, true);
+		if (ret <= 0) {
+			return false;
+		}
+		this->_attributes[key] = value;
+	}
+
+	ret = readUTF(this->_contentFullPath,&outStream);
+	if (ret <= 0) {
+		return false;
+	}
+
+	ret = read(this->_size,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	ret = read(this->_offset,&outStream);
+	if (ret != 8) {
+		return false;
+	}
+
+	return true;
+}
+
+void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap)
+{
+	if (!_enable)
+		return;
+
+	std::vector<std::string> purgeList;
+	leveldb::Iterator* it = _db->NewIterator(
+						leveldb::ReadOptions());
+
+	for (it->SeekToFirst(); it->Valid(); it->Next())
+	{
+		FlowFileEventRecord eventRead;
+		std::string key = it->key().ToString();
+		if (eventRead.DeSerialize((uint8_t *) it->value().data(),
+				(int) it->value().size()))
+		{
+			auto search = connectionMap->find(eventRead.getConnectionUuid());
+			if (search != connectionMap->end())
+			{
+				// we found the connection for the persisted flowfile; create the flowfile and enqueue it
+				FlowFileRecord *record = new FlowFileRecord(&eventRead);
+				// set stored-to-repo to true so that we do not need to persist it again in enqueue
+				record->setStoredToRepository(true);
+				search->second->put(record);
+			}
+			else
+			{
+				if (eventRead.getContentFullPath().length() > 0)
+				{
+					std::remove(eventRead.getContentFullPath().c_str());
+				}
+				purgeList.push_back(key);
+			}
+		}
+		else
+		{
+			purgeList.push_back(key);
+		}
+	}
+
+	delete it;
+	std::vector<std::string>::iterator itPurge;
+	for (itPurge = purgeList.begin(); itPurge != purgeList.end();
+						itPurge++)
+	{
+		std::string eventId = *itPurge;
+		logger_->log_info("Repository Repo %s Purge %s",
+										RepositoryTypeStr[_type],
+										eventId.c_str());
+		Delete(eventId);
+	}
+
+	return;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index f451838..7e3527e 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -66,6 +66,18 @@ ProcessGroup::~ProcessGroup() {
 	}
 }
 
+void ProcessGroup::getConnections(std::map<std::string, Connection*> *connectionMap)
+{
+	for (auto connection : connections_)
+	{
+		(*connectionMap)[connection->getUUIDStr()] = connection;
+	}
+
+	for (auto processGroup: child_process_groups_) {
+		processGroup->getConnections(connectionMap);
+	}
+}
+
 bool ProcessGroup::isRootProcessGroup() {
 	std::lock_guard<std::mutex> lock(mtx_);
 	return (type_ == ROOT_PROCESS_GROUP);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
index c67abcc..3b3eb64 100644
--- a/libminifi/src/ProcessSession.cpp
+++ b/libminifi/src/ProcessSession.cpp
@@ -28,6 +28,18 @@
 #include <iostream>
 
 #include "ProcessSession.h"
+#include "FlowController.h"
+
+ProcessSession::ProcessSession(ProcessContext *processContext) : _processContext(processContext) {
+	logger_ = Logger::getLogger();
+	logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
+	_provenanceReport = NULL;
+	if (FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
+	{
+		_provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
+					_processContext->getProcessor()->getName());
+	}
+}
 
 FlowFileRecord* ProcessSession::create()
 {
@@ -39,7 +51,8 @@ FlowFileRecord* ProcessSession::create()
 		_addedFlowFiles[record->getUUIDStr()] = record;
 		logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
 		std::string details = _processContext->getProcessor()->getName() + " creates flow record " +  record->getUUIDStr();
-		_provenanceReport->create(record, details);
+		if (_provenanceReport)
+			_provenanceReport->create(record, details);
 	}
 
 	return record;
@@ -91,7 +104,8 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
 			record->_size = parent->_size;
 			record->_claim->increaseFlowFileRecordOwnedCount();
 		}
-		_provenanceReport->clone(parent, record);
+		if (_provenanceReport)
+			_provenanceReport->clone(parent, record);
 	}
 	return record;
 }
@@ -129,7 +143,8 @@ FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
 	    	record->_size = parent->_size;
 	    	record->_claim->increaseFlowFileRecordOwnedCount();
 	    }
-	    _provenanceReport->clone(parent, record);
+	    if (_provenanceReport)
+	    	_provenanceReport->clone(parent, record);
 	}
 
 	return record;
@@ -161,7 +176,8 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long
 			record->_claim = parent->_claim;
 			record->_claim->increaseFlowFileRecordOwnedCount();
 		}
-		_provenanceReport->clone(parent, record);
+		if (_provenanceReport)
+			_provenanceReport->clone(parent, record);
 	}
 	return record;
 }
@@ -171,7 +187,8 @@ void ProcessSession::remove(FlowFileRecord *flow)
 	flow->_markedDelete = true;
 	_deletedFlowFiles[flow->getUUIDStr()] = flow;
 	std::string reason = _processContext->getProcessor()->getName() + " drop flow record " +  flow->getUUIDStr();
-	_provenanceReport->drop(flow, reason);
+	if (_provenanceReport)
+		_provenanceReport->drop(flow, reason);
 }
 
 void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
@@ -179,7 +196,8 @@ void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::st
 	flow->setAttribute(key, value);
 	std::string details = _processContext->getProcessor()->getName() + " modify flow record " +  flow->getUUIDStr() +
 			" attribute " + key + ":" + value;
-	_provenanceReport->modifyAttributes(flow, details);
+	if (_provenanceReport)
+		_provenanceReport->modifyAttributes(flow, details);
 }
 
 void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
@@ -187,7 +205,8 @@ void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
 	flow->removeAttribute(key);
 	std::string details = _processContext->getProcessor()->getName() + " remove flow record " +  flow->getUUIDStr() +
 				" attribute " + key;
-	_provenanceReport->modifyAttributes(flow, details);
+	if (_provenanceReport)
+		_provenanceReport->modifyAttributes(flow, details);
 }
 
 void ProcessSession::penalize(FlowFileRecord *flow)
@@ -233,7 +252,8 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
 				fs.close();
 				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
 				uint64_t endTime = getTimeMillis();
-				_provenanceReport->modifyContent(flow, details, endTime - startTime);
+				if (_provenanceReport)
+					_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -304,7 +324,8 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
 				fs.close();
 				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
 				uint64_t endTime = getTimeMillis();
-				_provenanceReport->modifyContent(flow, details, endTime - startTime);
+				if (_provenanceReport)
+					_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -430,7 +451,8 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS
 					std::remove(source.c_str());
 				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
 				uint64_t endTime = getTimeMillis();
-				_provenanceReport->modifyContent(flow, details, endTime - startTime);
+				if (_provenanceReport)
+					_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -653,7 +675,8 @@ void ProcessSession::commit()
 		_deletedFlowFiles.clear();
 		_originalFlowFiles.clear();
 		// persistent the provenance report
-		this->_provenanceReport->commit();
+		if (this->_provenanceReport)
+			this->_provenanceReport->commit();
 		logger_->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
 	}
 	catch (std::exception &exception)
@@ -740,7 +763,8 @@ FlowFileRecord *ProcessSession::get()
 			{
 				FlowFileRecord *record = *it;
 				std::string details = _processContext->getProcessor()->getName() + " expire flow record " +  record->getUUIDStr();
-				_provenanceReport->expire(record, details);
+				if (_provenanceReport)
+					_provenanceReport->expire(record, details);
 				delete (record);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
index ff4d8f5..58cf730 100644
--- a/libminifi/src/Provenance.cpp
+++ b/libminifi/src/Provenance.cpp
@@ -23,9 +23,8 @@
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "Provenance.h"
-
-#include "Logger.h"
 #include "Relationship.h"
+#include "Logger.h"
 #include "FlowController.h"
 
 //! DeSerialize
@@ -227,9 +226,11 @@ bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) {
 	if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
 		logger_->log_debug("NiFi Provenance Store event %s size %d success",
 				_eventIdStr.c_str(), outStream.getSize());
+		return true;
 	} else {
 		logger_->log_error("NiFi Provenance Store event %s size %d fail",
 				_eventIdStr.c_str(), outStream.getSize());
+		return false;
 	}
 
 	// cleanup
@@ -391,6 +392,8 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferS
 }
 
 void ProvenanceReporter::commit() {
+	if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
+		return;
 	for (auto event : _events) {
 		if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) {
 			event->Serialize(
@@ -561,68 +564,3 @@ void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri,
 	}
 }
 
-uint64_t ProvenanceRepository::_repoSize = 0;
-
-void ProvenanceRepository::start() {
-	if (this->_purgePeriod <= 0)
-		return;
-	if (_running)
-		return;
-	_running = true;
-	logger_->log_info("ProvenanceRepository Monitor Thread Start");
-	_thread = new std::thread(run, this);
-	_thread->detach();
-}
-
-void ProvenanceRepository::stop() {
-	if (!_running)
-		return;
-	_running = false;
-	logger_->log_info("ProvenanceRepository Monitor Thread Stop");
-}
-
-void ProvenanceRepository::run(ProvenanceRepository *repo) {
-	// threshold for purge
-	uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
-	while (repo->_running) {
-		std::this_thread::sleep_for(
-				std::chrono::milliseconds(repo->_purgePeriod));
-		uint64_t curTime = getTimeMillis();
-		uint64_t size = repo->repoSize();
-		if (size >= purgeThreshold) {
-			std::vector<std::string> purgeList;
-			leveldb::Iterator* it = repo->_db->NewIterator(
-					leveldb::ReadOptions());
-			for (it->SeekToFirst(); it->Valid(); it->Next()) {
-				ProvenanceEventRecord eventRead;
-				std::string key = it->key().ToString();
-				if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-						(int) it->value().size())) {
-					if ((curTime - eventRead.getEventTime())
-							> repo->_maxPartitionMillis)
-						purgeList.push_back(key);
-				} else {
-					repo->logger_->log_debug(
-							"NiFi Provenance retrieve event %s fail",
-							key.c_str());
-					purgeList.push_back(key);
-				}
-			}
-			delete it;
-			std::vector<std::string>::iterator itPurge;
-			for (itPurge = purgeList.begin(); itPurge != purgeList.end();
-					itPurge++) {
-				std::string eventId = *itPurge;
-				repo->logger_->log_info("ProvenanceRepository Repo Purge %s",
-						eventId.c_str());
-				repo->Delete(eventId);
-			}
-		}
-		if (size > repo->_maxPartitionBytes)
-			repo->_repoFull = true;
-		else
-			repo->_repoFull = false;
-	}
-	return;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Repository.cpp b/libminifi/src/Repository.cpp
new file mode 100644
index 0000000..ffbd953
--- /dev/null
+++ b/libminifi/src/Repository.cpp
@@ -0,0 +1,138 @@
+/**
+ * @file Repository.cpp
+ * Repository implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "Repository.h"
+#include "Provenance.h"
+#include "FlowFileRepository.h"
+
+const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
+uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
+
+void Repository::start() {
+	if (!_enable)
+		return;
+	if (this->_purgePeriod <= 0)
+		return;
+	if (_running)
+		return;
+	_running = true;
+	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
+	_thread = new std::thread(run, this);
+	_thread->detach();
+}
+
+void Repository::stop() {
+	if (!_running)
+		return;
+	_running = false;
+	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
+}
+
+void Repository::run(Repository *repo) {
+	// threshold for purge
+	uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
+	while (repo->_running) {
+		std::this_thread::sleep_for(
+				std::chrono::milliseconds(repo->_purgePeriod));
+		uint64_t curTime = getTimeMillis();
+		uint64_t size = repo->repoSize();
+		if (size >= purgeThreshold) {
+			std::vector<std::string> purgeList;
+			leveldb::Iterator* it = repo->_db->NewIterator(
+					leveldb::ReadOptions());
+			if (repo->_type == PROVENANCE)
+			{
+				for (it->SeekToFirst(); it->Valid(); it->Next()) {
+					ProvenanceEventRecord eventRead;
+					std::string key = it->key().ToString();
+					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
+						(int) it->value().size())) {
+						if ((curTime - eventRead.getEventTime())
+							> repo->_maxPartitionMillis)
+							purgeList.push_back(key);
+					} else {
+						repo->logger_->log_debug(
+							"NiFi %s retrieve event %s fail",
+							RepositoryTypeStr[repo->_type],
+							key.c_str());
+						purgeList.push_back(key);
+					}
+				}
+			}
+			if (repo->_type == FLOWFILE)
+			{
+				for (it->SeekToFirst(); it->Valid(); it->Next()) {
+					FlowFileEventRecord eventRead;
+					std::string key = it->key().ToString();
+					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
+						(int) it->value().size())) {
+						if ((curTime - eventRead.getEventTime())
+							> repo->_maxPartitionMillis)
+							purgeList.push_back(key);
+					} else {
+						repo->logger_->log_debug(
+							"NiFi %s retrieve event %s fail",
+							RepositoryTypeStr[repo->_type],
+							key.c_str());
+						purgeList.push_back(key);
+					}
+				}
+			}
+			delete it;
+			for (auto eventId : purgeList)
+			{
+				repo->logger_->log_info("Repository Repo %s Purge %s",
+						RepositoryTypeStr[repo->_type],
+						eventId.c_str());
+				repo->Delete(eventId);
+			}
+		}
+		if (size > repo->_maxPartitionBytes)
+			repo->_repoFull = true;
+		else
+			repo->_repoFull = false;
+	}
+	return;
+}
+
+//! repoSize
+uint64_t Repository::repoSize()
+{
+	_repoSize[_type] = 0;
+	if (_type == PROVENANCE)
+        {
+		if (ftw(_directory.c_str(), repoSumProvenance, 1) != 0)
+			_repoSize[_type] = 0;
+	}
+	if (_type == FLOWFILE)
+        {
+		if (ftw(_directory.c_str(), repoSumFlowFile, 1) != 0)
+			_repoSize[_type] = 0;
+	}
+	return _repoSize[_type];
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index ce1f248..be52b49 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -45,5 +45,5 @@ ResourceClaim::ResourceClaim(const std::string contentDirectory)
 
 	configure_ = Configure::getConfigure();
 	logger_ = Logger::getLogger();
-	logger_->log_debug("Resource Claim created %s", _contentFullPath.c_str());
+	logger_->log_debug("Resource Claim created %s", uuidStr);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index ebc2e44..39b71b4 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -331,7 +331,7 @@ int Socket::writeData(uint8_t *value, int size) {
 	}
 
 	if (ret)
-		logger_->log_debug("Send data size %d over socket %d", size,
+		logger_->log_trace("Send data size %d over socket %d", size,
 				socket_file_descriptor_);
 
 	return bytes;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 955d125..ae5f7e5 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -20,6 +20,8 @@
 #include "FlowController.h"
 #include "ProvenanceTestHelper.h"
 #include "../TestBase.h"
+#include <memory>
+#include "../../include/LogAppenders.h"
 #include "GetFile.h"
 
 
@@ -33,10 +35,18 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 
 	TestController testController;
 
-	testController.enableDebug();
+	Configure *config = Configure::getConfigure();
 
-	ProvenanceTestRepository repo;
-	TestFlowController controller(repo);
+	config->set(BaseLogger::nifi_log_appender,"rollingappender");
+	config->set(OutputStreamAppender::nifi_log_output_stream_error_stderr,"true");
+	std::shared_ptr<Logger> logger = Logger::getLogger();
+	std::unique_ptr<BaseLogger> newLogger =LogInstance::getConfiguredLogger(config);
+	logger->updateLogger(std::move(newLogger));
+	logger->setLogLevel("debug");
+
+	ProvenanceTestRepository provenanceRepo;
+	FlowTestRepository flowRepo;
+	TestFlowController controller(provenanceRepo, flowRepo);
 	FlowControllerFactory::getFlowController( dynamic_cast<FlowController*>(&controller));
 
 	GetFile processor("getfileCreate2");
@@ -54,7 +64,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 	// link the connections so that we can test results at the end for this
 
 	connection.setSourceProcessor(&processor);
-
+	connection.setDestinationProcessor(&processor);
 
 	connection.setSourceProcessorUUID(processoruuid);
 	connection.setDestinationProcessorUUID(processoruuid);
@@ -66,7 +76,6 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 	context.setProperty(GetFile::Directory,dir);
 	ProcessSession session(&context);
 
-
 	REQUIRE( processor.getName() == "getfileCreate2");
 
 	FlowFileRecord *record;
@@ -82,13 +91,16 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 
 	std::fstream file;
 	std::stringstream ss;
-	ss << dir << "/" << "tstFile.ext";
+	std::string fileName("tstFile.ext");
+	ss << dir << "/" << fileName;
 	file.open(ss.str(),std::ios::out);
 	file << "tempFile";
+	int64_t fileSize = file.tellp();
 	file.close();
 
 	processor.incrementActiveTasks();
 	processor.setScheduledState(ScheduledState::RUNNING);
+
 	processor.onTrigger(&context,&session);
 	unlink(ss.str().c_str());
 	rmdir(dir);
@@ -103,6 +115,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 	}
 	session.commit();
 
+        // verify flow file repo
+	REQUIRE( 1 == flowRepo.getRepoMap().size() );
+
+	for(auto  entry: flowRepo.getRepoMap())
+	{
+		FlowFileEventRecord newRecord;
+		newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length());
+		REQUIRE (fileSize == newRecord.getFileSize());
+		REQUIRE (0 == newRecord.getFileOffset());
+		std::map<std::string, std::string> attrs = newRecord.getAttributes();
+		std::string key = FlowAttributeKey(FILENAME);
+		REQUIRE (attrs[key] == fileName);
+	}
+
 	FlowFileRecord *ffr = session.get();
 
 	ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -111,9 +137,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 
 	std::set<FlowFileRecord*> expiredFlows;
 
-	REQUIRE( 2 == repo.getRepoMap().size() );
+	REQUIRE( 2 == provenanceRepo.getRepoMap().size() );
 
-	for(auto  entry: repo.getRepoMap())
+	for(auto  entry: provenanceRepo.getRepoMap())
 	{
 		ProvenanceEventRecord newRecord;
 		newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length());
@@ -134,9 +160,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 		}
 		if (!found)
 		throw std::runtime_error("Did not find record");
+	}
 
 
-	}
 
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index f67a826..1ee6a4c 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -20,6 +20,62 @@
 
 #include "Provenance.h"
 #include "FlowController.h"
+#include "FlowFileRepository.h"
+
+/**
+ * Test repository
+ */
+class FlowTestRepository : public FlowFileRepository
+{
+public:
+	FlowTestRepository()
+{
+}
+		//! initialize
+		bool initialize()
+		{
+			return true;
+		}
+
+		//! Destructor
+		virtual ~FlowTestRepository() {
+
+		}
+
+		bool Put(std::string key, uint8_t *buf, int bufLen)
+		{
+			repositoryResults.insert(std::pair<std::string,std::string>(key,std::string((const char*)buf,bufLen)));
+			return true;
+		}
+		//! Delete
+		bool Delete(std::string key)
+		{
+			repositoryResults.erase(key);
+			return true;
+		}
+		//! Get
+		bool Get(std::string key, std::string &value)
+		{
+			auto result = repositoryResults.find(key);
+			if (result != repositoryResults.end())
+			{
+				value = result->second;
+				return true;
+			}
+			else
+			{
+				return false;
+			}
+		}
+
+		const std::map<std::string,std::string> &getRepoMap() const
+		{
+			return repositoryResults;
+		}
+
+protected:
+		std::map<std::string,std::string> repositoryResults;
+};
 
 /**
  * Test repository
@@ -81,9 +137,10 @@ class TestFlowController : public FlowController
 {
 
 public:
-	TestFlowController(ProvenanceTestRepository &repo) : ::FlowController()
+	TestFlowController(ProvenanceTestRepository &provenanceRepo, FlowTestRepository &flowRepo) : ::FlowController()
 	{
-		_provenanceRepo = dynamic_cast<ProvenanceRepository*>(&repo);
+		_provenanceRepo = dynamic_cast<ProvenanceRepository*>(&provenanceRepo);
+		_flowfileRepo = dynamic_cast<FlowFileRepository*>(&flowRepo);
 	}
 	~TestFlowController()
 	{