You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/11/15 10:12:46 UTC

[14/14] nifi-minifi-cpp git commit: MINIFI-131: Establish framework Provenance Support

MINIFI-131: Establish framework Provenance Support

MINIFI-113: Correct GetFile FileFilter regex bug

This closes #23.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/dc9544f8
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/dc9544f8
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/dc9544f8

Branch: refs/heads/master
Commit: dc9544f8cb5b0f2f8c616ea63181fc6708bcc8b4
Parents: 05838e5
Author: Bin Qiu <be...@gmail.com>
Authored: Fri Oct 28 09:48:57 2016 -0700
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Nov 15 05:12:02 2016 -0500

----------------------------------------------------------------------
 .travis.yml                                     |    1 +
 LICENSE                                         |   30 +
 README.md                                       |    3 +-
 conf/minifi.properties                          |    4 +
 libminifi/include/Configure.h                   |    3 +
 libminifi/include/FlowControlProtocol.h         |    2 +
 libminifi/include/FlowController.h              |   35 +-
 libminifi/include/FlowFileRecord.h              |    9 +
 libminifi/include/GetFile.h                     |    2 +-
 libminifi/include/ProcessSession.h              |   15 +-
 libminifi/include/Provenance.h                  |  902 ++++++++
 libminifi/src/Configure.cpp                     |    4 +-
 libminifi/src/FlowController.cpp                |    4 +
 libminifi/src/GetFile.cpp                       |   14 +-
 libminifi/src/ProcessSession.cpp                |   42 +-
 libminifi/src/Provenance.cpp                    |  919 ++++++++
 libminifi/src/Site2SiteClientProtocol.cpp       |   13 +
 libminifi/src/Site2SitePeer.cpp                 |   10 +-
 main/CMakeLists.txt                             |    4 +-
 main/MiNiFiMain.cpp                             |    2 +-
 thirdparty/leveldb-1.18/.gitignore              |   33 +
 thirdparty/leveldb-1.18/AUTHORS                 |   12 +
 thirdparty/leveldb-1.18/CMakeLists.txt          |  256 +++
 thirdparty/leveldb-1.18/LICENSE                 |   27 +
 thirdparty/leveldb-1.18/NEWS                    |   17 +
 thirdparty/leveldb-1.18/README                  |   51 +
 thirdparty/leveldb-1.18/README.md               |   20 +
 thirdparty/leveldb-1.18/TODO                    |   14 +
 thirdparty/leveldb-1.18/WINDOWS                 |   42 +
 thirdparty/leveldb-1.18/build_detect_platform   |  228 ++
 .../leveldb-1.18/build_detect_platform.cmake    |    7 +
 thirdparty/leveldb-1.18/db/autocompact_test.cc  |  118 +
 thirdparty/leveldb-1.18/db/builder.cc           |   88 +
 thirdparty/leveldb-1.18/db/builder.h            |   34 +
 thirdparty/leveldb-1.18/db/c.cc                 |  595 +++++
 thirdparty/leveldb-1.18/db/c_test.c             |  390 ++++
 thirdparty/leveldb-1.18/db/corruption_test.cc   |  388 ++++
 thirdparty/leveldb-1.18/db/db_bench.cc          |  979 ++++++++
 thirdparty/leveldb-1.18/db/db_impl.cc           | 1529 +++++++++++++
 thirdparty/leveldb-1.18/db/db_impl.h            |  211 ++
 thirdparty/leveldb-1.18/db/db_iter.cc           |  317 +++
 thirdparty/leveldb-1.18/db/db_iter.h            |   28 +
 thirdparty/leveldb-1.18/db/db_test.cc           | 2129 ++++++++++++++++++
 thirdparty/leveldb-1.18/db/dbformat.cc          |  140 ++
 thirdparty/leveldb-1.18/db/dbformat.h           |  230 ++
 thirdparty/leveldb-1.18/db/dbformat_test.cc     |  112 +
 thirdparty/leveldb-1.18/db/dumpfile.cc          |  225 ++
 thirdparty/leveldb-1.18/db/filename.cc          |  144 ++
 thirdparty/leveldb-1.18/db/filename.h           |   85 +
 thirdparty/leveldb-1.18/db/filename_test.cc     |  123 +
 thirdparty/leveldb-1.18/db/leveldb_main.cc      |   64 +
 thirdparty/leveldb-1.18/db/log_format.h         |   35 +
 thirdparty/leveldb-1.18/db/log_reader.cc        |  266 +++
 thirdparty/leveldb-1.18/db/log_reader.h         |  108 +
 thirdparty/leveldb-1.18/db/log_test.cc          |  530 +++++
 thirdparty/leveldb-1.18/db/log_writer.cc        |  103 +
 thirdparty/leveldb-1.18/db/log_writer.h         |   48 +
 thirdparty/leveldb-1.18/db/memtable.cc          |  145 ++
 thirdparty/leveldb-1.18/db/memtable.h           |   91 +
 thirdparty/leveldb-1.18/db/repair.cc            |  461 ++++
 thirdparty/leveldb-1.18/db/skiplist.h           |  384 ++++
 thirdparty/leveldb-1.18/db/skiplist_test.cc     |  378 ++++
 thirdparty/leveldb-1.18/db/snapshot.h           |   66 +
 thirdparty/leveldb-1.18/db/table_cache.cc       |  127 ++
 thirdparty/leveldb-1.18/db/table_cache.h        |   61 +
 thirdparty/leveldb-1.18/db/version_edit.cc      |  266 +++
 thirdparty/leveldb-1.18/db/version_edit.h       |  107 +
 thirdparty/leveldb-1.18/db/version_edit_test.cc |   46 +
 thirdparty/leveldb-1.18/db/version_set.cc       | 1484 ++++++++++++
 thirdparty/leveldb-1.18/db/version_set.h        |  396 ++++
 thirdparty/leveldb-1.18/db/version_set_test.cc  |  179 ++
 thirdparty/leveldb-1.18/db/write_batch.cc       |  147 ++
 .../leveldb-1.18/db/write_batch_internal.h      |   49 +
 thirdparty/leveldb-1.18/db/write_batch_test.cc  |  125 +
 .../leveldb-1.18/doc/bench/db_bench_sqlite3.cc  |  718 ++++++
 .../leveldb-1.18/doc/bench/db_bench_tree_db.cc  |  528 +++++
 thirdparty/leveldb-1.18/doc/benchmark.html      |  459 ++++
 thirdparty/leveldb-1.18/doc/doc.css             |   89 +
 thirdparty/leveldb-1.18/doc/impl.html           |  213 ++
 thirdparty/leveldb-1.18/doc/index.html          |  549 +++++
 thirdparty/leveldb-1.18/doc/log_format.txt      |   75 +
 thirdparty/leveldb-1.18/doc/table_format.txt    |  104 +
 .../leveldb-1.18/helpers/memenv/memenv.cc       |  385 ++++
 thirdparty/leveldb-1.18/helpers/memenv/memenv.h |   20 +
 .../leveldb-1.18/helpers/memenv/memenv_test.cc  |  232 ++
 thirdparty/leveldb-1.18/include/leveldb/c.h     |  290 +++
 thirdparty/leveldb-1.18/include/leveldb/cache.h |   99 +
 .../leveldb-1.18/include/leveldb/comparator.h   |   66 +
 thirdparty/leveldb-1.18/include/leveldb/db.h    |  161 ++
 .../leveldb-1.18/include/leveldb/dumpfile.h     |   25 +
 thirdparty/leveldb-1.18/include/leveldb/env.h   |  339 +++
 .../include/leveldb/filter_policy.h             |   70 +
 .../leveldb-1.18/include/leveldb/iterator.h     |  100 +
 .../leveldb-1.18/include/leveldb/options.h      |  195 ++
 thirdparty/leveldb-1.18/include/leveldb/slice.h |  109 +
 .../leveldb-1.18/include/leveldb/status.h       |  107 +
 thirdparty/leveldb-1.18/include/leveldb/table.h |   85 +
 .../include/leveldb/table_builder.h             |   92 +
 .../leveldb-1.18/include/leveldb/write_batch.h  |   64 +
 thirdparty/leveldb-1.18/issues/issue178_test.cc |   92 +
 thirdparty/leveldb-1.18/issues/issue200_test.cc |   59 +
 thirdparty/leveldb-1.18/port/README             |   10 +
 thirdparty/leveldb-1.18/port/atomic_pointer.h   |  223 ++
 thirdparty/leveldb-1.18/port/port.h             |   16 +
 thirdparty/leveldb-1.18/port/port_android.cc    |   64 +
 thirdparty/leveldb-1.18/port/port_android.h     |  159 ++
 thirdparty/leveldb-1.18/port/port_example.h     |  135 ++
 thirdparty/leveldb-1.18/port/port_posix.cc      |   54 +
 thirdparty/leveldb-1.18/port/port_posix.h       |  154 ++
 thirdparty/leveldb-1.18/port/port_win.cc        |  182 ++
 thirdparty/leveldb-1.18/port/port_win.h         |  164 ++
 thirdparty/leveldb-1.18/port/sha1_portable.cc   |  298 +++
 thirdparty/leveldb-1.18/port/sha1_portable.h    |   25 +
 thirdparty/leveldb-1.18/port/sha1_test.cc       |   39 +
 .../leveldb-1.18/port/thread_annotations.h      |   60 +
 thirdparty/leveldb-1.18/port/win/stdint.h       |   24 +
 thirdparty/leveldb-1.18/table/block.cc          |  268 +++
 thirdparty/leveldb-1.18/table/block.h           |   44 +
 thirdparty/leveldb-1.18/table/block_builder.cc  |  109 +
 thirdparty/leveldb-1.18/table/block_builder.h   |   57 +
 thirdparty/leveldb-1.18/table/filter_block.cc   |  111 +
 thirdparty/leveldb-1.18/table/filter_block.h    |   68 +
 .../leveldb-1.18/table/filter_block_test.cc     |  128 ++
 thirdparty/leveldb-1.18/table/format.cc         |  145 ++
 thirdparty/leveldb-1.18/table/format.h          |  108 +
 thirdparty/leveldb-1.18/table/iterator.cc       |   67 +
 .../leveldb-1.18/table/iterator_wrapper.h       |   63 +
 thirdparty/leveldb-1.18/table/merger.cc         |  198 ++
 thirdparty/leveldb-1.18/table/merger.h          |   28 +
 thirdparty/leveldb-1.18/table/table.cc          |  285 +++
 thirdparty/leveldb-1.18/table/table_builder.cc  |  270 +++
 thirdparty/leveldb-1.18/table/table_test.cc     |  868 +++++++
 .../leveldb-1.18/table/two_level_iterator.cc    |  182 ++
 .../leveldb-1.18/table/two_level_iterator.h     |   34 +
 thirdparty/leveldb-1.18/util/arena.cc           |   68 +
 thirdparty/leveldb-1.18/util/arena.h            |   68 +
 thirdparty/leveldb-1.18/util/arena_test.cc      |   68 +
 thirdparty/leveldb-1.18/util/bloom.cc           |   95 +
 thirdparty/leveldb-1.18/util/bloom_test.cc      |  161 ++
 thirdparty/leveldb-1.18/util/cache.cc           |  327 +++
 thirdparty/leveldb-1.18/util/cache_test.cc      |  186 ++
 thirdparty/leveldb-1.18/util/coding.cc          |  194 ++
 thirdparty/leveldb-1.18/util/coding.h           |  107 +
 thirdparty/leveldb-1.18/util/coding_test.cc     |  196 ++
 thirdparty/leveldb-1.18/util/comparator.cc      |   88 +
 thirdparty/leveldb-1.18/util/crc32c.cc          |  332 +++
 thirdparty/leveldb-1.18/util/crc32c.h           |   45 +
 thirdparty/leveldb-1.18/util/crc32c_test.cc     |   72 +
 thirdparty/leveldb-1.18/util/env.cc             |   96 +
 thirdparty/leveldb-1.18/util/env_boost.cc       |  726 ++++++
 thirdparty/leveldb-1.18/util/env_posix.cc       |  654 ++++++
 thirdparty/leveldb-1.18/util/env_test.cc        |   71 +
 thirdparty/leveldb-1.18/util/filter_policy.cc   |   11 +
 thirdparty/leveldb-1.18/util/hash.cc            |   55 +
 thirdparty/leveldb-1.18/util/hash.h             |   19 +
 thirdparty/leveldb-1.18/util/hash_test.cc       |   54 +
 thirdparty/leveldb-1.18/util/histogram.cc       |  139 ++
 thirdparty/leveldb-1.18/util/histogram.h        |   42 +
 thirdparty/leveldb-1.18/util/logging.cc         |   72 +
 thirdparty/leveldb-1.18/util/logging.h          |   43 +
 thirdparty/leveldb-1.18/util/mutexlock.h        |   41 +
 thirdparty/leveldb-1.18/util/options.cc         |   29 +
 thirdparty/leveldb-1.18/util/posix_logger.h     |   98 +
 thirdparty/leveldb-1.18/util/random.h           |   64 +
 thirdparty/leveldb-1.18/util/status.cc          |   75 +
 thirdparty/leveldb-1.18/util/testharness.cc     |   77 +
 thirdparty/leveldb-1.18/util/testharness.h      |  138 ++
 thirdparty/leveldb-1.18/util/testutil.cc        |   51 +
 thirdparty/leveldb-1.18/util/testutil.h         |   53 +
 thirdparty/leveldb-1.18/util/win_logger.cc      |   79 +
 thirdparty/leveldb-1.18/util/win_logger.h       |   28 +
 171 files changed, 30916 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 3866739..e5bdd7c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,3 +35,4 @@ addons:
     - libboost-all-dev
     - uuid-dev
     - libxml2-dev
