Posted to commits@nifi.apache.org by bq...@apache.org on 2017/04/11 17:16:26 UTC

[18/20] nifi-minifi-cpp git commit: MINIFI-227: Provenance report

MINIFI-227: Provenance report

MINIFI-227: Provenance report (linter)

MINIFI-227: Provenance report (make_shared)

MINIFI-227: Add test case


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5d2d8ae9
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5d2d8ae9
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5d2d8ae9

Branch: refs/heads/MINIFI-227
Commit: 5d2d8ae99eda7c58a22f73ad1d210faf347b4158
Parents: 126c2ed
Author: Bin Qiu <be...@gmail.com>
Authored: Fri Mar 31 17:53:06 2017 -0700
Committer: Bin Qiu <be...@gmail.com>
Committed: Tue Apr 11 10:10:08 2017 -0700

----------------------------------------------------------------------
 LICENSE                                         |   53 +-
 README.md                                       |   21 +
 libminifi/include/RemoteProcessorGroupPort.h    |    1 +
 libminifi/include/core/Processor.h              |   14 +-
 .../include/provenance/ProvenanceRepository.h   |    5 +-
 .../include/provenance/ProvenanceTaskReport.h   |   15 +-
 libminifi/src/FlowFileRecord.cpp                |    2 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |   81 +-
 libminifi/src/core/Processor.cpp                |  100 +-
 .../src/provenance/ProvenanceRepository.cpp     |    1 -
 .../src/provenance/ProvenanceTaskReport.cpp     |  307 ++-
 libminifi/test/unit/ProcessorTests.cpp          |   98 +-
 libminifi/test/unit/ProvenanceTestHelper.h      |   44 +-
 thirdparty/jsoncpp/devtools/__init__.py         |    6 -
 thirdparty/jsoncpp/devtools/agent_vmw7.json     |   33 -
 thirdparty/jsoncpp/devtools/agent_vmxp.json     |   26 -
 thirdparty/jsoncpp/devtools/antglob.py          |  205 --
 thirdparty/jsoncpp/devtools/batchbuild.py       |  278 ---
 thirdparty/jsoncpp/devtools/fixeol.py           |   70 -
 thirdparty/jsoncpp/devtools/licenseupdater.py   |   94 -
 thirdparty/jsoncpp/devtools/tarball.py          |   52 -
 thirdparty/jsoncpp/test/cleantests.py           |   16 -
 .../jsoncpp/test/data/fail_test_array_01.json   |    1 -
 .../test/data/fail_test_stack_limit.json        |    1 -
 .../jsoncpp/test/data/test_array_01.expected    |    1 -
 thirdparty/jsoncpp/test/data/test_array_01.json |    1 -
 .../jsoncpp/test/data/test_array_02.expected    |    2 -
 thirdparty/jsoncpp/test/data/test_array_02.json |    1 -
 .../jsoncpp/test/data/test_array_03.expected    |    6 -
 thirdparty/jsoncpp/test/data/test_array_03.json |    1 -
 .../jsoncpp/test/data/test_array_04.expected    |    5 -
 thirdparty/jsoncpp/test/data/test_array_04.json |    1 -
 .../jsoncpp/test/data/test_array_05.expected    |  100 -
 thirdparty/jsoncpp/test/data/test_array_05.json |    1 -
 .../jsoncpp/test/data/test_array_06.expected    |    5 -
 thirdparty/jsoncpp/test/data/test_array_06.json |    4 -
 .../jsoncpp/test/data/test_array_07.expected    | 2122 ------------------
 thirdparty/jsoncpp/test/data/test_array_07.json |    2 -
 .../jsoncpp/test/data/test_basic_01.expected    |    1 -
 thirdparty/jsoncpp/test/data/test_basic_01.json |    1 -
 .../jsoncpp/test/data/test_basic_02.expected    |    1 -
 thirdparty/jsoncpp/test/data/test_basic_02.json |    1 -
 .../jsoncpp/test/data/test_basic_03.expected    |    3 -
 thirdparty/jsoncpp/test/data/test_basic_03.json |    3 -
 .../jsoncpp/test/data/test_basic_04.expected    |    2 -
 thirdparty/jsoncpp/test/data/test_basic_04.json |    2 -
 .../jsoncpp/test/data/test_basic_05.expected    |    2 -
 thirdparty/jsoncpp/test/data/test_basic_05.json |    2 -
 .../jsoncpp/test/data/test_basic_06.expected    |    2 -
 thirdparty/jsoncpp/test/data/test_basic_06.json |    2 -
 .../jsoncpp/test/data/test_basic_07.expected    |    2 -
 thirdparty/jsoncpp/test/data/test_basic_07.json |    2 -
 .../jsoncpp/test/data/test_basic_08.expected    |    3 -
 thirdparty/jsoncpp/test/data/test_basic_08.json |    3 -
 .../jsoncpp/test/data/test_basic_09.expected    |    4 -
 thirdparty/jsoncpp/test/data/test_basic_09.json |    4 -
 .../jsoncpp/test/data/test_comment_00.expected  |    4 -
 .../jsoncpp/test/data/test_comment_00.json      |    5 -
 .../jsoncpp/test/data/test_comment_01.expected  |   10 -
 .../jsoncpp/test/data/test_comment_01.json      |   10 -
 .../jsoncpp/test/data/test_comment_02.expected  |   23 -
 .../jsoncpp/test/data/test_comment_02.json      |   26 -
 .../jsoncpp/test/data/test_complex_01.expected  |   20 -
 .../jsoncpp/test/data/test_complex_01.json      |   17 -
 .../jsoncpp/test/data/test_integer_01.expected  |    2 -
 .../jsoncpp/test/data/test_integer_01.json      |    2 -
 .../jsoncpp/test/data/test_integer_02.expected  |    2 -
 .../jsoncpp/test/data/test_integer_02.json      |    2 -
 .../jsoncpp/test/data/test_integer_03.expected  |    2 -
 .../jsoncpp/test/data/test_integer_03.json      |    2 -
 .../jsoncpp/test/data/test_integer_04.expected  |    3 -
 .../jsoncpp/test/data/test_integer_04.json      |    3 -
 .../jsoncpp/test/data/test_integer_05.expected  |    2 -
 .../jsoncpp/test/data/test_integer_05.json      |    2 -
 .../test/data/test_integer_06_64bits.expected   |    1 -
 .../test/data/test_integer_06_64bits.json       |    2 -
 .../test/data/test_integer_07_64bits.expected   |    1 -
 .../test/data/test_integer_07_64bits.json       |    2 -
 .../test/data/test_integer_08_64bits.expected   |    1 -
 .../test/data/test_integer_08_64bits.json       |    2 -
 .../jsoncpp/test/data/test_large_01.expected    | 2122 ------------------
 thirdparty/jsoncpp/test/data/test_large_01.json |    2 -
 .../jsoncpp/test/data/test_object_01.expected   |    1 -
 .../jsoncpp/test/data/test_object_01.json       |    1 -
 .../jsoncpp/test/data/test_object_02.expected   |    2 -
 .../jsoncpp/test/data/test_object_02.json       |    1 -
 .../jsoncpp/test/data/test_object_03.expected   |    4 -
 .../jsoncpp/test/data/test_object_03.json       |    5 -
 .../jsoncpp/test/data/test_object_04.expected   |    2 -
 .../jsoncpp/test/data/test_object_04.json       |    3 -
 .../test/data/test_preserve_comment_01.expected |   11 -
 .../test/data/test_preserve_comment_01.json     |   14 -
 .../jsoncpp/test/data/test_real_01.expected     |    3 -
 thirdparty/jsoncpp/test/data/test_real_01.json  |    3 -
 .../jsoncpp/test/data/test_real_02.expected     |    3 -
 thirdparty/jsoncpp/test/data/test_real_02.json  |    3 -
 .../jsoncpp/test/data/test_real_03.expected     |    3 -
 thirdparty/jsoncpp/test/data/test_real_03.json  |    3 -
 .../jsoncpp/test/data/test_real_04.expected     |    3 -
 thirdparty/jsoncpp/test/data/test_real_04.json  |    3 -
 .../jsoncpp/test/data/test_real_05.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_05.json  |    3 -
 .../jsoncpp/test/data/test_real_06.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_06.json  |    3 -
 .../jsoncpp/test/data/test_real_07.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_07.json  |    3 -
 .../jsoncpp/test/data/test_real_08.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_08.json  |    4 -
 .../jsoncpp/test/data/test_real_09.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_09.json  |    4 -
 .../jsoncpp/test/data/test_real_10.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_10.json  |    4 -
 .../jsoncpp/test/data/test_real_11.expected     |    4 -
 thirdparty/jsoncpp/test/data/test_real_11.json  |    4 -
 .../jsoncpp/test/data/test_real_12.expected     |    2 -
 thirdparty/jsoncpp/test/data/test_real_12.json  |    2 -
 .../jsoncpp/test/data/test_string_01.expected   |    1 -
 .../jsoncpp/test/data/test_string_01.json       |    1 -
 .../jsoncpp/test/data/test_string_02.expected   |    1 -
 .../jsoncpp/test/data/test_string_02.json       |    1 -
 .../jsoncpp/test/data/test_string_03.expected   |    1 -
 .../jsoncpp/test/data/test_string_03.json       |    1 -
 .../jsoncpp/test/data/test_string_04.expected   |    2 -
 .../jsoncpp/test/data/test_string_04.json       |    2 -
 .../jsoncpp/test/data/test_string_05.expected   |    2 -
 .../jsoncpp/test/data/test_string_05.json       |    2 -
 .../test/data/test_string_unicode_01.expected   |    1 -
 .../test/data/test_string_unicode_01.json       |    1 -
 .../test/data/test_string_unicode_02.expected   |    1 -
 .../test/data/test_string_unicode_02.json       |    1 -
 .../test/data/test_string_unicode_03.expected   |    1 -
 .../test/data/test_string_unicode_03.json       |    1 -
 .../test/data/test_string_unicode_04.expected   |    1 -
 .../test/data/test_string_unicode_04.json       |    1 -
 .../test/data/test_string_unicode_05.expected   |    2 -
 .../test/data/test_string_unicode_05.json       |    1 -
 thirdparty/jsoncpp/test/generate_expected.py    |   17 -
 thirdparty/jsoncpp/test/jsonchecker/fail1.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail10.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail11.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail12.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail13.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail14.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail15.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail16.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail17.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail18.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail19.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail2.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail20.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail21.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail22.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail23.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail24.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail25.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail26.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail27.json |    2 -
 thirdparty/jsoncpp/test/jsonchecker/fail28.json |    2 -
 thirdparty/jsoncpp/test/jsonchecker/fail29.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail3.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail30.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail31.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail32.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail33.json |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail4.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail5.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail6.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail7.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail8.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/fail9.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/pass1.json  |   58 -
 thirdparty/jsoncpp/test/jsonchecker/pass2.json  |    1 -
 thirdparty/jsoncpp/test/jsonchecker/pass3.json  |    6 -
 thirdparty/jsoncpp/test/jsonchecker/readme.txt  |    3 -
 thirdparty/jsoncpp/test/pyjsontestrunner.py     |   71 -
 thirdparty/jsoncpp/test/runjsontests.py         |  174 --
 thirdparty/jsoncpp/test/rununittests.py         |   84 -
 177 files changed, 439 insertions(+), 6246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 8a0f32e..0439ead 100644
--- a/LICENSE
+++ b/LICENSE
@@ -508,4 +508,55 @@ The source is available under a 3-Clause BSD License.
 	LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 	(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 	USE OF THIS SOFTWARE, EVEN IF NOT ADVISED OF THE POSSIBILITY OF SUCH
-	DAMAGE.
\ No newline at end of file
+	DAMAGE.
+
+This product bundles 'JsonCpp' which is available under a MIT license.
+
+The JsonCpp library's source code, including accompanying documentation,
+tests and demonstration applications, are licensed under the following
+conditions...
+
+The author (Baptiste Lepilleur) explicitly disclaims copyright in all
+jurisdictions which recognize such a disclaimer. In such jurisdictions,
+this software is released into the Public Domain.
+
+In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
+2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is
+released under the terms of the MIT License (see below).
+
+In jurisdictions which recognize Public Domain property, the user of this
+software may choose to accept it either as 1) Public Domain, 2) under the
+conditions of the MIT License (see below), or 3) under the terms of dual
+Public Domain/MIT License conditions described here, as they choose.
+
+The MIT License is about as close to Public Domain as a license can get, and is
+described in clear, concise terms at:
+
+   http://en.wikipedia.org/wiki/MIT_License
+
+The full text of the MIT License follows:
+
+========================================================================
+Copyright (c) 2007-2010 Baptiste Lepilleur
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use, copy,
+modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+========================================================================
+(END LICENSE TEXT)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 484c97a..74d0afc 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * ListenSyslog
   * ExecuteProcess
   * AppendHostInfo
+  * ProvenanceTaskReport
 * Provenance event generation is supported and events are persisted using levelDB.
 
 ## System Requirements
@@ -286,6 +287,26 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
     if you do not want to enable client certificate based authorization
     nifi.security.need.ClientAuth=false
 
+### Provenance Report
+
+    Add the ProvenanceTaskReport processor to config.yml.
+    The Port UUID property of ProvenanceTaskReport is the UUID of the remote NiFi input port.
+ 
+    - name: ProvenanceTaskReport
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a209
+      class: org.apache.nifi.processors.standard.ProvenanceTaskReport
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Port: 10001
+          Host Name: localhost
+          Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+
 ### Running
 After completing a [build](#building), the application can be run by issuing the following from :
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 8667519..f8aac38 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -55,6 +55,7 @@ class RemoteProcessorGroupPort : public core::Processor {
   // Supported Properties
   static core::Property hostName;
   static core::Property port;
+  static core::Property portUUID;
   // Supported Relationships
   static core::Relationship relation;
  public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 2b540ec..e945fa4 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -42,6 +42,9 @@
 #include "ProcessSessionFactory.h"
 #include "Scheduling.h"
 
+#include <stack>
+#include "Site2SiteClientProtocol.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -239,12 +242,21 @@ class Processor : public Connectable, public ConfigurableComponent,
   // Trigger the Processor even if the incoming connection is empty
   std::atomic<bool> _triggerWhenEmpty;
 
- private:
+  //! obtainSite2SiteProtocol for use
+  std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(std::string host, uint16_t sport, uuid_t portId);
+  //! returnSite2SiteProtocol after use
+  void returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol);
+
+private:
 
   // Mutex for protection
   std::mutex mutex_;
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
+  
+  // Site2Site Protocols
+  std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_;
+  std::atomic<bool> protocols_created_;
 
   // Check all incoming connections for work
   bool isWorkAvailable();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
index 7dd1757..b96021c 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -113,6 +113,9 @@ class ProvenanceRepository : public core::Repository,
   // Put
   virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
 
+	if (repo_full_)
+		return false;
+
     // persist to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
@@ -156,7 +159,7 @@ class ProvenanceRepository : public core::Repository,
 	leveldb::Iterator* it = db_->NewIterator(
 				leveldb::ReadOptions());
 	for (it->SeekToFirst(); it->Valid(); it->Next()) {
-			std::shared_ptr<ProvenanceEventRecord> eventRead (new ProvenanceEventRecord());
+			std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
 			std::string key = it->key().ToString();
 			if (records.size() >= maxSize)
 				break;

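The scan in getProvenanceRecord above follows the standard LevelDB iterator pattern. A minimal self-contained sketch of that read path, assuming a stock leveldb install (the path and size limit here are illustrative, not taken from the commit):

#include <leveldb/db.h>
#include <iostream>
#include <string>
#include <vector>

int main() {
  leveldb::DB *db;
  leveldb::Options options;
  options.create_if_missing = true;
  // Illustrative repository path, not the one MiNiFi configures.
  leveldb::Status status = leveldb::DB::Open(options, "./provenance_repo", &db);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return 1;
  }
  // Collect at most maxSize serialized records, then stop, as the
  // repository code does rather than scanning the whole keyspace.
  const size_t maxSize = 100;
  std::vector<std::string> records;
  leveldb::Iterator *it = db->NewIterator(leveldb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    if (records.size() >= maxSize)
      break;
    records.push_back(it->value().ToString());
  }
  delete it;
  delete db;
  std::cout << "read " << records.size() << " records" << std::endl;
  return 0;
}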
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/include/provenance/ProvenanceTaskReport.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h b/libminifi/include/provenance/ProvenanceTaskReport.h
index 38edeeb..932ed39 100644
--- a/libminifi/include/provenance/ProvenanceTaskReport.h
+++ b/libminifi/include/provenance/ProvenanceTaskReport.h
@@ -44,7 +44,8 @@ public:
 	ProvenanceTaskReport(std::string name, uuid_t uuid = NULL) :
 			core::Processor(name, uuid) {
 		logger_ = logging::Logger::getLogger();
-		uuid_copy(protocol_uuid_,uuid);
+		if (uuid)
+		  uuid_copy(protocol_uuid_,uuid);
 		this->setTriggerWhenEmpty(true);
 	}
 	//! Destructor
@@ -52,14 +53,20 @@ public:
 
 	}
 	//! Processor Name
-	static const std::string ProcessorName;
+	static constexpr char const* ProcessorName = "ProvenanceTaskReport";
 	//! Supported Properties
 	static core::Property hostName;
 	static core::Property port;
 	static core::Property batchSize;
+	static core::Property portUUID;
 	//! Supported Relationships
 	static core::Relationship relation;
+	static const char *ProvenanceAppStr;
 public:
+	//! Get the provenance report as JSON
+	void getJasonReport(core::ProcessContext *context,
+	    core::ProcessSession *session, std::vector < std::shared_ptr < ProvenanceEventRecord >> &records,
+	    std::string &report);
 	//! OnTrigger method, implemented by NiFi ProvenanceTaskReport
 	virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
 	//! Initialize, overridden by NiFi ProvenanceTaskReport
@@ -68,10 +75,6 @@ public:
 protected:
 
 private:
-	std::unique_ptr<Site2SiteClientProtocol> getNextProtocol();
-	void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol);
-	std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_;
-	std::mutex protocol_mutex_;
 	uuid_t protocol_uuid_;
 	//! Logger
 	std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index de682b0..31f5d58 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -265,7 +265,7 @@ bool FlowFileRecord::Serialize() {
                        uuid_str_.c_str(), outStream.getSize());
     return true;
   } else {
-    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+    logger_->log_debug("NiFi FlowFile Store event %s size %d fail",
                        uuid_str_.c_str(), outStream.getSize());
     return false;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 33f0cb2..8ee3680 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -44,33 +44,18 @@ namespace minifi {
 const std::string RemoteProcessorGroupPort::ProcessorName(
     "RemoteProcessorGroupPort");
 core::Property RemoteProcessorGroupPort::hostName("Host Name",
-                                                  "Remote Host Name.",
-                                                  "localhost");
+    "Remote Host Name.", "localhost");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+core::Property RemoteProcessorGroupPort::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
-std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() {
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  if (available_protocols_.empty())
-    return nullptr;
-
-  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(
-      available_protocols_.top());
-  available_protocols_.pop();
-  return std::move(return_pointer);
-}
-
-void RemoteProcessorGroupPort::returnProtocol(
-    std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
-  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-  available_protocols_.push(std::move(return_protocol));
-}
-
 void RemoteProcessorGroupPort::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
   properties.insert(hostName);
   properties.insert(port);
+  properties.insert(portUUID);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -79,50 +64,42 @@ void RemoteProcessorGroupPort::initialize() {
 }
 
 void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
-                                         core::ProcessSession *session) {
-  std::string value;
-
+    core::ProcessSession *session) {
   if (!transmitting_)
     return;
 
-  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
-
-  // Peer Connection
-  if (protocol_ == nullptr) {
-    protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
-        new Site2SiteClientProtocol(0));
-    protocol_->setPortId(protocol_uuid_);
-    protocol_->setTimeOut(timeout_);
-
-    std::string host = "";
-    uint16_t sport = 0;
-    int64_t lvalue;
+  std::string value;
+  int64_t lvalue;
+  std::string host = "";
+  uint16_t sport = 0;
 
-    if (context->getProperty(hostName.getName(), value)) {
-      host = value;
-    }
-    if (context->getProperty(port.getName(), value)
-        && core::Property::StringToInt(value, lvalue)) {
-      sport = (uint16_t) lvalue;
-    }
-    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-        std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-            org::apache::nifi::minifi::io::StreamFactory::getInstance()
-                ->createSocket(host, sport));
+  if (context->getProperty(hostName.getName(), value)) {
+    host = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
 
-    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
-        new Site2SitePeer(std::move(str), host, sport));
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
 
-    protocol_->setPeer(std::move(peer_));
+  if (!protocol_) {
+    context->yield();
+    return;
   }
 
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-        context->getProcessorNode().getProcessor());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
     logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-                       processor->getYieldPeriodMsec());
+        processor->getYieldPeriodMsec());
+    returnSite2SiteProtocol(protocol_);
     return;
   }
 
@@ -131,7 +108,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   else
     protocol_->transferFlowFiles(context, session);
 
-  returnProtocol(std::move(protocol_));
+  returnSite2SiteProtocol(protocol_);
 
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 9a0898a..9d44d39 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -29,11 +29,13 @@
 #include <thread>
 #include <memory>
 #include <functional>
+#include <utility>
 #include "Connection.h"
 #include "core/Connectable.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
+#include "../include/io/StreamFactory.h"
 
 namespace org {
 namespace apache {
@@ -41,15 +43,15 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-Processor::Processor(std::string name, uuid_t uuid)
-    : Connectable(name, uuid),
-      ConfigurableComponent(logging::Logger::getLogger()) {
+Processor::Processor(std::string name, uuid_t uuid) :
+    Connectable(name, uuid), ConfigurableComponent(logging::Logger::getLogger()) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
   strategy_ = TIMER_DRIVEN;
   loss_tolerant_ = false;
   _triggerWhenEmpty = false;
+  protocols_created_ = false;
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
   run_durantion_nano_ = 0;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
@@ -60,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
   incoming_connections_Iter = this->_incomingConnections.begin();
   logger_ = logging::Logger::getLogger();
   logger_->log_info("Processor %s created UUID %s", name_.c_str(),
-                    uuidStr_.c_str());
+      uuidStr_.c_str());
 }
 
 bool Processor::isRunning() {
@@ -76,12 +78,12 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
 
   if (isRunning()) {
     logger_->log_info("Can not add connection while the process %s is running",
-                      name_.c_str());
+        name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection
+      > (conn);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
@@ -114,7 +116,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
     auto &&it = _outGoingConnections.find(relationship);
     if (it != _outGoingConnections.end()) {
       // We already have a connection for this relationship
-      std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
+      std::set < std::shared_ptr < Connectable >> existedConnection =
+          it->second;
       if (existedConnection.find(connection) == existedConnection.end()) {
         // We do not have the same connection for this relationship yet
         existedConnection.insert(connection);
@@ -127,7 +130,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
       }
     } else {
       // We do not have any outgoing connection for this relationship yet
-      std::set<std::shared_ptr<Connectable>> newConnection;
+      std::set < std::shared_ptr < Connectable >> newConnection;
       newConnection.insert(connection);
       connection->setSource(shared_from_this());
       _outGoingConnections[relationship] = newConnection;
@@ -149,13 +152,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     return;
   }
 
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection
+      > (conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -191,15 +194,63 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   }
 }
 
+std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol(
+    std::string host, uint16_t sport, uuid_t portId) {
+  std::lock_guard < std::mutex > lock(mutex_);
+
+  if (!protocols_created_) {
+    for (int i = 0; i < this->max_concurrent_tasks_; i++) {
+      // create the protocol pool based on max threads allowed
+      std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr);
+      protocols_created_ = true;
+      protocol->setPortId(portId);
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+          std::unique_ptr < org::apache::nifi::minifi::io::DataStream
+              > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket(
+                  host, sport));
+      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
+          > (new Site2SitePeer(std::move(str), host, sport));
+      protocol->setPeer(std::move(peer_));
+      available_protocols_.push(protocol);
+    }
+  }
+  if (!available_protocols_.empty()) {
+    std::shared_ptr<Site2SiteClientProtocol> return_pointer =
+        available_protocols_.top();
+    available_protocols_.pop();
+    return return_pointer;
+  } else {
+    // create the protocol on demand if we exceed the pool
+    std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr);
+    protocol->setPortId(portId);
+    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+        std::unique_ptr < org::apache::nifi::minifi::io::DataStream
+            > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket(
+                host, sport));
+    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
+        > (new Site2SitePeer(std::move(str), host, sport));
+    protocol->setPeer(std::move(peer_));
+    return protocol;
+  }
+}
+
+void Processor::returnSite2SiteProtocol(
+    std::shared_ptr<Site2SiteClientProtocol> protocol) {
+  std::lock_guard < std::mutex > lock(mutex_);
+  if (protocol && available_protocols_.size() < max_concurrent_tasks_) {
+    available_protocols_.push(protocol);
+  }
+}
+
 bool Processor::flowFilesQueued() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   if (_incomingConnections.size() == 0)
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection =
-        std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast
+        < Connection > (conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -208,14 +259,15 @@ bool Processor::flowFilesQueued() {
 }
 
 bool Processor::flowFilesOutGoingFull() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
 
   for (auto &&connection : _outGoingConnections) {
     // We already have a connection for this relationship
-    std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
+    std::set < std::shared_ptr < Connectable >> existedConnection =
+        connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr < Connection > connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->isFull())
         return true;
     }
@@ -225,7 +277,7 @@ bool Processor::flowFilesOutGoingFull() {
 }
 
 void Processor::onTrigger(ProcessContext *context,
-                          ProcessSessionFactory *sessionFactory) {
+    ProcessSessionFactory *sessionFactory) {
   auto session = sessionFactory->createSession();
 
   try {
@@ -249,8 +301,8 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast
+          < Connection > (conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;
@@ -259,7 +311,7 @@ bool Processor::isWorkAvailable() {
   } catch (...) {
     logger_->log_error(
         "Caught an exception while checking if work is available;"
-        " unless it was positively determined that work is available, assuming NO work is available!");
+            " unless it was positively determined that work is available, assuming NO work is available!");
   }
 
   return hasWork;

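obtainSite2SiteProtocol and returnSite2SiteProtocol above amount to a bounded, mutex-guarded object pool: one protocol per allowed concurrent task is created lazily on first use, extras are created on demand when the pool runs dry, and returns beyond the bound are dropped. A minimal self-contained sketch of the same pattern, with a hypothetical stand-in Protocol type in place of Site2SiteClientProtocol:

#include <iostream>
#include <memory>
#include <mutex>
#include <stack>

// Stand-in for Site2SiteClientProtocol; illustrative only.
struct Protocol {
  explicit Protocol(int id) : id(id) {}
  int id;
};

class ProtocolPool {
 public:
  explicit ProtocolPool(size_t max) : max_(max) {
    // Pre-create the pool, as obtainSite2SiteProtocol does on first use.
    for (size_t i = 0; i < max_; i++)
      available_.push(std::make_shared<Protocol>(static_cast<int>(i)));
  }
  std::shared_ptr<Protocol> obtain() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!available_.empty()) {
      std::shared_ptr<Protocol> p = available_.top();
      available_.pop();
      return p;
    }
    // Pool exhausted: create one on demand, like the commit's fallback path.
    return std::make_shared<Protocol>(-1);
  }
  void give_back(std::shared_ptr<Protocol> p) {
    std::lock_guard<std::mutex> lock(mutex_);
    // Keep at most max_ idle protocols; extras are simply dropped.
    if (p && available_.size() < max_)
      available_.push(p);
  }
 private:
  size_t max_;
  std::mutex mutex_;
  std::stack<std::shared_ptr<Protocol>> available_;
};

int main() {
  ProtocolPool pool(2);
  std::shared_ptr<Protocol> a = pool.obtain();
  std::shared_ptr<Protocol> b = pool.obtain();
  std::shared_ptr<Protocol> c = pool.obtain();  // on-demand, pool is empty
  pool.give_back(a);
  pool.give_back(b);
  pool.give_back(c);  // dropped: pool already holds max_ idle entries
  std::cout << "on-demand protocol id " << c->id << std::endl;
  return 0;
}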
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index 1cf66f6..77de5ba 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -35,7 +35,6 @@ void ProvenanceRepository::run() {
     uint64_t curTime = getTimeMillis();
     uint64_t size = repoSize();
     if (size >= purgeThreshold) {
-      // std::lock_guard<std::mutex> lock(mutex_);
       std::vector<std::string> purgeList;
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
       for (it->SeekToFirst(); it->Valid(); it->Next()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/src/provenance/ProvenanceTaskReport.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp b/libminifi/src/provenance/ProvenanceTaskReport.cpp
index d4995a0..dfe81e8 100644
--- a/libminifi/src/provenance/ProvenanceTaskReport.cpp
+++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp
@@ -21,10 +21,9 @@
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
+#include <string>
+#include <memory>
 #include <sstream>
-#include <string.h>
 #include <iostream>
 
 #include "provenance/ProvenanceTaskReport.h"
@@ -43,175 +42,153 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace provenance{
+namespace provenance {
 
-const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport");
-core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost");
+core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.",
+    "localhost");
 core::Property ProvenanceTaskReport::port("Port", "Remote Port", "9999");
-core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100");
+core::Property ProvenanceTaskReport::batchSize("Batch Size",
+    "Specifies how many records to send in a single batch, at most.", "100");
+core::Property ProvenanceTaskReport::portUUID("Port UUID",
+    "Specifies remote NiFi Port UUID.", "");
 core::Relationship ProvenanceTaskReport::relation;
-
-void ProvenanceTaskReport::initialize()
-{
-	//! Set the supported properties
-	std::set<core::Property> properties;
-	properties.insert(hostName);
-	properties.insert(port);
-	properties.insert(batchSize);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<core::Relationship> relationships;
-	relationships.insert(relation);
-	setSupportedRelationships(relationships);
-}
-
-std::unique_ptr<Site2SiteClientProtocol> ProvenanceTaskReport::getNextProtocol()
-{
-	std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-	if (available_protocols_.empty())
-		return nullptr;
-	std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top());
-	available_protocols_.pop();
-	return std::move(return_pointer);
+const char *ProvenanceTaskReport::ProvenanceAppStr = "MiNiFi Flow";
+
+void ProvenanceTaskReport::initialize() {
+  //! Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(hostName);
+  properties.insert(port);
+  properties.insert(batchSize);
+  properties.insert(portUUID);
+  setSupportedProperties(properties);
+  //! Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(relation);
+  setSupportedRelationships(relationships);
 }
 
-void ProvenanceTaskReport::returnProtocol(
-  std::unique_ptr<Site2SiteClientProtocol> return_protocol)
-{
-	std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
-	available_protocols_.push(std::move(return_protocol));
+void ProvenanceTaskReport::getJasonReport(core::ProcessContext *context,
+    core::ProcessSession *session,
+    std::vector<std::shared_ptr<ProvenanceEventRecord>> &records,
+    std::string &report) {
+
+  Json::Value array;
+  for (auto record : records) {
+    Json::Value recordJson;
+    Json::Value updatedAttributesJson;
+    Json::Value parentUuidJson;
+    Json::Value childUuidJson;
+    recordJson["eventId"] = record->getEventId().c_str();
+    recordJson["eventType"] =
+        ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+    recordJson["timestampMillis"] = record->getEventTime();
+    recordJson["durationMillis"] = record->getEventDuration();
+    recordJson["lineageStart"] = record->getlineageStartDate();
+    recordJson["details"] = record->getDetails().c_str();
+    recordJson["componentId"] = record->getComponentId().c_str();
+    recordJson["componentType"] = record->getComponentType().c_str();
+    recordJson["entityId"] = record->getFlowFileUuid().c_str();
+    recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
+    recordJson["entitySize"] = record->getFileSize();
+    recordJson["entityOffset"] = record->getFileOffset();
+
+    for (auto attr : record->getAttributes()) {
+      updatedAttributesJson[attr.first] = attr.second;
+    }
+    recordJson["updatedAttributes"] = updatedAttributesJson;
+
+    for (auto parentUUID : record->getParentUuids()) {
+      parentUuidJson.append(parentUUID.c_str());
+    }
+    recordJson["parentIds"] = parentUuidJson;
+
+    for (auto childUUID : record->getChildrenUuids()) {
+      childUuidJson.append(childUUID.c_str());
+    }
+    recordJson["childIds"] = childUuidJson;
+    recordJson["transitUri"] = record->getTransitUri().c_str();
+    recordJson["remoteIdentifier"] =
+        record->getSourceSystemFlowFileIdentifier().c_str();
+    recordJson["alternateIdentifier"] =
+        record->getAlternateIdentifierUri().c_str();
+    recordJson["application"] = ProvenanceAppStr;
+    array.append(recordJson);
+  }
+
+  Json::StyledWriter writer;
+  report = writer.write(array);
 }
 
-void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session)
-{
-	std::string value;
-	int64_t lvalue;
-	
-	std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
-
-	if (protocol_ == nullptr)
-	{
-		protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
-	        new Site2SiteClientProtocol(0));
-	    protocol_->setPortId(protocol_uuid_);
-
-	    std::string host = "";
-	    uint16_t sport = 0;
-
-	    if (context->getProperty(hostName.getName(), value)) {
-	      host = value;
-	    }
-	    if (context->getProperty(port.getName(), value)
-	        && core::Property::StringToInt(value, lvalue)) {
-	      sport = (uint16_t) lvalue;
-	    }
-	    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-	        std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-	            org::apache::nifi::minifi::io::StreamFactory::getInstance()
-	                ->createSocket(host, sport));
-
-	    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
-	        new Site2SitePeer(std::move(str), host, sport));
-
-	    protocol_->setPeer(std::move(peer_));
-	}
-
-	if (!protocol_->bootstrap())
-	{
-	    // bootstrap the client protocol if needeed
-	    context->yield();
-	    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-	        context->getProcessorNode().getProcessor());
-	    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-	                       processor->getYieldPeriodMsec());
-	    return;
-	}
-
-	int batch = 100;
-
-	if (context->getProperty(batchSize.getName(), value) && core::Property::StringToInt(value, lvalue))
-	{
-		batch = (int) lvalue;
-	}
-	
-	std::vector<std::shared_ptr<ProvenanceEventRecord>> records;
-	std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast<ProvenanceRepository> (context->getProvenanceRepository());
-
-	repo->getProvenanceRecord(records, batch);
-
-	if (records.size() <= 0)
-	{
-		returnProtocol(std::move(protocol_));
-		return;
-	}
-
-	Json::Value array;
-	for (auto record : records)
-	{
-		Json::Value recordJson;
-		Json::Value updatedAttributesJson;
-		Json::Value parentUuidJson;
-		Json::Value childUuidJson;
-		recordJson["eventId"] = record->getEventId().c_str();
-		recordJson["eventType"] = ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
-		recordJson["timestampMillis"] = record->getEventTime();
-		recordJson["durationMillis"] = record->getEventDuration();
-		recordJson["lineageStart"] = record->getlineageStartDate();
-		recordJson["details"] = record->getDetails().c_str();
-		recordJson["componentId"] = record->getComponentId().c_str();
-		recordJson["componentType"] = record->getComponentType().c_str();
-		recordJson["entityId"] = record->getFlowFileUuid().c_str();
-		recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
-		recordJson["entitySize"] = record->getFileSize();
-		recordJson["entityOffset"] = record->getFileOffset();
-
-		for (auto attr : record->getAttributes())
-		{
-			updatedAttributesJson[attr.first] = attr.second;
-		}
-		recordJson["updatedAttributes"] = updatedAttributesJson;
-
-		for (auto parentUUID : record->getParentUuids())
-		{
-			parentUuidJson.append(parentUUID.c_str());
-		}
-		recordJson["parentIds"] = parentUuidJson;
-
-		for (auto childUUID : record->getChildrenUuids())
-		{
-			childUuidJson.append(childUUID.c_str());
-		}
-		recordJson["childIds"] = childUuidJson;
-		recordJson["transitUri"] = record->getTransitUri().c_str();
-		recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str();
-		recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str();
-		recordJson["application"] = "MiNiFi Flow";
-		array.append(recordJson);
-	}
-
-	Json::StyledWriter writer;
-	std::string jsonStr = writer.write(array);
-	uint8_t *payload = (uint8_t *) jsonStr.c_str();
-	int length = jsonStr.length();
-
-	try
-	{
-		std::map<std::string, std::string> attributes;
-		protocol_->transferBytes(context, session, payload, length, attributes);
-	}
-	catch (...)
-	{
-		// if transfer bytes failed, return instead of purge the provenance records
-		returnProtocol(std::move(protocol_));
-		return;
-	}
-
-	// we transfer the record, purge the record from DB
-	repo->purgeProvenanceRecord(records);
-
-	returnProtocol(std::move(protocol_));
-
-	return;
+void ProvenanceTaskReport::onTrigger(core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+  int64_t lvalue;
+  std::string host = "";
+  uint16_t sport = 0;
+
+  if (context->getProperty(hostName.getName(), value)) {
+    host = value;
+  }
+  if (context->getProperty(port.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    sport = (uint16_t) lvalue;
+  }
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
+
+  std::shared_ptr<Site2SiteClientProtocol> protocol_ =
+      this->obtainSite2SiteProtocol(host, sport, protocol_uuid_);
+
+  if (!protocol_) {
+    context->yield();
+    return;
+  }
+
+  if (!protocol_->bootstrap()) {
+    // bootstrap the client protocol if needed
+    context->yield();
+    std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor
+        > (context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
+        processor->getYieldPeriodMsec());
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  int64_t batch = 100;
+  if (context->getProperty(batchSize.getName(), value)
+      && core::Property::StringToInt(value, lvalue)) {
+    batch = lvalue;
+  }
+  std::vector < std::shared_ptr < ProvenanceEventRecord >> records;
+  std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast
+      < ProvenanceRepository > (context->getProvenanceRepository());
+  repo->getProvenanceRecord(records, batch);
+  if (records.size() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  std::string jsonStr;
+  this->getJasonReport(context, session, records, jsonStr);
+  if (jsonStr.length() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
+  }
+
+  try {
+    std::map < std::string, std::string > attributes;
+    protocol_->transferString(context, session, jsonStr, attributes);
+  } catch (...) {
+    // if the transfer failed, return without purging the provenance records
+    return;
+  }
+
+  // we transferred the records, so purge them from the DB
+  repo->purgeProvenanceRecord(records);
+  returnSite2SiteProtocol(protocol_);
 }
 
 } /* namespace provenance */

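getJasonReport above serializes each ProvenanceEventRecord into a JSON object and writes the array with jsoncpp's StyledWriter. A minimal sketch producing a single record of the same shape (all field values below are invented for illustration; the real code copies them from ProvenanceEventRecord getters):

#include <json/json.h>
#include <iostream>
#include <string>

int main() {
  // Invented sample values; the report fills these from the record.
  Json::Value record;
  record["eventId"] = "00000000-0000-0000-0000-000000000001";
  record["eventType"] = "CREATE";
  record["timestampMillis"] = static_cast<Json::Int64>(1491004386000LL);
  record["durationMillis"] = 10;
  record["componentId"] = "00000000-0000-0000-0000-000000000002";
  record["componentType"] = "GetFile";
  record["entityId"] = "00000000-0000-0000-0000-000000000003";
  record["entityType"] = "org.apache.nifi.flowfile.FlowFile";
  Json::Value attrs;
  attrs["filename"] = "tstFile.ext";
  record["updatedAttributes"] = attrs;
  record["application"] = "MiNiFi Flow";

  // The report is a JSON array of such records.
  Json::Value array;
  array.append(record);

  Json::StyledWriter writer;
  std::string report = writer.write(array);
  std::cout << report;
  return 0;
}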
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 87f190c..1653e2d 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -41,16 +41,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
+
+  std::shared_ptr<core::Processor> processorReport = std::make_shared
+      < org::apache::nifi::minifi::provenance::ProvenanceTaskReport
+      > ("provenanceTaskReport");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -58,8 +62,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -77,7 +81,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
 
   processor->onSchedule(&context, &factory);
@@ -122,7 +126,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   for (auto entry : repo->getRepoMap()) {
     provenance::ProvenanceEventRecord newRecord;
     newRecord.DeSerialize((uint8_t*) entry.second.data(),
-                          entry.second.length());
+        entry.second.length());
 
     bool found = false;
     for (auto provRec : records) {
@@ -141,6 +145,26 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   }
 
+  core::ProcessorNode nodeReport(processorReport);
+  core::ProcessContext contextReport(nodeReport, test_repo);
+  core::ProcessSessionFactory factoryReport(&contextReport);
+  contextReport.setProperty(org::apache::nifi::minifi::provenance::ProvenanceTaskReport::batchSize,
+      "1");
+  core::ProcessSession sessionReport(&contextReport);
+  processorReport->onSchedule(&contextReport, &factoryReport);
+  std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceTaskReport> taskReport = std::static_pointer_cast
+        < org::apache::nifi::minifi::provenance::ProvenanceTaskReport > (processorReport);
+  std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport;
+  processorReport->incrementActiveTasks();
+  processorReport->setScheduledState(core::ScheduledState::RUNNING);
+  std::string jsonStr;
+  repo->getProvenanceRecord(recordsReport, 1);
+  taskReport->getJasonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
+  REQUIRE(recordsReport.size() == 1);
+  REQUIRE(taskReport->getName() == "provenanceTaskReport");
+  REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
+  REQUIRE(jsonStr.find("\"filename\" : \"tstFile.ext\"") != std::string::npos);
+
 }
 
 TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
@@ -149,16 +173,16 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -166,8 +190,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -184,7 +208,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   // replicate 10 threads
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onSchedule(&context, &factory);
@@ -229,9 +253,9 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
     session.commit();
     std::shared_ptr<core::FlowFile> ffr = session.get();
 
-    REQUIRE((repo->getRepoMap().size()%2) == 0);
-    REQUIRE(repo->getRepoMap().size() == (prev+2));
-    prev+=2;
+    REQUIRE((repo->getRepoMap().size() % 2) == 0);
+    REQUIRE(repo->getRepoMap().size() == (prev + 2));
+    prev += 2;
 
   }
 
@@ -239,10 +263,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
 
 TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr
+      < logging::BaseLogger
+      > (new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+          0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -252,11 +276,11 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
 
   std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared
+      < org::apache::nifi::minifi::processors::LogAttribute > ("logattribute");
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -267,12 +291,12 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   uuid_t logattribute_uuid;
   REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo, "logattribute");
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared
+      < minifi::Connection > (repo, "logattribute");
   connection2->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -298,7 +322,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   core::ProcessContext context(node, repo);
   core::ProcessContext context2(node2, repo);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -357,8 +381,8 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
       log_attribute_output.find("key:path value:" + std::string(dir))
           != std::string::npos);
 
-  outputLogger = std::unique_ptr<logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::NullAppender());
+  outputLogger = std::unique_ptr < logging::BaseLogger
+      > (new org::apache::nifi::minifi::core::logging::NullAppender());
   logger->updateLogger(std::move(outputLogger));
 
 }
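
A note for readers following the LogAttributeTest changes above: the test captures
log output by swapping the singleton logger's appender for one that writes to an
in-memory stream, asserts against the captured text, and then installs a
NullAppender so later tests run quietly. Below is a minimal sketch of that capture
pattern, an illustrative condensation using only the logging calls already present
in the diff, not code from this commit:

    std::ostringstream oss;
    // route log output into oss so the test can inspect it
    std::unique_ptr<logging::BaseLogger> outputLogger =
        std::unique_ptr<logging::BaseLogger>(
            new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, 0));
    std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
    logger->updateLogger(std::move(outputLogger));
    // ... trigger the processor here, then assert on oss.str() ...
    logger->updateLogger(std::unique_ptr<logging::BaseLogger>(
        new org::apache::nifi::minifi::core::logging::NullAppender()));  // stop capturing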

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 80d8642..67039bf 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -25,10 +25,10 @@
 /**
  * Test repository
  */
-class TestRepository : public core::Repository {
- public:
-  TestRepository()
-      : Repository("repo_name", "./dir", 1000, 100, 0) {
+class TestRepository: public core::Repository {
+public:
+  TestRepository() :
+      Repository("repo_name", "./dir", 1000, 100, 0) {
   }
   // initialize
   bool initialize() {
@@ -42,8 +42,8 @@ class TestRepository : public core::Repository {
 
   bool Put(std::string key, uint8_t *buf, int bufLen) {
     repositoryResults.insert(
-        std::pair<std::string, std::string>(
-            key, std::string((const char*) buf, bufLen)));
+        std::pair<std::string, std::string>(key,
+            std::string((const char*) buf, bufLen)));
     return true;
   }
   // Delete
@@ -66,19 +66,35 @@ class TestRepository : public core::Repository {
     return repositoryResults;
   }
 
+  void getProvenanceRecord(
+      std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+      int maxSize) {
+    for (auto entry : repositoryResults) {
+      if (records.size() >= maxSize)
+        break;
+      std::shared_ptr<provenance::ProvenanceEventRecord> eventRead =
+          std::make_shared<provenance::ProvenanceEventRecord>();
+
+      if (eventRead->DeSerialize((uint8_t*) entry.second.data(),
+          entry.second.length())) {
+        records.push_back(eventRead);
+      }
+    }
+  }
+
   void run() {
     // do nothing
   }
- protected:
+protected:
   std::map<std::string, std::string> repositoryResults;
 };
 
-class TestFlowController : public minifi::FlowController {
+class TestFlowController: public minifi::FlowController {
 
- public:
+public:
   TestFlowController(std::shared_ptr<core::Repository> repo,
-                     std::shared_ptr<core::Repository> flow_file_repo)
-      : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) {
+      std::shared_ptr<core::Repository> flow_file_repo) :
+      minifi::FlowController(repo, flow_file_repo, nullptr, "", true) {
   }
   ~TestFlowController() {
 
@@ -112,7 +128,7 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<core::Processor> createProcessor(std::string name,
-                                                   uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
 
@@ -125,10 +141,10 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<minifi::Connection> createConnection(std::string name,
-                                                       uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
- protected:
+protected:
   void initializePaths(const std::string &adjustedFilename) {
   }
 };
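
For context on the TestRepository helper above: the new getProvenanceRecord
override iterates the in-memory map filled by Put() and deserializes each stored
value back into a provenance::ProvenanceEventRecord, which is what lets the new
provenance-report test read back the events a reporting task has written. A
minimal usage sketch, assuming only the types declared in this header
(illustrative only):

    // read back at most 10 provenance events previously Put() into the repo
    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
    std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
    repo->getProvenanceRecord(records, 10);
    for (const auto &record : records) {
      // each record was round-tripped through Serialize()/DeSerialize()
    }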

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/__init__.py
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/__init__.py b/thirdparty/jsoncpp/devtools/__init__.py
deleted file mode 100644
index d18a521..0000000
--- a/thirdparty/jsoncpp/devtools/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# Copyright 2010 Baptiste Lepilleur
-# Distributed under MIT license, or public domain if desired and
-# recognized in your jurisdiction.
-# See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-# module

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/agent_vmw7.json
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/agent_vmw7.json b/thirdparty/jsoncpp/devtools/agent_vmw7.json
deleted file mode 100644
index 95d62ba..0000000
--- a/thirdparty/jsoncpp/devtools/agent_vmw7.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "cmake_variants" : [
-        {"name": "generator",
-         "generators": [
-            {"generator": [
-                "Visual Studio 7 .NET 2003",
-                "Visual Studio 9 2008",
-                "Visual Studio 9 2008 Win64",
-                "Visual Studio 10",
-                "Visual Studio 10 Win64",
-                "Visual Studio 11",
-                "Visual Studio 11 Win64"
-                ]
-            },
-            {"generator": ["MinGW Makefiles"],
-             "env_prepend": [{"path": "c:/wut/prg/MinGW/bin"}]
-            }
-         ]
-        },
-        {"name": "shared_dll",
-         "variables": [
-            ["BUILD_SHARED_LIBS=true"],
-            ["BUILD_SHARED_LIBS=false"]
-          ]
-        },
-        {"name": "build_type",
-         "build_types": [
-            "debug",
-            "release"
-            ]
-        }
-    ]
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/agent_vmxp.json
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/agent_vmxp.json b/thirdparty/jsoncpp/devtools/agent_vmxp.json
deleted file mode 100644
index 39d5e53..0000000
--- a/thirdparty/jsoncpp/devtools/agent_vmxp.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-    "cmake_variants" : [
-        {"name": "generator",
-         "generators": [
-            {"generator": [
-                "Visual Studio 6",
-                "Visual Studio 7",
-                "Visual Studio 8 2005"
-                ]
-            }
-         ]
-        },
-        {"name": "shared_dll",
-         "variables": [
-            ["BUILD_SHARED_LIBS=true"],
-            ["BUILD_SHARED_LIBS=false"]
-          ]
-        },
-        {"name": "build_type",
-         "build_types": [
-            "debug",
-            "release"
-            ]
-        }
-    ]
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/antglob.py
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/antglob.py b/thirdparty/jsoncpp/devtools/antglob.py
deleted file mode 100644
index c272f66..0000000
--- a/thirdparty/jsoncpp/devtools/antglob.py
+++ /dev/null
@@ -1,205 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-# Copyright 2009 Baptiste Lepilleur
-# Distributed under MIT license, or public domain if desired and
-# recognized in your jurisdiction.
-# See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-from __future__ import print_function
-from dircache import listdir
-import re
-import fnmatch
-import os.path
-
-
-# These fnmatch expressions are used by default to prune the directory tree
-# while doing the recursive traversal in the glob_impl method of glob function.
-prune_dirs = '.git .bzr .hg .svn _MTN _darcs CVS SCCS '
-
-# These fnmatch expressions are used by default to exclude files and dirs
-# while doing the recursive traversal in the glob_impl method of glob function.
-##exclude_pats = prune_pats + '*~ #*# .#* %*% ._* .gitignore .cvsignore vssver.scc .DS_Store'.split()
-
-# These ant_glob expressions are used by default to exclude files and dirs and also prune the directory tree
-# while doing the recursive traversal in the glob_impl method of glob function.
-default_excludes = '''
-**/*~
-**/#*#
-**/.#*
-**/%*%
-**/._*
-**/CVS
-**/CVS/**
-**/.cvsignore
-**/SCCS
-**/SCCS/**
-**/vssver.scc
-**/.svn
-**/.svn/**
-**/.git
-**/.git/**
-**/.gitignore
-**/.bzr
-**/.bzr/**
-**/.hg
-**/.hg/**
-**/_MTN
-**/_MTN/**
-**/_darcs
-**/_darcs/**
-**/.DS_Store '''
-
-DIR = 1
-FILE = 2
-DIR_LINK = 4
-FILE_LINK = 8
-LINKS = DIR_LINK | FILE_LINK
-ALL_NO_LINK = DIR | FILE
-ALL = DIR | FILE | LINKS
-
-_ANT_RE = re.compile(r'(/\*\*/)|(\*\*/)|(/\*\*)|(\*)|(/)|([^\*/]*)')
-
-def ant_pattern_to_re(ant_pattern):
-    """Generates a regular expression from the ant pattern.
-    Matching convention:
-    **/a: match 'a', 'dir/a', 'dir1/dir2/a'
-    a/**/b: match 'a/b', 'a/c/b', 'a/d/c/b'
-    *.py: match 'script.py' but not 'a/script.py'
-    """
-    rex = ['^']
-    next_pos = 0
-    sep_rex = r'(?:/|%s)' % re.escape(os.path.sep)
-##    print 'Converting', ant_pattern
-    for match in _ANT_RE.finditer(ant_pattern):
-##        print 'Matched', match.group()
-##        print match.start(0), next_pos
-        if match.start(0) != next_pos:
-            raise ValueError("Invalid ant pattern")
-        if match.group(1): # /**/
-            rex.append(sep_rex + '(?:.*%s)?' % sep_rex)
-        elif match.group(2): # **/
-            rex.append('(?:.*%s)?' % sep_rex)
-        elif match.group(3): # /**
-            rex.append(sep_rex + '.*')
-        elif match.group(4): # *
-            rex.append('[^/%s]*' % re.escape(os.path.sep))
-        elif match.group(5): # /
-            rex.append(sep_rex)
-        else: # somepath
-            rex.append(re.escape(match.group(6)))
-        next_pos = match.end()
-    rex.append('$')
-    return re.compile(''.join(rex))
-
-def _as_list(l):
-    if isinstance(l, basestring):
-        return l.split()
-    return l
-
-def glob(dir_path,
-         includes = '**/*',
-         excludes = default_excludes,
-         entry_type = FILE,
-         prune_dirs = prune_dirs,
-         max_depth = 25):
-    include_filter = [ant_pattern_to_re(p) for p in _as_list(includes)]
-    exclude_filter = [ant_pattern_to_re(p) for p in _as_list(excludes)]
-    prune_dirs = [p.replace('/',os.path.sep) for p in _as_list(prune_dirs)]
-    dir_path = dir_path.replace('/',os.path.sep)
-    entry_type_filter = entry_type
-
-    def is_pruned_dir(dir_name):
-        for pattern in prune_dirs:
-            if fnmatch.fnmatch(dir_name, pattern):
-                return True
-        return False
-
-    def apply_filter(full_path, filter_rexs):
-        """Return True if at least one of the filter regular expression match full_path."""
-        for rex in filter_rexs:
-            if rex.match(full_path):
-                return True
-        return False
-
-    def glob_impl(root_dir_path):
-        child_dirs = [root_dir_path]
-        while child_dirs:
-            dir_path = child_dirs.pop()
-            for entry in listdir(dir_path):
-                full_path = os.path.join(dir_path, entry)
-##                print 'Testing:', full_path,
-                is_dir = os.path.isdir(full_path)
-                if is_dir and not is_pruned_dir(entry): # explore child directory ?
-##                    print '===> marked for recursion',
-                    child_dirs.append(full_path)
-                included = apply_filter(full_path, include_filter)
-                rejected = apply_filter(full_path, exclude_filter)
-                if not included or rejected: # do not include entry ?
-##                    print '=> not included or rejected'
-                    continue
-                link = os.path.islink(full_path)
-                is_file = os.path.isfile(full_path)
-                if not is_file and not is_dir:
-##                    print '=> unknown entry type'
-                    continue
-                if link:
-                    entry_type = is_file and FILE_LINK or DIR_LINK
-                else:
-                    entry_type = is_file and FILE or DIR
-##                print '=> type: %d' % entry_type, 
-                if (entry_type & entry_type_filter) != 0:
-##                    print ' => KEEP'
-                    yield os.path.join(dir_path, entry)
-##                else:
-##                    print ' => TYPE REJECTED'
-    return list(glob_impl(dir_path))
-
-
-if __name__ == "__main__":
-    import unittest
-
-    class AntPatternToRETest(unittest.TestCase):
-##        def test_conversion(self):
-##            self.assertEqual('^somepath$', ant_pattern_to_re('somepath').pattern)
-
-        def test_matching(self):
-            test_cases = [ ('path',
-                             ['path'],
-                             ['somepath', 'pathsuffix', '/path', '/path']),
-                           ('*.py',
-                             ['source.py', 'source.ext.py', '.py'],
-                             ['path/source.py', '/.py', 'dir.py/z', 'z.pyc', 'z.c']),
-                           ('**/path',
-                             ['path', '/path', '/a/path', 'c:/a/path', '/a/b/path', '//a/path', '/a/path/b/path'],
-                             ['path/', 'a/path/b', 'dir.py/z', 'somepath', 'pathsuffix', 'a/somepath']),
-                           ('path/**',
-                             ['path/a', 'path/path/a', 'path//'],
-                             ['path', 'somepath/a', 'a/path', 'a/path/a', 'pathsuffix/a']),
-                           ('/**/path',
-                             ['/path', '/a/path', '/a/b/path/path', '/path/path'],
-                             ['path', 'path/', 'a/path', '/pathsuffix', '/somepath']),
-                           ('a/b',
-                             ['a/b'],
-                             ['somea/b', 'a/bsuffix', 'a/b/c']),
-                           ('**/*.py',
-                             ['script.py', 'src/script.py', 'a/b/script.py', '/a/b/script.py'],
-                             ['script.pyc', 'script.pyo', 'a.py/b']),
-                           ('src/**/*.py',
-                             ['src/a.py', 'src/dir/a.py'],
-                             ['a/src/a.py', '/src/a.py']),
-                           ]
-            for ant_pattern, accepted_matches, rejected_matches in list(test_cases):
-                def local_path(paths):
-                    return [ p.replace('/',os.path.sep) for p in paths ]
-                test_cases.append((ant_pattern, local_path(accepted_matches), local_path(rejected_matches)))
-            for ant_pattern, accepted_matches, rejected_matches in test_cases:
-                rex = ant_pattern_to_re(ant_pattern)
-                print('ant_pattern:', ant_pattern, ' => ', rex.pattern)
-                for accepted_match in accepted_matches:
-                    print('Accepted?:', accepted_match)
-                    self.assertTrue(rex.match(accepted_match) is not None)
-                for rejected_match in rejected_matches:
-                    print('Rejected?:', rejected_match)
-                    self.assertTrue(rex.match(rejected_match) is None)
-
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/batchbuild.py
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/batchbuild.py b/thirdparty/jsoncpp/devtools/batchbuild.py
deleted file mode 100644
index 0eb0690..0000000
--- a/thirdparty/jsoncpp/devtools/batchbuild.py
+++ /dev/null
@@ -1,278 +0,0 @@
-from __future__ import print_function
-import collections
-import itertools
-import json
-import os
-import os.path
-import re
-import shutil
-import string
-import subprocess
-import sys
-import cgi
-
-class BuildDesc:
-    def __init__(self, prepend_envs=None, variables=None, build_type=None, generator=None):
-        self.prepend_envs = prepend_envs or [] # [ { "var": "value" } ]
-        self.variables = variables or []
-        self.build_type = build_type
-        self.generator = generator
-
-    def merged_with(self, build_desc):
-        """Returns a new BuildDesc by merging field content.
-           Prefer build_desc fields to self fields for single valued field.
-        """
-        return BuildDesc(self.prepend_envs + build_desc.prepend_envs,
-                          self.variables + build_desc.variables,
-                          build_desc.build_type or self.build_type,
-                          build_desc.generator or self.generator)
-
-    def env(self):
-        environ = os.environ.copy()
-        for values_by_name in self.prepend_envs:
-            for var, value in list(values_by_name.items()):
-                var = var.upper()
-                if type(value) is unicode:
-                    value = value.encode(sys.getdefaultencoding())
-                if var in environ:
-                    environ[var] = value + os.pathsep + environ[var]
-                else:
-                    environ[var] = value
-        return environ
-
-    def cmake_args(self):
-        args = ["-D%s" % var for var in self.variables]
-        # skip build type for Visual Studio solution as it cause warning
-        if self.build_type and 'Visual' not in self.generator:
-            args.append("-DCMAKE_BUILD_TYPE=%s" % self.build_type)
-        if self.generator:
-            args.extend(['-G', self.generator])
-        return args
-
-    def __repr__(self):
-        return "BuildDesc(%s, build_type=%s)" %  (" ".join(self.cmake_args()), self.build_type)
-
-class BuildData:
-    def __init__(self, desc, work_dir, source_dir):
-        self.desc = desc
-        self.work_dir = work_dir
-        self.source_dir = source_dir
-        self.cmake_log_path = os.path.join(work_dir, 'batchbuild_cmake.log')
-        self.build_log_path = os.path.join(work_dir, 'batchbuild_build.log')
-        self.cmake_succeeded = False
-        self.build_succeeded = False
-
-    def execute_build(self):
-        print('Build %s' % self.desc)
-        self._make_new_work_dir()
-        self.cmake_succeeded = self._generate_makefiles()
-        if self.cmake_succeeded:
-            self.build_succeeded = self._build_using_makefiles()
-        return self.build_succeeded
-
-    def _generate_makefiles(self):
-        print('  Generating makefiles: ', end=' ')
-        cmd = ['cmake'] + self.desc.cmake_args() + [os.path.abspath(self.source_dir)]
-        succeeded = self._execute_build_subprocess(cmd, self.desc.env(), self.cmake_log_path)
-        print('done' if succeeded else 'FAILED')
-        return succeeded
-
-    def _build_using_makefiles(self):
-        print('  Building:', end=' ')
-        cmd = ['cmake', '--build', self.work_dir]
-        if self.desc.build_type:
-            cmd += ['--config', self.desc.build_type]
-        succeeded = self._execute_build_subprocess(cmd, self.desc.env(), self.build_log_path)
-        print('done' if succeeded else 'FAILED')
-        return succeeded
-
-    def _execute_build_subprocess(self, cmd, env, log_path):
-        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self.work_dir,
-                                    env=env)
-        stdout, _ = process.communicate()
-        succeeded = (process.returncode == 0)
-        with open(log_path, 'wb') as flog:
-            log = ' '.join(cmd) + '\n' + stdout + '\nExit code: %r\n' % process.returncode
-            flog.write(fix_eol(log))
-        return succeeded
-
-    def _make_new_work_dir(self):
-        if os.path.isdir(self.work_dir):
-            print('  Removing work directory', self.work_dir)
-            shutil.rmtree(self.work_dir, ignore_errors=True)
-        if not os.path.isdir(self.work_dir):
-            os.makedirs(self.work_dir)
-
-def fix_eol(stdout):
-    """Fixes wrong EOL produced by cmake --build on Windows (\r\r\n instead of \r\n).
-    """
-    return re.sub('\r*\n', os.linesep, stdout)
-
-def load_build_variants_from_config(config_path):
-    with open(config_path, 'rb') as fconfig:
-        data = json.load(fconfig)
-    variants = data[ 'cmake_variants' ]
-    build_descs_by_axis = collections.defaultdict(list)
-    for axis in variants:
-        axis_name = axis["name"]
-        build_descs = []
-        if "generators" in axis:
-            for generator_data in axis["generators"]:
-                for generator in generator_data["generator"]:
-                    build_desc = BuildDesc(generator=generator,
-                                            prepend_envs=generator_data.get("env_prepend"))
-                    build_descs.append(build_desc)
-        elif "variables" in axis:
-            for variables in axis["variables"]:
-                build_desc = BuildDesc(variables=variables)
-                build_descs.append(build_desc)
-        elif "build_types" in axis:
-            for build_type in axis["build_types"]:
-                build_desc = BuildDesc(build_type=build_type)
-                build_descs.append(build_desc)
-        build_descs_by_axis[axis_name].extend(build_descs)
-    return build_descs_by_axis
-
-def generate_build_variants(build_descs_by_axis):
-    """Returns a list of BuildDesc generated for the partial BuildDesc for each axis."""
-    axis_names = list(build_descs_by_axis.keys())
-    build_descs = []
-    for axis_name, axis_build_descs in list(build_descs_by_axis.items()):
-        if len(build_descs):
-            # for each existing build_desc and each axis build desc, create a new build_desc
-            new_build_descs = []
-            for prototype_build_desc, axis_build_desc in itertools.product(build_descs, axis_build_descs):
-                new_build_descs.append(prototype_build_desc.merged_with(axis_build_desc))
-            build_descs = new_build_descs
-        else:
-            build_descs = axis_build_descs
-    return build_descs
-
-HTML_TEMPLATE = string.Template('''<html>
-<head>
-    <title>$title</title>
-    <style type="text/css">
-    td.failed {background-color:#f08080;}
-    td.ok {background-color:#c0eec0;}
-    </style>
-</head>
-<body>
-<table border="1">
-<thead>
-    <tr>
-        <th>Variables</th>
-        $th_vars
-    </tr>
-    <tr>
-        <th>Build type</th>
-        $th_build_types
-    </tr>
-</thead>
-<tbody>
-$tr_builds
-</tbody>
-</table>
-</body></html>''')
-
-def generate_html_report(html_report_path, builds):
-    report_dir = os.path.dirname(html_report_path)
-    # Vertical axis: generator
-    # Horizontal: variables, then build_type
-    builds_by_generator = collections.defaultdict(list)
-    variables = set()
-    build_types_by_variable = collections.defaultdict(set)
-    build_by_pos_key = {} # { (generator, var_key, build_type): build }
-    for build in builds:
-        builds_by_generator[build.desc.generator].append(build)
-        var_key = tuple(sorted(build.desc.variables))
-        variables.add(var_key)
-        build_types_by_variable[var_key].add(build.desc.build_type)
-        pos_key = (build.desc.generator, var_key, build.desc.build_type)
-        build_by_pos_key[pos_key] = build
-    variables = sorted(variables)
-    th_vars = []
-    th_build_types = []
-    for variable in variables:
-        build_types = sorted(build_types_by_variable[variable])
-        nb_build_type = len(build_types_by_variable[variable])
-        th_vars.append('<th colspan="%d">%s</th>' % (nb_build_type, cgi.escape(' '.join(variable))))
-        for build_type in build_types:
-            th_build_types.append('<th>%s</th>' % cgi.escape(build_type))
-    tr_builds = []
-    for generator in sorted(builds_by_generator):
-        tds = [ '<td>%s</td>\n' % cgi.escape(generator) ]
-        for variable in variables:
-            build_types = sorted(build_types_by_variable[variable])
-            for build_type in build_types:
-                pos_key = (generator, variable, build_type)
-                build = build_by_pos_key.get(pos_key)
-                if build:
-                    cmake_status = 'ok' if build.cmake_succeeded else 'FAILED'
-                    build_status = 'ok' if build.build_succeeded else 'FAILED'
-                    cmake_log_url = os.path.relpath(build.cmake_log_path, report_dir)
-                    build_log_url = os.path.relpath(build.build_log_path, report_dir)
-                    td = '<td class="%s"><a href="%s" class="%s">CMake: %s</a>' % (                        build_status.lower(), cmake_log_url, cmake_status.lower(), cmake_status)
-                    if build.cmake_succeeded:
-                        td += '<br><a href="%s" class="%s">Build: %s</a>' % (                            build_log_url, build_status.lower(), build_status)
-                    td += '</td>'
-                else:
-                    td = '<td></td>'
-                tds.append(td)
-        tr_builds.append('<tr>%s</tr>' % '\n'.join(tds))
-    html = HTML_TEMPLATE.substitute(        title='Batch build report',
-        th_vars=' '.join(th_vars),
-        th_build_types=' '.join(th_build_types),
-        tr_builds='\n'.join(tr_builds))
-    with open(html_report_path, 'wt') as fhtml:
-        fhtml.write(html)
-    print('HTML report generated in:', html_report_path)
-
-def main():
-    usage = r"""%prog WORK_DIR SOURCE_DIR CONFIG_JSON_PATH [CONFIG2_JSON_PATH...]
-Build a given CMake based project located in SOURCE_DIR with multiple generators/options.dry_run
-as described in CONFIG_JSON_PATH building in WORK_DIR.
-
-Example of call:
-python devtools\batchbuild.py e:\buildbots\jsoncpp\build . devtools\agent_vmw7.json
-"""
-    from optparse import OptionParser
-    parser = OptionParser(usage=usage)
-    parser.allow_interspersed_args = True
-#    parser.add_option('-v', '--verbose', dest="verbose", action='store_true',
-#        help="""Be verbose.""")
-    parser.enable_interspersed_args()
-    options, args = parser.parse_args()
-    if len(args) < 3:
-        parser.error("Missing one of WORK_DIR SOURCE_DIR CONFIG_JSON_PATH.")
-    work_dir = args[0]
-    source_dir = args[1].rstrip('/\\')
-    config_paths = args[2:]
-    for config_path in config_paths:
-        if not os.path.isfile(config_path):
-            parser.error("Can not read: %r" % config_path)
-
-    # generate build variants
-    build_descs = []
-    for config_path in config_paths:
-        build_descs_by_axis = load_build_variants_from_config(config_path)
-        build_descs.extend(generate_build_variants(build_descs_by_axis))
-    print('Build variants (%d):' % len(build_descs))
-    # assign build directory for each variant
-    if not os.path.isdir(work_dir):
-        os.makedirs(work_dir)
-    builds = []
-    with open(os.path.join(work_dir, 'matrix-dir-map.txt'), 'wt') as fmatrixmap:
-        for index, build_desc in enumerate(build_descs):
-            build_desc_work_dir = os.path.join(work_dir, '%03d' % (index+1))
-            builds.append(BuildData(build_desc, build_desc_work_dir, source_dir))
-            fmatrixmap.write('%s: %s\n' % (build_desc_work_dir, build_desc))
-    for build in builds:
-        build.execute_build()
-    html_report_path = os.path.join(work_dir, 'batchbuild-report.html')
-    generate_html_report(html_report_path, builds)
-    print('Done')
-
-
-if __name__ == '__main__':
-    main()
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d2d8ae9/thirdparty/jsoncpp/devtools/fixeol.py
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/devtools/fixeol.py b/thirdparty/jsoncpp/devtools/fixeol.py
deleted file mode 100644
index b55e146..0000000
--- a/thirdparty/jsoncpp/devtools/fixeol.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# Copyright 2010 Baptiste Lepilleur
-# Distributed under MIT license, or public domain if desired and
-# recognized in your jurisdiction.
-# See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-from __future__ import print_function
-import os.path
-import sys
-
-def fix_source_eol(path, is_dry_run = True, verbose = True, eol = '\n'):
-    """Makes sure that all sources have the specified eol sequence (default: unix)."""
-    if not os.path.isfile(path):
-        raise ValueError('Path "%s" is not a file' % path)
-    try:
-        f = open(path, 'rb')
-    except IOError as msg:
-        print("%s: I/O Error: %s" % (file, str(msg)), file=sys.stderr)
-        return False
-    try:
-        raw_lines = f.readlines()
-    finally:
-        f.close()
-    fixed_lines = [line.rstrip('\r\n') + eol for line in raw_lines]
-    if raw_lines != fixed_lines:
-        print('%s =>' % path, end=' ')
-        if not is_dry_run:
-            f = open(path, "wb")
-            try:
-                f.writelines(fixed_lines)
-            finally:
-                f.close()
-        if verbose:
-            print(is_dry_run and ' NEED FIX' or ' FIXED')
-    return True
-##    
-##    
-##
-##def _do_fix(is_dry_run = True):
-##    from waftools import antglob
-##    python_sources = antglob.glob('.',
-##        includes = '**/*.py **/wscript **/wscript_build',
-##        excludes = antglob.default_excludes + './waf.py',
-##        prune_dirs = antglob.prune_dirs + 'waf-* ./build')
-##    for path in python_sources:
-##        _fix_python_source(path, is_dry_run)
-##
-##    cpp_sources = antglob.glob('.',
-##        includes = '**/*.cpp **/*.h **/*.inl',
-##        prune_dirs = antglob.prune_dirs + 'waf-* ./build')
-##    for path in cpp_sources:
-##        _fix_source_eol(path, is_dry_run)
-##
-##
-##def dry_fix(context):
-##    _do_fix(is_dry_run = True)
-##
-##def fix(context):
-##    _do_fix(is_dry_run = False)
-##
-##def shutdown():
-##    pass
-##
-##def check(context):
-##    # Unit tests are run when "check" target is used
-##    ut = UnitTest.unit_test()
-##    ut.change_to_testfile_dir = True
-##    ut.want_to_see_test_output = True
-##    ut.want_to_see_test_error = True
-##    ut.run()
-##    ut.print_results()