+    - leveldb 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 4b01250..0a0f28a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -335,3 +335,33 @@ This product bundles 'Google Test' which is available under a 3-Clause BSD Licen
   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+This product bundles 'LevelDB' which is available under a 3-Clause BSD License.
+
+Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c8abc33..388fde2 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,6 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
 ## Caveats
 * 0.0.1 represents the first release, APIs and interfaces are subject to change
 * Build and usage currently only supports Linux and OS X environments. Providing the needed tooling to support Windows will be established as part of [MINIFI-34](https://issues.apache.org/jira/browse/MINIFI-34).
-* Currently, provenance events are not yet generated.  This effort is captured in [MINIFI-78](https://issues.apache.org/jira/browse/MINIFI-78).
 * Using Site to Site requires the additional manual step of specifying the remote socket.  This being autonegotiated through NiFi's REST API is captured in [MINIFI-70](https://issues.apache.org/jira/browse/MINIFI-70).
 * The processors currently implemented include:
   * TailFile
@@ -52,6 +51,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * GenerateFlowFile
   * LogAttribute
   * ListenSyslog
+* Provenance event generation is supported, and events are persisted using LevelDB.
 
 ## System Requirements
 
@@ -69,6 +69,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
 * libboost and boost-devel
   * 1.48.0 or greater
 * libxml2 and libxml2-devel
+* leveldb 
 
 ### To run
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 037ff8b..f9b2d9f 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -19,3 +19,7 @@ nifi.flow.configuration.file=./conf/flow.yml
 nifi.administrative.yield.duration=30 sec
 # If a component has no work to do (is "bored"), how long should we wait before checking again for work?
 nifi.bored.yield.duration=10 millis
+# Provenance Repository #
+nifi.provenance.repository.directory.default=./provenance_repository
+nifi.provenance.repository.max.storage.time=1 MIN
+nifi.provenance.repository.max.storage.size=1 MB

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h
index d325fa0..7409793 100644
--- a/libminifi/include/Configure.h
+++ b/libminifi/include/Configure.h
@@ -47,6 +47,9 @@ public:
 	static const char *nifi_server_name;
 	static const char *nifi_server_port;
 	static const char *nifi_server_report_interval;
+	static const char *nifi_provenance_repository_max_storage_time;
+	static const char *nifi_provenance_repository_max_storage_size;
+	static const char *nifi_provenance_repository_directory_default;
 
 	//! Clear the load config
 	void clear()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index 23f2d49..be32e1e 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -208,6 +208,8 @@ public:
 				_logger->log_info("NiFi server report interval: [%d] ms", _reportInterval);
 			}
 		}
+		else
+			_reportInterval = 0;
 	}
 	//! Destructor
 	virtual ~FlowControlProtocol()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 0d758df..b02a83c 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -47,6 +47,7 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
 #include "RemoteProcessorGroupPort.h"
+#include "Provenance.h"
 #include "GetFile.h"
 #include "TailFile.h"
 #include "ListenSyslog.h"
@@ -79,11 +80,16 @@ class FlowController
 public:
     static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
     static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
-	//! Constructor
-	/*!
-	 * Create a new Flow Controller
-	 */
-	FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME);
+	//! Get the singleton flow controller
+	static FlowController * getFlowController()
+	{
+		if (!_flowController)
+		{
+			_flowController = new FlowController();
+		}
+		return _flowController;
+	}
+
 	//! Destructor
 	virtual ~FlowController();
 	//! Set FlowController Name
@@ -128,11 +134,11 @@ public:
 	{
 		return _maxEventDrivenThreads;
 	}
-	//! Create FlowFile Repository
-	bool createFlowFileRepository();
-	//! Create Content Repository
-	bool createContentRepository();
-
+	//! Get the provenance repository
+	ProvenanceRepository *getProvenanceRepository()
+	{
+		return this->_provenanceRepo;
+	}
 	//! Life Cycle related function
 	//! Load flow xml from disk, after that, create the root process group and its children, initialize the flows
 	void load(ConfigFormat format);
@@ -188,6 +194,7 @@ protected:
 	//! Config
 	//! FlowFile Repo
 	//! Provenance Repo
+	ProvenanceRepository *_provenanceRepo;
 	//! Flow Engines
 	//! Flow Scheduler
 	TimerDrivenSchedulingAgent _timerScheduler;
@@ -238,6 +245,14 @@ private:
 	//! Parse Properties Node YAML for a processor
 	void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor);
 
+	static FlowController *_flowController;
+
+	//! Constructor
+	/*!
+	 * Create a new Flow Controller
+	 */
+	FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME);
+
 	// Prevent default copy constructor and assignment operation
 	// Only support pass by reference or pointer
 	FlowController(const FlowController &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 8b7362f..9382996 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -162,10 +162,19 @@ public:
 	void setOriginalConnection (Connection *connection) {
 		_orginalConnection = connection;
 	}
+	//! Get Original connection
+	Connection * getOriginalConnection() {
+		return _orginalConnection;
+	}
 	//! Get Resource Claim
 	ResourceClaim *getResourceClaim() {
 		return _claim;
 	}
+	//! Get lineageIdentifiers
+	std::set<std::string> getlineageIdentifiers()
+	{
+		return _lineageIdentifiers;
+	}
 
 protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/GetFile.h b/libminifi/include/GetFile.h
index eb975fd..4a8775f 100644
--- a/libminifi/include/GetFile.h
+++ b/libminifi/include/GetFile.h
@@ -97,7 +97,7 @@ private:
 	//! Poll directory listing for files
 	void pollListing(std::queue<std::string> &list, int maxSize);
 	//! Check whether file can be added to the directory listing
-	bool acceptFile(std::string fileName);
+	bool acceptFile(std::string fullName, std::string name);
 	//! Mutex for protection of the directory listing
 	std::mutex _mtx;
 	std::string _directory;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessSession.h b/libminifi/include/ProcessSession.h
index c8ec3a5..8dc8cc5 100644
--- a/libminifi/include/ProcessSession.h
+++ b/libminifi/include/ProcessSession.h
@@ -34,6 +34,7 @@
 #include "ProcessContext.h"
 #include "FlowFileRecord.h"
 #include "Exception.h"
+#include "Provenance.h"
 
 //! ProcessSession Class
 class ProcessSession
@@ -46,13 +47,23 @@ public:
 	ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) {
 		_logger = Logger::getLogger();
 		_logger->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
+		_provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
+				_processContext->getProcessor()->getName());
 	}
 	//! Destructor
-	virtual ~ProcessSession() {}
+	virtual ~ProcessSession() {
+		if (_provenanceReport)
+			delete _provenanceReport;
+	}
 	//! Commit the session
 	void commit();
 	//! Roll Back the session
 	void rollback();
+	//! Get Provenance Report
+	ProvenanceReporter *getProvenanceReporter()
+	{
+		return _provenanceReport;
+	}
 	//!
 	//! Get the FlowFile from the highest priority queue
 	FlowFileRecord *get();
@@ -110,6 +121,8 @@ private:
 	ProcessSession &operator=(const ProcessSession &parent);
 	//! Logger
 	Logger *_logger;
+	//! Provenance Report
+	ProvenanceReporter *_provenanceReport;
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/include/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h
new file mode 100644
index 0000000..f3e814f
--- /dev/null
+++ b/libminifi/include/Provenance.h
@@ -0,0 +1,902 @@
+/**
+ * @file Provenance.h
+ * Provenance related class declarations
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_H__
+#define __PROVENANCE_H__
+
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <set>
+#include <cassert>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <ftw.h>
+#include "leveldb/db.h"
+
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "Relationship.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+
+// Provenance Event Record Serialization Seg Size
+#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+
+class ProvenanceRepository;
+
+//! Provenance Event Record
+class ProvenanceEventRecord
+{
+public:
+	enum ProvenanceEventType {
+
+	    /**
+	     * A CREATE event is used when a FlowFile is generated from data that was
+	     * not received from a remote system or external process
+	     */
+	    CREATE,
+
+	    /**
+	     * Indicates a provenance event for receiving data from an external process. This Event Type
+	     * is expected to be the first event for a FlowFile. As such, a Processor that receives data
+	     * from an external source and uses that data to replace the content of an existing FlowFile
+	     * should use the {@link #FETCH} event type, rather than the RECEIVE event type.
+	     */
+	    RECEIVE,
+
+	    /**
+	     * Indicates that the contents of a FlowFile were overwritten using the contents of some
+	     * external resource. This is similar to the {@link #RECEIVE} event but varies in that
+	     * RECEIVE events are intended to be used as the event that introduces the FlowFile into
+	     * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
+	     * were overwritten.
+	     */
+	    FETCH,
+
+	    /**
+	     * Indicates a provenance event for sending data to an external process
+	     */
+	    SEND,
+
+	    /**
+	     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+	     */
+	    DOWNLOAD,
+
+	    /**
+	     * Indicates a provenance event for the conclusion of an object's life for
+	     * some reason other than object expiration
+	     */
+	    DROP,
+
+	    /**
+	     * Indicates a provenance event for the conclusion of an object's life due
+	     * to the fact that the object could not be processed in a timely manner
+	     */
+	    EXPIRE,
+
+	    /**
+	     * FORK is used to indicate that one or more FlowFile was derived from a
+	     * parent FlowFile.
+	     */
+	    FORK,
+
+	    /**
+	     * JOIN is used to indicate that a single FlowFile is derived from joining
+	     * together multiple parent FlowFiles.
+	     */
+	    JOIN,
+
+	    /**
+	     * CLONE is used to indicate that a FlowFile is an exact duplicate of its
+	     * parent FlowFile.
+	     */
+	    CLONE,
+
+	    /**
+	     * CONTENT_MODIFIED is used to indicate that a FlowFile's content was
+	     * modified in some way. When using this Event Type, it is advisable to
+	     * provide details about how the content is modified.
+	     */
+	    CONTENT_MODIFIED,
+
+	    /**
+	     * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
+	     * modified in some way. This event is not needed when another event is
+	     * reported at the same time, as the other event will already contain all
+	     * FlowFile attributes.
+	     */
+	    ATTRIBUTES_MODIFIED,
+
+	    /**
+	     * ROUTE is used to show that a FlowFile was routed to a specified
+	     * {@link org.apache.nifi.processor.Relationship Relationship} and should provide
+	     * information about why the FlowFile was routed to this relationship.
+	     */
+	    ROUTE,
+
+	    /**
+	     * Indicates a provenance event for adding additional information such as a
+	     * new linkage to a new URI or UUID
+	     */
+	    ADDINFO,
+
+	    /**
+	     * Indicates a provenance event for replaying a FlowFile. The UUID of the
+	     * event will indicate the UUID of the original FlowFile that is being
+	     * replayed. The event will contain exactly one Parent UUID that is also the
+	     * UUID of the FlowFile that is being replayed and exactly one Child UUID
+	     * that is the UUID of the a newly created FlowFile that will be re-queued
+	     * for processing.
+	     */
+	    REPLAY
+	};
+	friend class ProcessSession;
+public:
+	//! Constructor
+	/*!
+	 * Create a new provenance event record
+	 */
+	ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType) {
+		_eventType = event;
+		_componentId = componentId;
+		_componentType = componentType;
+		_eventTime = getTimeMillis();
+		char eventIdStr[37];
+		// Generate the global UUID for th event
+		uuid_generate(_eventId);
+		uuid_unparse(_eventId, eventIdStr);
+		_eventIdStr = eventIdStr;
+		_serializedBuf = NULL;
+		_serializeBufSize = 0;
+		_maxSerializeBufSize = 0;
+		_logger = Logger::getLogger();
+	}
+
+	ProvenanceEventRecord() {
+			_eventTime = getTimeMillis();
+			_serializedBuf = NULL;
+			_serializeBufSize = 0;
+			_maxSerializeBufSize = 0;
+			_logger = Logger::getLogger();
+	}
+
+	//! Destructor
+	virtual ~ProvenanceEventRecord() {
+	}
+	//! Get the Event ID
+	std::string getEventId() {
+		return _eventIdStr;
+	}
+	//! Get Attributes
+	std::map<std::string, std::string> getAttributes() {
+		return _attributes;
+	}
+	//! Get Size
+	uint64_t getFileSize() {
+		return _size;
+	}
+	// ! Get Offset
+	uint64_t getFileOffset() {
+		return _offset;
+	}
+	// ! Get Entry Date
+	uint64_t getFlowFileEntryDate() {
+		return _entryDate;
+	}
+	// ! Get Lineage Start Date
+	uint64_t getlineageStartDate() {
+		return _lineageStartDate;
+	}
+	// ! Get Event Time
+	uint64_t getEventTime() {
+		return _eventTime;
+	}
+	// ! Get Event Duration
+	uint64_t getEventDuration() {
+		return _eventDuration;
+	}
+	//! Set Event Duration
+	void setEventDuration(uint64_t duration)
+	{
+		_eventDuration = duration;
+	}
+	// ! Get Event Type
+	ProvenanceEventType getEventType() {
+		return _eventType;
+	}
+	//! Get Component ID
+	std::string getComponentId()
+	{
+		return _componentId;
+	}
+	//! Get Component Type
+	std::string getComponentType()
+	{
+		return _componentType;
+	}
+	//! Get FlowFileUuid
+	std::string getFlowFileUuid()
+	{
+		return _uuid;
+	}
+	//! Get content full path
+	std::string getContentFullPath()
+	{
+		return _contentFullPath;
+	}
+	//! Get LineageIdentifiers
+	std::set<std::string> getLineageIdentifiers()
+	{
+		return _lineageIdentifiers;
+	}
+	//! Get Details
+	std::string getDetails()
+	{
+		return _details;
+	}
+	//! Set Details
+	void setDetails(std::string details)
+	{
+		_details = details;
+	}
+	//! Get TransitUri
+	std::string getTransitUri()
+	{
+		return _transitUri;
+	}
+	//! Set TransitUri
+	void setTransitUri(std::string uri)
+	{
+		_transitUri = uri;
+	}
+	//! Get SourceSystemFlowFileIdentifier
+	std::string getSourceSystemFlowFileIdentifier()
+	{
+		return _sourceSystemFlowFileIdentifier;
+	}
+	//! Set SourceSystemFlowFileIdentifier
+	void setSourceSystemFlowFileIdentifier(std::string identifier)
+	{
+		_sourceSystemFlowFileIdentifier = identifier;
+	}
+	//! Return a copy of the parent flow file UUID list
+	std::vector<std::string> getParentUuids()
+	{
+		return this->_parentUuids;
+	}
+	//! Record a parent UUID; a UUID already in the list is not added again
+	void addParentUuid(std::string uuid)
+	{
+		bool alreadyPresent = std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end();
+		if (!alreadyPresent)
+			_parentUuids.push_back(uuid);
+	}
+	//! Record the given flow file (by its UUID string) as a parent
+	void addParentFlowFile(FlowFileRecord *flow)
+	{
+		std::string parentUuid = flow->getUUIDStr();
+		addParentUuid(parentUuid);
+	}
+	//! Remove every occurrence of uuid from the parent list (erase-remove idiom)
+	void removeParentUuid(std::string uuid)
+	{
+		std::vector<std::string>::iterator newEnd = std::remove(_parentUuids.begin(), _parentUuids.end(), uuid);
+		_parentUuids.erase(newEnd, _parentUuids.end());
+	}
+	//! Remove the given flow file (by its UUID string) from the parent list
+	void removeParentFlowFile(FlowFileRecord *flow)
+	{
+		std::string parentUuid = flow->getUUIDStr();
+		removeParentUuid(parentUuid);
+	}
+	//! Return a copy of the child flow file UUID list
+	std::vector<std::string> getChildrenUuids()
+	{
+		return this->_childrenUuids;
+	}
+	//! Record a child UUID; a UUID already in the list is not added again
+	void addChildUuid(std::string uuid)
+	{
+		bool alreadyPresent = std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end();
+		if (!alreadyPresent)
+			_childrenUuids.push_back(uuid);
+	}
+	//! Record the given flow file (by its UUID string) as a child
+	void addChildFlowFile(FlowFileRecord *flow)
+	{
+		std::string childUuid = flow->getUUIDStr();
+		addChildUuid(childUuid);
+	}
+	//! Remove every occurrence of uuid from the child list (erase-remove idiom)
+	void removeChildUuid(std::string uuid)
+	{
+		std::vector<std::string>::iterator newEnd = std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid);
+		_childrenUuids.erase(newEnd, _childrenUuids.end());
+	}
+	//! Remove the given flow file (by its UUID string) from the child list
+	void removeChildFlowFile(FlowFileRecord *flow)
+	{
+		std::string childUuid = flow->getUUIDStr();
+		removeChildUuid(childUuid);
+	}
+	//! Alternate identifier URI accessor
+	std::string getAlternateIdentifierUri() { return this->_alternateIdentifierUri; }
+	//! Replace the alternate identifier URI
+	void setAlternateIdentifierUri(std::string uri) { this->_alternateIdentifierUri = uri; }
+	//! Relationship name accessor
+	std::string getRelationship() { return this->_relationship; }
+	//! Replace the relationship name
+	void setRelationship(std::string relation) { this->_relationship = relation; }
+	//! Source queue identifier; fromFlowFile fills it with the original connection's name
+	std::string getSourceQueueIdentifier() { return this->_sourceQueueIdentifier; }
+	//! Replace the source queue identifier
+	void setSourceQueueIdentifier(std::string identifier) { this->_sourceQueueIdentifier = identifier; }
+	//! Populate this record from a flow file: entry/lineage dates, lineage IDs,
+	//! UUID, attributes, size and offset. The source queue identifier and content
+	//! path are only overwritten when the flow file has an original connection /
+	//! resource claim respectively.
+	void fromFlowFile(FlowFileRecord *flow)
+	{
+		_entryDate = flow->getEntryDate();
+		_lineageStartDate = flow->getlineageStartDate();
+		_lineageIdentifiers = flow->getlineageIdentifiers();
+		_uuid = flow->getUUIDStr();
+		_attributes = flow->getAttributes();
+		_size = flow->getSize();
+		_offset = flow->getOffset();
+
+		auto originalConnection = flow->getOriginalConnection();
+		if (originalConnection)
+			_sourceQueueIdentifier = originalConnection->getName();
+
+		auto claim = flow->getResourceClaim();
+		if (claim)
+			_contentFullPath = claim->getContentFullPath();
+	}
+	//! Serialize this record and persist it into the given repository
+	bool Serialize(ProvenanceRepository *repo);
+	//! Decode this record from a raw buffer of bufferSize bytes
+	bool DeSerialize(uint8_t *buffer, int bufferSize);
+	//! Fetch the value stored under key from the repository and decode it
+	bool DeSerialize(ProvenanceRepository *repo, std::string key);
+
+protected:
+
+	//! Event type
+	ProvenanceEventType _eventType;
+	//! Date at which the event was created
+	uint64_t _eventTime;
+	//! Date at which the flow file entered the flow (copied from FlowFileRecord::getEntryDate)
+	uint64_t _entryDate;
+	//! Date at which the origin of this flow file entered the flow (FlowFileRecord::getlineageStartDate)
+	uint64_t _lineageStartDate;
+	//! Event duration; presumably milliseconds (ProcessSession passes getTimeMillis() deltas) -- TODO confirm
+	uint64_t _eventDuration;
+	//! Component ID
+	std::string _componentId;
+	//! Component Type
+	std::string _componentType;
+	//! Size in bytes of the data corresponding to this flow file
+	uint64_t _size;
+	//! UUID string of the flow file this event describes
+	std::string _uuid;
+	//! Offset to the content (copied from FlowFileRecord::getOffset)
+	uint64_t _offset;
+	//! Full path to the content; only set by fromFlowFile when the flow file has a resource claim
+	std::string _contentFullPath;
+	//! Attribute key/value pairs copied from the flow file
+	std::map<std::string, std::string> _attributes;
+	//! Provenance event ID (binary uuid; presumably _eventIdStr holds its string form -- confirm)
+	uuid_t _eventId;
+	//! Lineage identifier UUID strings, copied from FlowFileRecord::getlineageIdentifiers()
+	std::set<std::string> _lineageIdentifiers;
+	//! Transit URI (presumably set for send/fetch/receive events)
+	std::string _transitUri;
+	//! Identifier of the flow file in the source system
+	std::string _sourceSystemFlowFileIdentifier;
+	//! Parent flow file UUIDs, kept free of duplicates by addParentUuid
+	std::vector<std::string> _parentUuids;
+	//! Child flow file UUIDs, kept free of duplicates by addChildUuid
+	std::vector<std::string> _childrenUuids;
+	//! Free-form event details
+	std::string _details;
+	//! Name of the flow file's original connection (set in fromFlowFile when present)
+	std::string _sourceQueueIdentifier;
+	//! Event ID as a string; this is the first field Serialize writes
+	std::string _eventIdStr;
+	//! Relationship name
+	std::string _relationship;
+	//! Alternate identifier URI
+	std::string _alternateIdentifierUri;
+
+private:
+
+	//! Logger
+	Logger *_logger;
+	// Serialization state. _serializedBuf is the working buffer and
+	// _maxSerializeBufSize its capacity (or readable length). _serializeBufSize is
+	// the write position while serializing (writeData) and the read cursor
+	// while deserializing (readData).
+	uint8_t *_serializedBuf;
+	int _serializeBufSize;
+	int _maxSerializeBufSize;
+	//! Append size bytes from value to the serialization buffer.
+	//! When the buffer is too small it grows in PROVENANCE_EVENT_RECORD_SEG_SIZE
+	//! steps until the whole write fits. A single step is not enough for writes
+	//! larger than one segment, and writing past it would overflow the heap.
+	//! Returns size on success or -1 for a negative size. Allocation failure
+	//! throws std::bad_alloc (plain new never returns NULL).
+	int writeData(uint8_t *value, int size)
+	{
+		if (size < 0)
+			return -1;
+		if ((_serializeBufSize + size) > _maxSerializeBufSize)
+		{
+			// grow by whole segments until the pending write fits
+			int newMaxSize = _maxSerializeBufSize;
+			while ((_serializeBufSize + size) > newMaxSize)
+				newMaxSize += PROVENANCE_EVENT_RECORD_SEG_SIZE;
+			uint8_t *buffer = new uint8_t[newMaxSize];
+			if (_serializeBufSize > 0)
+				memcpy(buffer, _serializedBuf, _serializeBufSize);
+			delete[] _serializedBuf;
+			_serializedBuf = buffer;
+			_maxSerializeBufSize = newMaxSize;
+		}
+		if (size > 0)
+			memcpy(_serializedBuf + _serializeBufSize, value, size);
+		_serializeBufSize += size;
+		return size;
+	}
+	//! Copy buflen bytes from the read cursor (_serializeBufSize) into buf and
+	//! advance the cursor. Returns buflen. Returns -1 if buflen is negative or
+	//! the read would run past _maxSerializeBufSize.
+	int readData(uint8_t *buf, int buflen)
+	{
+		if (buflen < 0 || (buflen + _serializeBufSize) > _maxSerializeBufSize)
+		{
+			// negative length, or read past the end of the buffer
+			return -1;
+		}
+		memcpy(buf, _serializedBuf + _serializeBufSize, buflen);
+		_serializeBufSize += buflen;
+		return buflen;
+	}
+	//! Append a single byte
+	int write(uint8_t value)
+	{
+		uint8_t byte = value;
+		return writeData(&byte, 1);
+	}
+	//! Append a char as one raw byte
+	int write(char value)
+	{
+		uint8_t byte = static_cast<uint8_t>(value);
+		return writeData(&byte, 1);
+	}
+	//! Append a 32-bit integer in big-endian byte order
+	int write(uint32_t value)
+	{
+		uint8_t bytes[4];
+		for (int i = 3; i >= 0; --i)
+		{
+			bytes[i] = static_cast<uint8_t>(value & 0xFF);
+			value >>= 8;
+		}
+		return writeData(bytes, 4);
+	}
+	//! Append a 16-bit integer in big-endian byte order
+	int write(uint16_t value)
+	{
+		uint8_t bytes[2];
+		bytes[0] = static_cast<uint8_t>(value >> 8);
+		bytes[1] = static_cast<uint8_t>(value & 0xFF);
+		return writeData(bytes, 2);
+	}
+	//! Append len raw bytes
+	int write(uint8_t *value, int len)
+	{
+		return this->writeData(value, len);
+	}
+	//! Append a 64-bit integer in big-endian byte order
+	int write(uint64_t value)
+	{
+		uint8_t bytes[8];
+		for (int i = 7; i >= 0; --i)
+		{
+			bytes[i] = static_cast<uint8_t>(value & 0xFF);
+			value >>= 8;
+		}
+		return writeData(bytes, 8);
+	}
+	//! Append a bool as a single byte (1 for true, 0 for false)
+	int write(bool value)
+	{
+		uint8_t byte = value ? 1 : 0;
+		return write(byte);
+	}
+	//! Append str behind a big-endian length header (2 bytes, or 4 when widen); defined out of line
+	int writeUTF(std::string str, bool widen = false);
+	//! Read one byte; value is only assigned when the read succeeds
+	int read(uint8_t &value)
+	{
+		uint8_t buf;
+
+		int ret = readData(&buf, 1);
+		if (ret == 1)
+			value = buf;
+		return ret;
+	}
+	//! Read a big-endian 16-bit integer
+	int read(uint16_t &value)
+	{
+		uint8_t buf[2];
+
+		int ret = readData(buf, 2);
+		if (ret == 2)
+			value = static_cast<uint16_t>((buf[0] << 8) | buf[1]);
+		return ret;
+	}
+	//! Read one byte as a char
+	int read(char &value)
+	{
+		uint8_t buf;
+
+		int ret = readData(&buf, 1);
+		if (ret == 1)
+			value = (char) buf;
+		return ret;
+	}
+	//! Read len raw bytes into value
+	int read(uint8_t *value, int len)
+	{
+		return readData(value, len);
+	}
+	//! Read a big-endian 32-bit integer. Each byte is widened to uint32_t before
+	//! shifting. A uint8_t otherwise promotes to int, and "<< 24" overflows
+	//! (undefined behavior) when the high byte is >= 0x80.
+	int read(uint32_t &value)
+	{
+		uint8_t buf[4];
+
+		int ret = readData(buf, 4);
+		if (ret == 4)
+			value = (static_cast<uint32_t>(buf[0]) << 24) |
+					(static_cast<uint32_t>(buf[1]) << 16) |
+					(static_cast<uint32_t>(buf[2]) << 8) |
+					static_cast<uint32_t>(buf[3]);
+		return ret;
+	}
+	//! Read a big-endian 64-bit integer
+	int read(uint64_t &value)
+	{
+		uint8_t buf[8];
+
+		int ret = readData(buf, 8);
+		if (ret == 8)
+		{
+			uint64_t result = 0;
+			for (int i = 0; i < 8; ++i)
+				result = (result << 8) | buf[i];
+			value = result;
+		}
+		return ret;
+	}
+	//! Read a length-prefixed string produced by writeUTF; defined out of line
+	int readUTF(std::string &str, bool widen = false);
+
+	// Copying is disabled: the record manages _serializedBuf with new[]/delete[]
+	// (see writeData), so a member-wise copy would share that buffer.
+	// Only pass by reference or pointer.
+	ProvenanceEventRecord(const ProvenanceEventRecord &parent) = delete;
+	ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent) = delete;
+
+};
+
+//! Provenance Reporter
+//! Collects ProvenanceEventRecord objects for one component. Every event held
+//! in the reporter's set is deleted by clear(), which the destructor calls.
+class ProvenanceReporter
+{
+	friend class ProcessSession;
+public:
+	//! Constructor
+	/*!
+	 * Create a new provenance reporter associated with the process session
+	 */
+	ProvenanceReporter(std::string componentId, std::string componentType)
+	: _componentId(componentId), _componentType(componentType)
+	{
+		_logger = Logger::getLogger();
+	}
+
+	//! Destructor: deletes every event still held by the reporter
+	virtual ~ProvenanceReporter() {
+		clear();
+	}
+	//! Get a copy of the current event set
+	std::set<ProvenanceEventRecord *> getEvents()
+	{
+		return _events;
+	}
+	//! Add an event; the reporter takes ownership and deletes it in clear()
+	void add(ProvenanceEventRecord *event)
+	{
+		_events.insert(event);
+	}
+	//! Remove an event from the set WITHOUT deleting it; the caller then owns it
+	void remove(ProvenanceEventRecord *event)
+	{
+		// erase-by-key is a no-op for absent keys, so no prior find() is needed
+		_events.erase(event);
+	}
+	//! Delete every held event and empty the set
+	void clear()
+	{
+		for (ProvenanceEventRecord *event : _events)
+			delete event;
+		_events.clear();
+	}
+	//! Allocate a new event of eventType for this reporter's component and fill
+	//! it from flow. The event is NOT added to the reporter.
+	ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, FlowFileRecord *flow)
+	{
+		// plain new throws std::bad_alloc on failure, so no NULL check is needed
+		ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType);
+		event->fromFlowFile(flow);
+		return event;
+	}
+	//! commit
+	void commit();
+	//! create
+	void create(FlowFileRecord *flow, std::string detail);
+	//! route
+	void route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration);
+	//! modifyAttributes
+	void modifyAttributes(FlowFileRecord *flow, std::string detail);
+	//! modifyContent
+	void modifyContent(FlowFileRecord *flow, std::string detail, uint64_t processingDuration);
+	//! clone
+	void clone(FlowFileRecord *parent, FlowFileRecord *child);
+	//! join
+	void join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration);
+	//! fork
+	void fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration);
+	//! expire
+	void expire(FlowFileRecord *flow, std::string detail);
+	//! drop
+	void drop(FlowFileRecord *flow, std::string reason);
+	//! send
+	void send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force);
+	//! fetch
+	void fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration);
+	//! receive
+	void receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration);
+
+protected:
+
+	//! Component ID
+	std::string _componentId;
+	//! Component Type
+	std::string _componentType;
+
+private:
+
+	//! Provenance events owned by this reporter (deleted in clear())
+	std::set<ProvenanceEventRecord *> _events;
+	//! Logger
+	Logger *_logger;
+
+	// Copying is disabled: a copy would double-delete the owned events.
+	// Only pass by reference or pointer.
+	ProvenanceReporter(const ProvenanceReporter &parent) = delete;
+	ProvenanceReporter &operator=(const ProvenanceReporter &parent) = delete;
+};
+
+// Default provenance repository settings. ProvenanceRepository's constructor
+// starts from these, and initialize() may override them from Configure.
+#define PROVENANCE_DIRECTORY "./provenance_repository"
+#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M bytes (default _maxPartitionBytes)
+#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute, in ms (default _maxPartitionMillis)
+#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec (default _purgePeriod)
+
+//! Provenance Repository
+//! Persists serialized provenance events in a LevelDB database. initialize()
+//! opens the database and then starts the monitor thread.
+class ProvenanceRepository
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new provenance repository with default settings. The database
+	 * is not opened until initialize() is called.
+	 */
+	ProvenanceRepository() {
+		_logger = Logger::getLogger();
+		_configure = Configure::getConfigure();
+		_directory = PROVENANCE_DIRECTORY;
+		_maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME;
+		_purgePeriod = PROVENANCE_PURGE_PERIOD;
+		_maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE;
+		_db = NULL;
+		// The destructor deletes _thread, and start() is never reached when
+		// initialize() fails to open the database, so it must begin as NULL.
+		_thread = NULL;
+		_running = false;
+		_repoFull = false;
+	}
+
+	//! Destructor
+	virtual ~ProvenanceRepository() {
+		stop();
+		if (this->_thread)
+			delete this->_thread;
+		destroy();
+	}
+
+	//! Read settings from Configure, open (creating if missing) the LevelDB
+	//! database and start the monitor thread. Returns false if the database
+	//! cannot be opened; the thread is not started in that case.
+	bool initialize()
+	{
+		std::string value;
+		if (_configure->get(Configure::nifi_provenance_repository_directory_default, value))
+		{
+			_directory = value;
+		}
+		_logger->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
+		if (_configure->get(Configure::nifi_provenance_repository_max_storage_size, value))
+		{
+			Property::StringToInt(value, _maxPartitionBytes);
+		}
+		// _maxPartitionBytes is int64_t; %d would mismatch the printf-style vararg
+		_logger->log_info("NiFi Provenance Max Partition Bytes %lld", (long long) _maxPartitionBytes);
+		if (_configure->get(Configure::nifi_provenance_repository_max_storage_time, value))
+		{
+			// Parse into temporaries. A value whose unit cannot be converted then
+			// leaves the previous setting intact instead of an unconverted number.
+			int64_t parsedTime;
+			int64_t millis;
+			TimeUnit unit;
+			if (Property::StringToTime(value, parsedTime, unit) &&
+						Property::ConvertTimeUnitToMS(parsedTime, unit, millis))
+			{
+				_maxPartitionMillis = millis;
+			}
+		}
+		_logger->log_info("NiFi Provenance Max Storage Time: [%lld] ms", (long long) _maxPartitionMillis);
+		leveldb::Options options;
+		options.create_if_missing = true;
+		leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
+		if (status.ok())
+		{
+			_logger->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
+		}
+		else
+		{
+			_logger->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
+			return false;
+		}
+
+		// start the monitor thread
+		start();
+		return true;
+	}
+	//! Store bufLen bytes of buf under key. Returns false if the database is
+	//! not open or the write fails.
+	bool Put(std::string key, uint8_t *buf, int bufLen)
+	{
+		if (!_db)
+			return false;
+		// persist to the DB
+		leveldb::Slice value((const char *) buf, bufLen);
+		leveldb::Status status = _db->Put(leveldb::WriteOptions(), key, value);
+		return status.ok();
+	}
+	//! Delete key. Returns false if the database is not open or the delete fails.
+	bool Delete(std::string key)
+	{
+		if (!_db)
+			return false;
+		leveldb::Status status = _db->Delete(leveldb::WriteOptions(), key);
+		return status.ok();
+	}
+	//! Look up key into value. Returns false if the database is not open or the lookup fails.
+	bool Get(std::string key, std::string &value)
+	{
+		if (!_db)
+			return false;
+		leveldb::Status status = _db->Get(leveldb::ReadOptions(), key, &value);
+		return status.ok();
+	}
+	//! Persist an event (serializes it through Put)
+	void registerEvent(ProvenanceEventRecord *event)
+	{
+		event->Serialize(this);
+	}
+	//! Remove a persisted event by its event ID
+	void removeEvent(ProvenanceEventRecord *event)
+	{
+		Delete(event->getEventId());
+	}
+	//! Close the database if it is open
+	void destroy()
+	{
+		if (_db)
+		{
+			delete _db;
+			_db = NULL;
+		}
+	}
+	//! Run function for the thread
+	static void run(ProvenanceRepository *repo);
+	//! Start the repository monitor thread
+	void start();
+	//! Stop the repository monitor thread
+	void stop();
+	//! whether the repo is full
+	bool isFull()
+	{
+		return _repoFull;
+	}
+
+protected:
+
+private:
+
+	//! Mutex for protection
+	std::mutex _mtx;
+	//! repository directory
+	std::string _directory;
+	//! Logger
+	Logger *_logger;
+	//! Configure
+	Configure *_configure;
+	//! max db entry life time (ms)
+	int64_t _maxPartitionMillis;
+	//! max db size (bytes)
+	int64_t _maxPartitionBytes;
+	//! purge period
+	uint64_t _purgePeriod;
+	//! level DB database; NULL until initialize() opens it
+	leveldb::DB* _db;
+	//! monitor thread; NULL until start()
+	std::thread *_thread;
+	//! whether it is running
+	bool _running;
+	//! whether to stop accepting provenance events
+	std::atomic<bool> _repoFull;
+	//! size of the directory (accumulator for repoSum)
+	static uint64_t _repoSize;
+	//! ftw() callback: adds each visited entry's st_size to _repoSize
+	static int repoSum(const char *fpath, const struct stat *sb, int typeflag)
+	{
+	    _repoSize += sb->st_size;
+	    return 0;
+	}
+	//! Total st_size of everything under _directory, or 0 if the walk fails
+	uint64_t repoSize()
+	{
+		_repoSize = 0;
+		if (ftw(_directory.c_str(), repoSum, 1) != 0)
+			_repoSize = 0;
+
+		return _repoSize;
+	}
+	// Copying is disabled: the repository owns the DB handle and thread.
+	// Only pass by reference or pointer.
+	ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+	ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
+};
+
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index d7fd95b..ab9f43e 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -26,7 +26,9 @@ const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
 const char *Configure::nifi_server_name = "nifi.server.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
 const char *Configure::nifi_server_report_interval= "nifi.server.report.interval";
-
+// Property keys read by ProvenanceRepository::initialize()
+const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
+const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
+const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
 
 //! Get the config value
 bool Configure::get(std::string key, std::string &value)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 8fbe3dc..ab1fcf7 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -31,6 +31,7 @@
 #include "FlowController.h"
 #include "ProcessContext.h"
 
+// Definition of the static FlowController::_flowController pointer, initially NULL
+FlowController *FlowController::_flowController(NULL);
 FlowController::FlowController(std::string name)
 : _name(name)
 {
@@ -85,6 +86,8 @@ FlowController::FlowController(std::string name)
 	_logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
 
 	// Create repos for flow record and provenance
+	_provenanceRepo = new ProvenanceRepository();
+	_provenanceRepo->initialize();
 
 	_logger->log_info("FlowController %s created", _name.c_str());
 }
@@ -94,6 +97,7 @@ FlowController::~FlowController()
 	stop(true);
 	unload();
 	delete _protocol;
+	delete _provenanceRepo;
 }
 
 bool FlowController::isRunning()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
index 02e196a..70969c9 100644
--- a/libminifi/src/GetFile.cpp
+++ b/libminifi/src/GetFile.cpp
@@ -215,11 +215,11 @@ void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
 	return;
 }
 
-bool GetFile::acceptFile(std::string fileName)
+bool GetFile::acceptFile(std::string fullName, std::string name)
 {
 	struct stat statbuf;
 
-	if (stat(fileName.c_str(), &statbuf) == 0)
+	if (stat(fullName.c_str(), &statbuf) == 0)
 	{
 		if (_minSize > 0 && statbuf.st_size <_minSize)
 			return false;
@@ -234,18 +234,18 @@ bool GetFile::acceptFile(std::string fileName)
 		if (_maxAge > 0 && fileAge > _maxAge)
 			return false;
 
-		if (_ignoreHiddenFile && fileName.c_str()[0] == '.')
+		if (_ignoreHiddenFile && fullName.c_str()[0] == '.')
 			return false;
 
-		if (access(fileName.c_str(), R_OK) != 0)
+		if (access(fullName.c_str(), R_OK) != 0)
 			return false;
 
-		if (_keepSourceFile == false && access(fileName.c_str(), W_OK) != 0)
+		if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
 			return false;
 
 		try {
 			std::regex re(_fileFilter);
-			if (!std::regex_match(fileName, re)) {
+			if (!std::regex_match(name, re)) {
 				return false;
 	   		}
 		} catch (std::regex_error e) {
@@ -284,7 +284,7 @@ void GetFile::performListing(std::string dir)
 		else
 		{
 			std::string fileName = dir + "/" + d_name;
-			if (acceptFile(fileName))
+			if (acceptFile(fileName, d_name))
 			{
 				// check whether we can take this file
 				putListing(fileName);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
index 4f526c3..abe75bd 100644
--- a/libminifi/src/ProcessSession.cpp
+++ b/libminifi/src/ProcessSession.cpp
@@ -38,6 +38,8 @@ FlowFileRecord* ProcessSession::create()
 	{
 		_addedFlowFiles[record->getUUIDStr()] = record;
 		_logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+		std::string details = _processContext->getProcessor()->getName() + " creates flow record " +  record->getUUIDStr();
+		_provenanceReport->create(record, details);
 	}
 
 	return record;
@@ -45,7 +47,15 @@ FlowFileRecord* ProcessSession::create()
 
 FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
 {
-	FlowFileRecord *record = this->create();
+	std::map<std::string, std::string> empty;
+	FlowFileRecord *record = new FlowFileRecord(empty);
+
+	if (record)
+	{
+		_addedFlowFiles[record->getUUIDStr()] = record;
+		_logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+	}
+
 	if (record)
 	{
 		// Copy attributes
@@ -81,6 +91,7 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
 			record->_size = parent->_size;
 			record->_claim->increaseFlowFileRecordOwnedCount();
 		}
+		_provenanceReport->clone(parent, record);
 	}
 	return record;
 }
@@ -118,6 +129,7 @@ FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
 	    	record->_size = parent->_size;
 	    	record->_claim->increaseFlowFileRecordOwnedCount();
 	    }
+	    _provenanceReport->clone(parent, record);
 	}
 
 	return record;
@@ -149,6 +161,7 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long
 			record->_claim = parent->_claim;
 			record->_claim->increaseFlowFileRecordOwnedCount();
 		}
+		_provenanceReport->clone(parent, record);
 	}
 	return record;
 }
@@ -157,16 +170,24 @@ void ProcessSession::remove(FlowFileRecord *flow)
 {
 	flow->_markedDelete = true;
 	_deletedFlowFiles[flow->getUUIDStr()] = flow;
+	std::string reason = _processContext->getProcessor()->getName() + " drop flow record " +  flow->getUUIDStr();
+	_provenanceReport->drop(flow, reason);
 }
 
 void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
 {
 	flow->setAttribute(key, value);
+	std::string details = _processContext->getProcessor()->getName() + " modify flow record " +  flow->getUUIDStr() +
+			" attribute " + key + ":" + value;
+	_provenanceReport->modifyAttributes(flow, details);
 }
 
 void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
 {
 	flow->removeAttribute(key);
+	std::string details = _processContext->getProcessor()->getName() + " remove flow record " +  flow->getUUIDStr() +
+				" attribute " + key;
+	_provenanceReport->modifyAttributes(flow, details);
 }
 
 void ProcessSession::penalize(FlowFileRecord *flow)
@@ -188,6 +209,7 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
 	try
 	{
 		std::ofstream fs;
+		uint64_t startTime = getTimeMillis();
 		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
 		if (fs.is_open())
 		{
@@ -209,6 +231,9 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
 				_logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
 						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 				fs.close();
+				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
+				uint64_t endTime = getTimeMillis();
+				_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -262,6 +287,7 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
 	try
 	{
 		std::ofstream fs;
+		uint64_t startTime = getTimeMillis();
 		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
 		if (fs.is_open())
 		{
@@ -276,6 +302,9 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
 				_logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
 						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 				fs.close();
+				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
+				uint64_t endTime = getTimeMillis();
+				_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -361,6 +390,7 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS
 	try
 	{
 		std::ofstream fs;
+		uint64_t startTime = getTimeMillis();
 		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
 		std::ifstream input;
 		input.open(source.c_str(), std::fstream::in | std::fstream::binary);
@@ -397,6 +427,9 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS
 				input.close();
 				if (!keepSource)
 					std::remove(source.c_str());
+				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
+				uint64_t endTime = getTimeMillis();
+				_provenanceReport->modifyContent(flow, details, endTime - startTime);
 			}
 			else
 			{
@@ -621,6 +654,8 @@ void ProcessSession::commit()
 		_clonedFlowFiles.clear();
 		_deletedFlowFiles.clear();
 		_originalFlowFiles.clear();
+		// persistent the provenance report
+		this->_provenanceReport->commit();
 		_logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
 	}
 	catch (std::exception &exception)
@@ -706,7 +741,10 @@ FlowFileRecord *ProcessSession::get()
 			// Remove expired flow record
 			for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it)
 			{
-				delete (*it);
+				FlowFileRecord *record = *it;
+				std::string details = _processContext->getProcessor()->getName() + " expire flow record " +  record->getUUIDStr();
+				_provenanceReport->expire(record, details);
+				delete (record);
 			}
 		}
 		if (ret)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
new file mode 100644
index 0000000..a2a4310
--- /dev/null
+++ b/libminifi/src/Provenance.cpp
@@ -0,0 +1,919 @@
+/**
+ * @file Provenance.cpp
+ * Provenance implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Provenance.h"
+#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+
+//! Read a string produced by writeUTF(). The format is a big-endian length
+//! header (2 bytes, or 4 when widen) followed by that many encoded bytes.
+//! On success str is replaced and the total bytes consumed are returned.
+//! A malformed or truncated payload returns -1; a failed header read returns
+//! that read's result. In both failure cases str is left unchanged.
+int ProvenanceEventRecord::readUTF(std::string &str, bool widen)
+{
+	uint32_t utflen;
+	int headerLen;
+	int ret;
+
+	if (!widen)
+	{
+		uint16_t shortLen;
+		ret = read(shortLen);
+		if (ret <= 0)
+			return ret;
+		utflen = shortLen;
+		headerLen = 2;
+	}
+	else
+	{
+		// keep the full 32-bit length; narrowing it to 16 bits would desynchronize
+		// the read cursor for payloads longer than 65535 bytes
+		ret = read(utflen);
+		if (ret <= 0)
+			return ret;
+		headerLen = 4;
+	}
+
+	// Reject a length that runs past the buffer before allocating for it
+	// (the same bound readData() enforces).
+	if (static_cast<int64_t>(utflen) + _serializeBufSize > _maxSerializeBufSize)
+		return -1;
+
+	if (utflen == 0)
+	{
+		// an empty payload decodes to an empty string
+		str.clear();
+		return headerLen;
+	}
+
+	std::vector<uint8_t> bytearr(utflen);
+	ret = read(bytearr.data(), static_cast<int>(utflen));
+	if (ret <= 0)
+		return ret;
+
+	// Each iteration consumes at least one byte and emits one char, so the
+	// decoded string is never longer than the payload.
+	std::string value;
+	value.reserve(utflen);
+	uint32_t count = 0;
+	while (count < utflen)
+	{
+		int c = bytearr[count] & 0xff;
+		int char2, char3;
+		switch (c >> 4)
+		{
+			case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
+				/* 0xxxxxxx */
+				count++;
+				value.push_back((char) c);
+				break;
+			case 12: case 13:
+				/* 110x xxxx   10xx xxxx */
+				count += 2;
+				if (count > utflen)
+					return -1;
+				char2 = (int) bytearr[count - 1];
+				if ((char2 & 0xC0) != 0x80)
+					return -1;
+				value.push_back((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
+				break;
+			case 14:
+				/* 1110 xxxx  10xx xxxx  10xx xxxx */
+				count += 3;
+				if (count > utflen)
+					return -1;
+				char2 = (int) bytearr[count - 2];
+				char3 = (int) bytearr[count - 1];
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+					return -1;
+				// NOTE: the decoded value is truncated to a single char
+				value.push_back((char) (((c & 0x0F) << 12) |
+										((char2 & 0x3F) << 6) |
+										((char3 & 0x3F) << 0)));
+				break;
+			default:
+				// 10xx xxxx and 1111 xxxx are not valid lead bytes
+				return -1;
+		}
+	}
+	str = value;
+	return headerLen + static_cast<int>(utflen);
+}
+
+//! Write str as a big-endian length header (2 bytes, or 4 when widen)
+//! followed by the encoded bytes. Returns writeData()'s result, or -1 if the
+//! encoded payload exceeds 65535 bytes.
+int ProvenanceEventRecord::writeUTF(std::string str, bool widen)
+{
+	// Each char is widened to int before classification. Where char is signed,
+	// bytes >= 0x80 become negative and take the two-byte form. NUL also
+	// takes the two-byte form (0xC0 0x80). An 8-bit char never exceeds 0x07FF,
+	// so the three-byte form is only kept for symmetry with readUTF.
+	int utflen = 0;
+	for (std::string::size_type i = 0; i < str.length(); i++)
+	{
+		int c = str[i];
+		if ((c >= 0x0001) && (c <= 0x007F))
+			utflen++;
+		else if (c > 0x07FF)
+			utflen += 3;
+		else
+			utflen += 2;
+	}
+
+	if (utflen > 65535)
+		return -1;
+
+	// Build header + payload in a vector so the buffer is released even if
+	// writeData() throws (e.g. std::bad_alloc while growing).
+	std::vector<uint8_t> bytearr;
+	bytearr.reserve((widen ? 4 : 2) + utflen);
+	if (widen)
+	{
+		bytearr.push_back((uint8_t) ((utflen >> 24) & 0xFF));
+		bytearr.push_back((uint8_t) ((utflen >> 16) & 0xFF));
+	}
+	bytearr.push_back((uint8_t) ((utflen >> 8) & 0xFF));
+	bytearr.push_back((uint8_t) ((utflen >> 0) & 0xFF));
+
+	for (std::string::size_type i = 0; i < str.length(); i++)
+	{
+		int c = str[i];
+		if ((c >= 0x0001) && (c <= 0x007F))
+		{
+			bytearr.push_back((uint8_t) c);
+		}
+		else if (c > 0x07FF)
+		{
+			bytearr.push_back((uint8_t) (0xE0 | ((c >> 12) & 0x0F)));
+			bytearr.push_back((uint8_t) (0x80 | ((c >>  6) & 0x3F)));
+			bytearr.push_back((uint8_t) (0x80 | ((c >>  0) & 0x3F)));
+		}
+		else
+		{
+			bytearr.push_back((uint8_t) (0xC0 | ((c >>  6) & 0x1F)));
+			bytearr.push_back((uint8_t) (0x80 | ((c >>  0) & 0x3F)));
+		}
+	}
+
+	return writeData(bytearr.data(), static_cast<int>(bytearr.size()));
+}
+
+//! DeSerialize
+//! Fetch the serialized event stored under key from the repository and decode
+//! it into this record. Returns false if the key cannot be read or decoding fails.
+bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, std::string key)
+{
+	std::string value;
+
+	if (!repo->Get(key, value))
+	{
+		_logger->log_error("NiFi Provenance Store event %s can not found", key.c_str());
+		return false;
+	}
+	// length() is a size_t; cast it so the argument matches the %d conversion
+	_logger->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), (int) value.length());
+
+	bool ret = DeSerialize((unsigned char *) value.data(), value.length());
+
+	if (ret)
+	{
+		_logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), _serializeBufSize, _eventType);
+	}
+	else
+	{
+		_logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), _serializeBufSize, _eventType);
+	}
+
+	return ret;
+}
+
+//! Serialize this event into a scratch buffer and store it in repo under _eventIdStr.
+//! The field order must stay in step with DeSerialize(uint8_t *, int).
+//! Returns false when a serialization is already in progress, when any field fails to
+//! encode, or when repo->Put() reports failure. Except in the "already in progress"
+//! case, the scratch buffer is freed before returning.
+bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo)
+{
+	if (_serializedBuf)
+		// Serialize in progress
+		return false;
+	_serializeBufSize = 0;
+	_maxSerializeBufSize = 0;
+	_serializedBuf = new uint8_t[PROVENANCE_EVENT_RECORD_SEG_SIZE];
+	if (!_serializedBuf)
+		return false;
+	_maxSerializeBufSize = PROVENANCE_EVENT_RECORD_SEG_SIZE;
+
+	// Encode every field in wire order; stops at the first write that fails.
+	auto encodeFields = [this]() -> bool
+	{
+		uint32_t eventType = _eventType;
+		if (writeUTF(_eventIdStr) <= 0
+				|| write(eventType) != 4
+				|| write(_eventTime) != 8
+				|| write(_entryDate) != 8
+				|| write(_eventDuration) != 8
+				|| write(_lineageStartDate) != 8
+				|| writeUTF(_componentId) <= 0
+				|| writeUTF(_componentType) <= 0
+				|| writeUTF(_uuid) <= 0
+				|| writeUTF(_details) <= 0)
+			return false;
+
+		// flow attributes: count, then key/value pairs with widened length prefixes
+		uint32_t numAttributes = _attributes.size();
+		if (write(numAttributes) != 4)
+			return false;
+		for (std::map<std::string, std::string>::iterator it = _attributes.begin(); it != _attributes.end(); ++it)
+		{
+			if (writeUTF(it->first, true) <= 0 || writeUTF(it->second, true) <= 0)
+				return false;
+		}
+
+		if (writeUTF(_contentFullPath) <= 0
+				|| write(_size) != 8
+				|| write(_offset) != 8
+				|| writeUTF(_sourceQueueIdentifier) <= 0)
+			return false;
+
+		// event-type specific trailer
+		if (_eventType == ProvenanceEventRecord::FORK || _eventType == ProvenanceEventRecord::CLONE || _eventType == ProvenanceEventRecord::JOIN)
+		{
+			uint32_t number = _parentUuids.size();
+			if (write(number) != 4)
+				return false;
+			for (std::vector<std::string>::iterator it = _parentUuids.begin(); it != _parentUuids.end(); ++it)
+			{
+				if (writeUTF(*it) <= 0)
+					return false;
+			}
+			number = _childrenUuids.size();
+			if (write(number) != 4)
+				return false;
+			for (std::vector<std::string>::iterator it = _childrenUuids.begin(); it != _childrenUuids.end(); ++it)
+			{
+				if (writeUTF(*it) <= 0)
+					return false;
+			}
+		}
+		else if (_eventType == ProvenanceEventRecord::SEND || _eventType == ProvenanceEventRecord::FETCH)
+		{
+			if (writeUTF(_transitUri) <= 0)
+				return false;
+		}
+		else if (_eventType == ProvenanceEventRecord::RECEIVE)
+		{
+			if (writeUTF(_transitUri) <= 0 || writeUTF(_sourceSystemFlowFileIdentifier) <= 0)
+				return false;
+		}
+		return true;
+	};
+
+	bool stored = false;
+	if (encodeFields())
+	{
+		// Persistent to the DB; a failed Put is reported to the caller, not just logged
+		if (repo->Put(_eventIdStr, _serializedBuf, _serializeBufSize))
+		{
+			_logger->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), _serializeBufSize);
+			stored = true;
+		}
+		else
+		{
+			_logger->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), _serializeBufSize);
+		}
+	}
+
+	// cleanup: one release path for both success and failure
+	delete[] (_serializedBuf);
+	_serializedBuf = NULL;
+	_serializeBufSize = 0;
+
+	return stored;
+}
+
+//! Parse an event from bufferSize bytes at buffer, using the layout written by Serialize().
+//! buffer is only borrowed. It is installed as _serializedBuf for the read helpers during
+//! this call and is cleared again before returning, so the caller keeps ownership.
+//! Returns false as soon as any field cannot be read.
+bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize)
+{
+	_serializedBuf = buffer;
+	_serializeBufSize = 0;
+	_maxSerializeBufSize = bufferSize;
+
+	// Decode every field in wire order; stops at the first read that fails.
+	auto parseFields = [this]() -> bool
+	{
+		if (readUTF(_eventIdStr) <= 0)
+			return false;
+
+		uint32_t eventType;
+		if (read(eventType) != 4)
+			return false;
+		_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
+
+		if (read(_eventTime) != 8
+				|| read(_entryDate) != 8
+				|| read(_eventDuration) != 8
+				|| read(_lineageStartDate) != 8
+				|| readUTF(_componentId) <= 0
+				|| readUTF(_componentType) <= 0
+				|| readUTF(_uuid) <= 0
+				|| readUTF(_details) <= 0)
+			return false;
+
+		// flow attributes: count, then key/value pairs with widened length prefixes
+		uint32_t numAttributes = 0;
+		if (read(numAttributes) != 4)
+			return false;
+		for (uint32_t i = 0; i < numAttributes; i++)
+		{
+			std::string key;
+			if (readUTF(key, true) <= 0)
+				return false;
+			std::string value;
+			if (readUTF(value, true) <= 0)
+				return false;
+			_attributes[key] = value;
+		}
+
+		if (readUTF(_contentFullPath) <= 0
+				|| read(_size) != 8
+				|| read(_offset) != 8
+				|| readUTF(_sourceQueueIdentifier) <= 0)
+			return false;
+
+		// event-type specific trailer
+		if (_eventType == ProvenanceEventRecord::FORK || _eventType == ProvenanceEventRecord::CLONE || _eventType == ProvenanceEventRecord::JOIN)
+		{
+			uint32_t number = 0;
+			if (read(number) != 4)
+				return false;
+			for (uint32_t i = 0; i < number; i++)
+			{
+				std::string parentUUID;
+				if (readUTF(parentUUID) <= 0)
+					return false;
+				addParentUuid(parentUUID);
+			}
+			number = 0;
+			if (read(number) != 4)
+				return false;
+			for (uint32_t i = 0; i < number; i++)
+			{
+				std::string childUUID;
+				if (readUTF(childUUID) <= 0)
+					return false;
+				addChildUuid(childUUID);
+			}
+		}
+		else if (_eventType == ProvenanceEventRecord::SEND || _eventType == ProvenanceEventRecord::FETCH)
+		{
+			if (readUTF(_transitUri) <= 0)
+				return false;
+		}
+		else if (_eventType == ProvenanceEventRecord::RECEIVE)
+		{
+			if (readUTF(_transitUri) <= 0 || readUTF(_sourceSystemFlowFileIdentifier) <= 0)
+				return false;
+		}
+		return true;
+	};
+
+	bool ok = parseFields();
+	// Drop the borrowed pointer: the caller may free buffer right after this call, and
+	// a non-NULL _serializedBuf makes Serialize() treat the record as "in progress".
+	_serializedBuf = NULL;
+	return ok;
+}
+
+//! Persist every queued event to the flow controller's provenance repository.
+//! An event is skipped (with a debug log) if the repository reports full at that moment.
+//! The events are not removed from _events or freed here.
+void ProvenanceReporter::commit()
+{
+	for (ProvenanceEventRecord *event : _events)
+	{
+		if (FlowController::getFlowController()->getProvenanceRepository()->isFull())
+		{
+			_logger->log_debug("Provenance Repository is full");
+			continue;
+		}
+		event->Serialize(FlowController::getFlowController()->getProvenanceRepository());
+	}
+}
+
+//! Queue a CREATE event for flow carrying detail; nothing is queued if allocate() fails.
+void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::CREATE, flow);
+	if (!record)
+		return;
+
+	record->setDetails(detail);
+	add(record);
+}
+
+//! Queue a ROUTE event for flow, recording the relationship name, detail text and
+//! processing duration; nothing is queued if allocate() fails.
+void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::ROUTE, flow);
+	if (!record)
+		return;
+
+	record->setDetails(detail);
+	record->setRelationship(relation.getName());
+	record->setEventDuration(processingDuration);
+	add(record);
+}
+
+//! Queue an ATTRIBUTES_MODIFIED event for flow carrying detail; nothing is queued if
+//! allocate() fails.
+void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, std::string detail)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
+	if (record == NULL)
+		return;
+
+	record->setDetails(detail);
+	add(record);
+}
+
+//! Queue a CONTENT_MODIFIED event for flow with detail text and processing duration;
+//! nothing is queued if allocate() fails.
+void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
+	if (record == NULL)
+		return;
+
+	record->setDetails(detail);
+	record->setEventDuration(processingDuration);
+	add(record);
+}
+
+//! Queue a CLONE event allocated against parent that links parent to child;
+//! nothing is queued if allocate() fails.
+void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child)
+{
+	ProvenanceEventRecord *cloneEvent = allocate(ProvenanceEventRecord::CLONE, parent);
+	if (cloneEvent == NULL)
+		return;
+
+	cloneEvent->addChildFlowFile(child);
+	cloneEvent->addParentFlowFile(parent);
+	add(cloneEvent);
+}
+
+//! Queue a JOIN event allocated against child, linking every record in parents as a
+//! parent of child, with detail text and processing duration. Nothing is queued if
+//! allocate() fails.
+void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *joinEvent = allocate(ProvenanceEventRecord::JOIN, child);
+	if (!joinEvent)
+		return;
+
+	joinEvent->addChildFlowFile(child);
+	for (FlowFileRecord *parent : parents)
+		joinEvent->addParentFlowFile(parent);
+	joinEvent->setDetails(detail);
+	joinEvent->setEventDuration(processingDuration);
+	add(joinEvent);
+}
+
+//! Queue a FORK event allocated against parent, linking every record in child as a
+//! child of parent, with detail text and processing duration. Nothing is queued if
+//! allocate() fails.
+void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *forkEvent = allocate(ProvenanceEventRecord::FORK, parent);
+	if (!forkEvent)
+		return;
+
+	forkEvent->addParentFlowFile(parent);
+	for (FlowFileRecord *offspring : child)
+		forkEvent->addChildFlowFile(offspring);
+	forkEvent->setDetails(detail);
+	forkEvent->setEventDuration(processingDuration);
+	add(forkEvent);
+}
+
+//! Queue an EXPIRE event for flow carrying detail; nothing is queued if allocate() fails.
+void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::EXPIRE, flow);
+	if (!record)
+		return;
+
+	record->setDetails(detail);
+	add(record);
+}
+
+//! Queue a DROP event for flow whose details are "Discard reason: " followed by reason;
+//! nothing is queued if allocate() fails.
+void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::DROP, flow);
+	if (!record)
+		return;
+
+	const std::string prefix = "Discard reason: ";
+	record->setDetails(prefix + reason);
+	add(record);
+}
+
+//! Record a SEND event for flow with transit URI, detail text and processing duration.
+//! When force is false the event is queued with add(). When force is true it is
+//! serialized straight to the provenance repository (skipped if the repository reports
+//! full) and then deleted here.
+void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::SEND, flow);
+	if (!record)
+		return;
+
+	record->setTransitUri(transitUri);
+	record->setDetails(detail);
+	record->setEventDuration(processingDuration);
+
+	if (!force)
+	{
+		add(record);
+		return;
+	}
+
+	// forced: persist now instead of waiting for commit(), then release the record
+	if (!FlowController::getFlowController()->getProvenanceRepository()->isFull())
+		record->Serialize(FlowController::getFlowController()->getProvenanceRepository());
+	delete record;
+}
+
+//! Queue a RECEIVE event for flow with transit URI, detail text, processing duration and
+//! the source system's flow file identifier; nothing is queued if allocate() fails.
+void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::RECEIVE, flow);
+	if (record == NULL)
+		return;
+
+	record->setTransitUri(transitUri);
+	record->setDetails(detail);
+	record->setEventDuration(processingDuration);
+	record->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier);
+	add(record);
+}
+
+//! Queue a FETCH event for flow with transit URI, detail text and processing duration;
+//! nothing is queued if allocate() fails.
+void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration)
+{
+	ProvenanceEventRecord *record = allocate(ProvenanceEventRecord::FETCH, flow);
+	if (record == NULL)
+		return;
+
+	record->setTransitUri(transitUri);
+	record->setDetails(detail);
+	record->setEventDuration(processingDuration);
+	add(record);
+}
+
+uint64_t ProvenanceRepository::_repoSize(0); // out-of-class definition of the static member, starts at zero
+
+//! Launch the background purge thread (run()). Does nothing if purging is disabled
+//! (_purgePeriod <= 0) or if _running is already set.
+void ProvenanceRepository::start()
+{
+	if (this->_purgePeriod <= 0 || _running)
+		return;
+
+	_running = true;
+	_logger->log_info("ProvenanceRepository Monitor Thread Start");
+	// The thread is detached; run() returns on its own once it sees _running == false.
+	// NOTE(review): any previous _thread object is not deleted here; confirm it is
+	// released elsewhere if start() can be called again after stop().
+	_thread = new std::thread(run, this);
+	_thread->detach();
+}
+
+//! Ask the purge thread to stop by clearing _running. This does not wait: the detached
+//! thread leaves its loop after its current sleep period.
+void ProvenanceRepository::stop()
+{
+	if (_running)
+	{
+		_running = false;
+		_logger->log_info("ProvenanceRepository Monitor Thread Stop");
+	}
+}
+
+//! Body of the purge thread launched by start(). Every _purgePeriod ms it reads the
+//! repository size. At or above 3/4 of _maxPartitionBytes it deletes events older than
+//! _maxPartitionMillis, plus entries that fail to deserialize. It then sets _repoFull
+//! when the size exceeds _maxPartitionBytes, and returns once _running is false.
+void ProvenanceRepository::run(ProvenanceRepository *repo)
+{
+	// threshold for purge
+	uint64_t purgeThreshold = repo->_maxPartitionBytes*3/4;
+	while (repo->_running)
+	{
+		std::this_thread::sleep_for(std::chrono::milliseconds(repo->_purgePeriod));
+		uint64_t curTime = getTimeMillis();
+		uint64_t size = repo->repoSize();
+		if (size >= purgeThreshold)
+		{
+			std::vector<std::string> purgeList;
+			leveldb::Iterator* it = repo->_db->NewIterator(leveldb::ReadOptions());
+			for (it->SeekToFirst(); it->Valid(); it->Next())
+			{
+				ProvenanceEventRecord eventRead;
+				std::string key = it->key().ToString();
+				if (eventRead.DeSerialize((uint8_t *)it->value().data(), (int) it->value().size()))
+				{
+					uint64_t eventTime = eventRead.getEventTime();
+					// An event stamped after curTime (e.g. after a clock adjustment) is not
+					// old. Without the first check the unsigned subtraction wraps and the
+					// event is purged immediately.
+					if (curTime > eventTime && (curTime - eventTime) > repo->_maxPartitionMillis)
+						purgeList.push_back(key);
+				}
+				else
+				{
+					repo->_logger->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
+					purgeList.push_back(key);
+				}
+			}
+			delete it;
+			for (std::vector<std::string>::iterator itPurge = purgeList.begin(); itPurge != purgeList.end(); ++itPurge)
+			{
+				repo->_logger->log_info("ProvenanceRepository Repo Purge %s", itPurge->c_str());
+				repo->Delete(*itPurge);
+			}
+			// Re-measure so the full flag below reflects this purge rather than the
+			// pre-purge size (assumes repoSize() reflects Delete(); confirm).
+			if (!purgeList.empty())
+				size = repo->repoSize();
+		}
+		repo->_repoFull = (size > repo->_maxPartitionBytes);
+	}
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 88ea78a..ae40dec 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -840,6 +840,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessS
 		while (true)
 		{
 			std::map<std::string, std::string> empty;
+			uint64_t startTime = getTimeMillis();
 			DataPacket packet(this, transaction, empty);
 			bool eof = false;
 
@@ -860,8 +861,11 @@ void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessS
 				return;
 			}
 			std::map<std::string, std::string>::iterator it;
+			std::string sourceIdentifier;
 			for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++)
 			{
+				if (it->first == FlowAttributeKey(UUID))
+					sourceIdentifier = it->second;
 				flowFile->addAttribute(it->first, it->second);
 			}
 
@@ -876,6 +880,10 @@ void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessS
 				}
 			}
 			Relationship relation; // undefined relationship
+			uint64_t endTime = getTimeMillis();
+			std::string transitUri = _peer->getURL() + "/" + sourceIdentifier;
+			std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + _peer->getHostName();
+			session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime);
 			session->transfer(flowFile, relation);
 			// receive the transfer for the flow record
 			bytes += packet._size;
@@ -1253,6 +1261,7 @@ void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, Process
 	{
 		while (continueTransaction)
 		{
+			uint64_t startTime = getTimeMillis();
 			DataPacket packet(this, transaction, flow->getAttributes());
 
 			if (!send(transactionID, &packet, flow, session))
@@ -1262,6 +1271,10 @@ void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, Process
 			}
 			_logger->log_info("Site2Site transaction %s send flow record %s",
 							transactionID.c_str(), flow->getUUIDStr().c_str());
+			uint64_t endTime = getTimeMillis();
+			std::string transitUri = _peer->getURL() + "/" + flow->getUUIDStr();
+			std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + _peer->getHostName();
+			session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
 			session->remove(flow);
 
 			uint64_t transferNanos = getTimeNano() - startSendingNanos;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
index 48e19d0..03e662e 100644
--- a/libminifi/src/Site2SitePeer.cpp
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -359,7 +359,15 @@ int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc)
     {
     	delete[] bytearr;
     	delete[] chararr;
-    	return ret;
+      	if (ret == 0)
+        {
+        	 if (!widen)
+        		 return (2 + utflen);
+        	 else
+        		 return (4 + utflen);
+        }
+        else
+        		return ret;
     }
 
     while (count < utflen) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index f083a4c..0f66cda 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty)
+include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
 
 # Include libxml2
 find_package(LibXml2)
@@ -42,7 +42,7 @@ if(CMAKE_THREAD_LIBS_INIT)
 endif()
 
 # Link against minifi, yaml-cpp and uuid
-target_link_libraries(minifiexe minifi yaml-cpp uuid)
+target_link_libraries(minifiexe minifi yaml-cpp uuid leveldb)
 set_target_properties(minifiexe
         PROPERTIES OUTPUT_NAME minifi)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 21ed046..11e8f00 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -94,7 +94,7 @@ int main(int argc, char **argv)
     configure->setHome(minifiHome);
     configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
 
-	controller = new FlowController();
+	controller = FlowController::getFlowController();
 
 	// Load flow from specified configuration file
 	controller->load(ConfigFormat::YAML);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/thirdparty/leveldb-1.18/.gitignore
----------------------------------------------------------------------
diff --git a/thirdparty/leveldb-1.18/.gitignore b/thirdparty/leveldb-1.18/.gitignore
new file mode 100755
index 0000000..8d3a4c4
--- /dev/null
+++ b/thirdparty/leveldb-1.18/.gitignore
@@ -0,0 +1,33 @@
+build_config.mk
+
+db_bench
+leveldbutil
+*_test
+
+*.slo
+*.lo
+*.o
+
+# Compiled Dynamic libraries
+*.so
+*.so.*
+*.dylib*
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
+
+
+/project/
+
+# CMake
+/CMakeFiles/
+*.dir/
+cmake_install.cmake
+/CMakeCache.txt
+
+# Visual Studio
+*.sln
+*.vcxproj
+*.vcxproj.filters
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/dc9544f8/thirdparty/leveldb-1.18/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/leveldb-1.18/AUTHORS b/thirdparty/leveldb-1.18/AUTHORS
new file mode 100755
index 0000000..2439d7a
--- /dev/null
+++ b/thirdparty/leveldb-1.18/AUTHORS
@@ -0,0 +1,12 @@
+# Names should be added to this file like so:
+# Name or Organization <email address>
+
+Google Inc.
+
+# Initial version authors:
+Jeffrey Dean <je...@google.com>
+Sanjay Ghemawat <sa...@google.com>
+
+# Partial list of contributors:
+Kevin Regan <ke...@gmail.com>
+Johan Bilien <jo...@litl.com>