You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/04/11 17:16:22 UTC
[14/20] nifi-minifi-cpp git commit: MINIFI-227: Provenance report
MINIFI-227: Provenance report
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/126c2ed5
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/126c2ed5
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/126c2ed5
Branch: refs/heads/MINIFI-227
Commit: 126c2ed50ac4e7b041cecc7be7fcfa301eab83a3
Parents: 4a4602a
Author: Bin Qiu <be...@gmail.com>
Authored: Fri Mar 31 16:04:56 2017 -0700
Committer: Bin Qiu <be...@gmail.com>
Committed: Tue Apr 11 09:57:47 2017 -0700
----------------------------------------------------------------------
CMakeLists.txt | 4 +-
libminifi/CMakeLists.txt | 1 +
libminifi/include/Site2SiteClientProtocol.h | 13 +-
libminifi/include/core/FlowConfiguration.h | 1 +
libminifi/include/provenance/Provenance.h | 1 +
.../include/provenance/ProvenanceRepository.h | 28 +
.../include/provenance/ProvenanceTaskReport.h | 88 +
libminifi/src/Site2SiteClientProtocol.cpp | 406 +--
libminifi/src/core/FlowConfiguration.cpp | 4 +
libminifi/src/provenance/Provenance.cpp | 5 +
.../src/provenance/ProvenanceRepository.cpp | 1 +
.../src/provenance/ProvenanceTaskReport.cpp | 221 ++
main/CMakeLists.txt | 6 +-
thirdparty/jsoncpp/AUTHORS | 1 +
thirdparty/jsoncpp/CMakeLists.txt | 156 ++
thirdparty/jsoncpp/LICENSE | 55 +
thirdparty/jsoncpp/NEWS.txt | 175 ++
thirdparty/jsoncpp/README.md | 225 ++
thirdparty/jsoncpp/SConstruct | 248 ++
thirdparty/jsoncpp/amalgamate.py | 155 ++
thirdparty/jsoncpp/appveyor.yml | 35 +
thirdparty/jsoncpp/dev.makefile | 35 +
thirdparty/jsoncpp/devtools/__init__.py | 6 +
thirdparty/jsoncpp/devtools/agent_vmw7.json | 33 +
thirdparty/jsoncpp/devtools/agent_vmxp.json | 26 +
thirdparty/jsoncpp/devtools/antglob.py | 205 ++
thirdparty/jsoncpp/devtools/batchbuild.py | 278 ++
thirdparty/jsoncpp/devtools/fixeol.py | 70 +
thirdparty/jsoncpp/devtools/licenseupdater.py | 94 +
thirdparty/jsoncpp/devtools/tarball.py | 52 +
thirdparty/jsoncpp/doc/doxyfile.in | 2301 ++++++++++++++++
thirdparty/jsoncpp/doc/footer.html | 3 +
thirdparty/jsoncpp/doc/header.html | 24 +
thirdparty/jsoncpp/doc/jsoncpp.dox | 164 ++
thirdparty/jsoncpp/doc/readme.txt | 1 +
thirdparty/jsoncpp/doc/roadmap.dox | 3 +
thirdparty/jsoncpp/doc/web_doxyfile.in | 2301 ++++++++++++++++
thirdparty/jsoncpp/doxybuild.py | 189 ++
thirdparty/jsoncpp/include/CMakeLists.txt | 2 +
thirdparty/jsoncpp/include/json/allocator.h | 98 +
thirdparty/jsoncpp/include/json/assertions.h | 54 +
thirdparty/jsoncpp/include/json/autolink.h | 25 +
thirdparty/jsoncpp/include/json/config.h | 184 ++
thirdparty/jsoncpp/include/json/features.h | 61 +
thirdparty/jsoncpp/include/json/forwards.h | 37 +
thirdparty/jsoncpp/include/json/json.h | 15 +
thirdparty/jsoncpp/include/json/reader.h | 408 +++
thirdparty/jsoncpp/include/json/value.h | 870 ++++++
thirdparty/jsoncpp/include/json/version.h | 20 +
thirdparty/jsoncpp/include/json/writer.h | 335 +++
.../jsoncpp/makefiles/msvc2010/jsoncpp.sln | 42 +
.../jsoncpp/makefiles/msvc2010/jsontest.vcxproj | 96 +
.../makefiles/msvc2010/jsontest.vcxproj.filters | 13 +
.../jsoncpp/makefiles/msvc2010/lib_json.vcxproj | 143 +
.../makefiles/msvc2010/lib_json.vcxproj.filters | 33 +
.../makefiles/msvc2010/test_lib_json.vcxproj | 109 +
.../msvc2010/test_lib_json.vcxproj.filters | 24 +
thirdparty/jsoncpp/makefiles/vs71/jsoncpp.sln | 46 +
.../jsoncpp/makefiles/vs71/jsontest.vcproj | 119 +
.../jsoncpp/makefiles/vs71/lib_json.vcproj | 205 ++
.../jsoncpp/makefiles/vs71/test_lib_json.vcproj | 130 +
thirdparty/jsoncpp/makerelease.py | 390 +++
thirdparty/jsoncpp/pkg-config/jsoncpp.pc.in | 9 +
thirdparty/jsoncpp/scons-tools/globtool.py | 58 +
thirdparty/jsoncpp/scons-tools/srcdist.py | 183 ++
thirdparty/jsoncpp/scons-tools/substinfile.py | 85 +
thirdparty/jsoncpp/scons-tools/targz.py | 87 +
thirdparty/jsoncpp/src/CMakeLists.txt | 5 +
.../jsoncpp/src/jsontestrunner/CMakeLists.txt | 25 +
thirdparty/jsoncpp/src/jsontestrunner/main.cpp | 326 +++
.../jsoncpp/src/jsontestrunner/sconscript | 9 +
thirdparty/jsoncpp/src/lib_json/CMakeLists.txt | 113 +
thirdparty/jsoncpp/src/lib_json/json_reader.cpp | 2036 ++++++++++++++
thirdparty/jsoncpp/src/lib_json/json_tool.h | 117 +
thirdparty/jsoncpp/src/lib_json/json_value.cpp | 1617 +++++++++++
.../jsoncpp/src/lib_json/json_valueiterator.inl | 167 ++
thirdparty/jsoncpp/src/lib_json/json_writer.cpp | 1224 +++++++++
thirdparty/jsoncpp/src/lib_json/sconscript | 8 +
thirdparty/jsoncpp/src/lib_json/version.h.in | 20 +
.../jsoncpp/src/test_lib_json/CMakeLists.txt | 38 +
.../jsoncpp/src/test_lib_json/jsontest.cpp | 457 ++++
thirdparty/jsoncpp/src/test_lib_json/jsontest.h | 286 ++
thirdparty/jsoncpp/src/test_lib_json/main.cpp | 2589 ++++++++++++++++++
thirdparty/jsoncpp/src/test_lib_json/sconscript | 10 +
thirdparty/jsoncpp/test/cleantests.py | 16 +
.../jsoncpp/test/data/fail_test_array_01.json | 1 +
.../test/data/fail_test_stack_limit.json | 1 +
.../jsoncpp/test/data/test_array_01.expected | 1 +
thirdparty/jsoncpp/test/data/test_array_01.json | 1 +
.../jsoncpp/test/data/test_array_02.expected | 2 +
thirdparty/jsoncpp/test/data/test_array_02.json | 1 +
.../jsoncpp/test/data/test_array_03.expected | 6 +
thirdparty/jsoncpp/test/data/test_array_03.json | 1 +
.../jsoncpp/test/data/test_array_04.expected | 5 +
thirdparty/jsoncpp/test/data/test_array_04.json | 1 +
.../jsoncpp/test/data/test_array_05.expected | 100 +
thirdparty/jsoncpp/test/data/test_array_05.json | 1 +
.../jsoncpp/test/data/test_array_06.expected | 5 +
thirdparty/jsoncpp/test/data/test_array_06.json | 4 +
.../jsoncpp/test/data/test_array_07.expected | 2122 ++++++++++++++
thirdparty/jsoncpp/test/data/test_array_07.json | 2 +
.../jsoncpp/test/data/test_basic_01.expected | 1 +
thirdparty/jsoncpp/test/data/test_basic_01.json | 1 +
.../jsoncpp/test/data/test_basic_02.expected | 1 +
thirdparty/jsoncpp/test/data/test_basic_02.json | 1 +
.../jsoncpp/test/data/test_basic_03.expected | 3 +
thirdparty/jsoncpp/test/data/test_basic_03.json | 3 +
.../jsoncpp/test/data/test_basic_04.expected | 2 +
thirdparty/jsoncpp/test/data/test_basic_04.json | 2 +
.../jsoncpp/test/data/test_basic_05.expected | 2 +
thirdparty/jsoncpp/test/data/test_basic_05.json | 2 +
.../jsoncpp/test/data/test_basic_06.expected | 2 +
thirdparty/jsoncpp/test/data/test_basic_06.json | 2 +
.../jsoncpp/test/data/test_basic_07.expected | 2 +
thirdparty/jsoncpp/test/data/test_basic_07.json | 2 +
.../jsoncpp/test/data/test_basic_08.expected | 3 +
thirdparty/jsoncpp/test/data/test_basic_08.json | 3 +
.../jsoncpp/test/data/test_basic_09.expected | 4 +
thirdparty/jsoncpp/test/data/test_basic_09.json | 4 +
.../jsoncpp/test/data/test_comment_00.expected | 4 +
.../jsoncpp/test/data/test_comment_00.json | 5 +
.../jsoncpp/test/data/test_comment_01.expected | 10 +
.../jsoncpp/test/data/test_comment_01.json | 10 +
.../jsoncpp/test/data/test_comment_02.expected | 23 +
.../jsoncpp/test/data/test_comment_02.json | 26 +
.../jsoncpp/test/data/test_complex_01.expected | 20 +
.../jsoncpp/test/data/test_complex_01.json | 17 +
.../jsoncpp/test/data/test_integer_01.expected | 2 +
.../jsoncpp/test/data/test_integer_01.json | 2 +
.../jsoncpp/test/data/test_integer_02.expected | 2 +
.../jsoncpp/test/data/test_integer_02.json | 2 +
.../jsoncpp/test/data/test_integer_03.expected | 2 +
.../jsoncpp/test/data/test_integer_03.json | 2 +
.../jsoncpp/test/data/test_integer_04.expected | 3 +
.../jsoncpp/test/data/test_integer_04.json | 3 +
.../jsoncpp/test/data/test_integer_05.expected | 2 +
.../jsoncpp/test/data/test_integer_05.json | 2 +
.../test/data/test_integer_06_64bits.expected | 1 +
.../test/data/test_integer_06_64bits.json | 2 +
.../test/data/test_integer_07_64bits.expected | 1 +
.../test/data/test_integer_07_64bits.json | 2 +
.../test/data/test_integer_08_64bits.expected | 1 +
.../test/data/test_integer_08_64bits.json | 2 +
.../jsoncpp/test/data/test_large_01.expected | 2122 ++++++++++++++
thirdparty/jsoncpp/test/data/test_large_01.json | 2 +
.../jsoncpp/test/data/test_object_01.expected | 1 +
.../jsoncpp/test/data/test_object_01.json | 1 +
.../jsoncpp/test/data/test_object_02.expected | 2 +
.../jsoncpp/test/data/test_object_02.json | 1 +
.../jsoncpp/test/data/test_object_03.expected | 4 +
.../jsoncpp/test/data/test_object_03.json | 5 +
.../jsoncpp/test/data/test_object_04.expected | 2 +
.../jsoncpp/test/data/test_object_04.json | 3 +
.../test/data/test_preserve_comment_01.expected | 11 +
.../test/data/test_preserve_comment_01.json | 14 +
.../jsoncpp/test/data/test_real_01.expected | 3 +
thirdparty/jsoncpp/test/data/test_real_01.json | 3 +
.../jsoncpp/test/data/test_real_02.expected | 3 +
thirdparty/jsoncpp/test/data/test_real_02.json | 3 +
.../jsoncpp/test/data/test_real_03.expected | 3 +
thirdparty/jsoncpp/test/data/test_real_03.json | 3 +
.../jsoncpp/test/data/test_real_04.expected | 3 +
thirdparty/jsoncpp/test/data/test_real_04.json | 3 +
.../jsoncpp/test/data/test_real_05.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_05.json | 3 +
.../jsoncpp/test/data/test_real_06.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_06.json | 3 +
.../jsoncpp/test/data/test_real_07.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_07.json | 3 +
.../jsoncpp/test/data/test_real_08.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_08.json | 4 +
.../jsoncpp/test/data/test_real_09.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_09.json | 4 +
.../jsoncpp/test/data/test_real_10.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_10.json | 4 +
.../jsoncpp/test/data/test_real_11.expected | 4 +
thirdparty/jsoncpp/test/data/test_real_11.json | 4 +
.../jsoncpp/test/data/test_real_12.expected | 2 +
thirdparty/jsoncpp/test/data/test_real_12.json | 2 +
.../jsoncpp/test/data/test_string_01.expected | 1 +
.../jsoncpp/test/data/test_string_01.json | 1 +
.../jsoncpp/test/data/test_string_02.expected | 1 +
.../jsoncpp/test/data/test_string_02.json | 1 +
.../jsoncpp/test/data/test_string_03.expected | 1 +
.../jsoncpp/test/data/test_string_03.json | 1 +
.../jsoncpp/test/data/test_string_04.expected | 2 +
.../jsoncpp/test/data/test_string_04.json | 2 +
.../jsoncpp/test/data/test_string_05.expected | 2 +
.../jsoncpp/test/data/test_string_05.json | 2 +
.../test/data/test_string_unicode_01.expected | 1 +
.../test/data/test_string_unicode_01.json | 1 +
.../test/data/test_string_unicode_02.expected | 1 +
.../test/data/test_string_unicode_02.json | 1 +
.../test/data/test_string_unicode_03.expected | 1 +
.../test/data/test_string_unicode_03.json | 1 +
.../test/data/test_string_unicode_04.expected | 1 +
.../test/data/test_string_unicode_04.json | 1 +
.../test/data/test_string_unicode_05.expected | 2 +
.../test/data/test_string_unicode_05.json | 1 +
thirdparty/jsoncpp/test/generate_expected.py | 17 +
thirdparty/jsoncpp/test/jsonchecker/fail1.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail10.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail11.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail12.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail13.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail14.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail15.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail16.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail17.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail18.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail19.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail2.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail20.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail21.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail22.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail23.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail24.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail25.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail26.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail27.json | 2 +
thirdparty/jsoncpp/test/jsonchecker/fail28.json | 2 +
thirdparty/jsoncpp/test/jsonchecker/fail29.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail3.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail30.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail31.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail32.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail33.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail4.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail5.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail6.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail7.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail8.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/fail9.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/pass1.json | 58 +
thirdparty/jsoncpp/test/jsonchecker/pass2.json | 1 +
thirdparty/jsoncpp/test/jsonchecker/pass3.json | 6 +
thirdparty/jsoncpp/test/jsonchecker/readme.txt | 3 +
thirdparty/jsoncpp/test/pyjsontestrunner.py | 71 +
thirdparty/jsoncpp/test/runjsontests.py | 174 ++
thirdparty/jsoncpp/test/rununittests.py | 84 +
thirdparty/jsoncpp/travis.sh | 31 +
thirdparty/jsoncpp/version | 1 +
thirdparty/jsoncpp/version.in | 1 +
243 files changed, 25589 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b84706d..2ae7332 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -81,6 +81,7 @@ find_package(UUID REQUIRED)
file(GLOB SPD_SOURCES "include/spdlog/*")
add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
+add_subdirectory(thirdparty/jsoncpp)
set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
add_subdirectory(thirdparty/civetweb-1.9.1)
add_subdirectory(libminifi)
@@ -125,6 +126,7 @@ enable_testing(test)
add_executable(tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES})
target_include_directories(tests PRIVATE BEFORE "thirdparty/catch")
target_include_directories(tests PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+ target_include_directories(tests PRIVATE BEFORE "thirdparty/jsoncpp/include")
target_include_directories(tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
target_include_directories(tests PRIVATE BEFORE "include")
target_include_directories(tests PRIVATE BEFORE "libminifi/include/")
@@ -134,7 +136,7 @@ enable_testing(test)
target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils")
target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors")
target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance")
- target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp)
+ target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static)
add_test(NAME LibMinifiTests COMMAND tests)
file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 4c71cc1..5419667 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -58,6 +58,7 @@ endif()
include_directories(../include)
include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
include_directories(../thirdparty/civetweb-1.9.1/include)
+include_directories(../thirdparty/jsoncpp/include)
include_directories(include)
file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h
index 78673d8..67fd444 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -376,7 +376,8 @@ class Transaction {
class DataPacket {
public:
DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
- std::map<std::string, std::string> attributes) {
+ std::map<std::string, std::string> attributes, std::string &payload) :
+ payload_ (payload) {
_protocol = protocol;
_size = 0;
_transaction = transaction;
@@ -386,6 +387,8 @@ class DataPacket {
uint64_t _size;
Site2SiteClientProtocol *_protocol;
Transaction *_transaction;
+ std::string & payload_;
+
};
// Site2SiteClientProtocol Class
@@ -532,8 +535,12 @@ class Site2SiteClientProtocol {
void receiveFlowFiles(core::ProcessContext *context,
core::ProcessSession *session);
// Transfer flow files for the process session
- void transferFlowFiles(core::ProcessContext *context,
- core::ProcessSession *session);
+ void transferFlowFiles(
+ core::ProcessContext *context,
+ core::ProcessSession *session);
+ //! Transfer string for the process session
+ void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload,
+ std::map<std::string, std::string> attributes);
// deleteTransaction
void deleteTransaction(std::string transactionID);
// Nest Callback Class for write stream
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index de8ceb4..35aa6d9 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -22,6 +22,7 @@
#include "Connection.h"
#include "RemoteProcessorGroupPort.h"
#include "provenance/Provenance.h"
+#include "provenance/ProvenanceTaskReport.h"
#include "processors/GetFile.h"
#include "processors/PutFile.h"
#include "processors/TailFile.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 82754c4..2977f28 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -155,6 +155,7 @@ class ProvenanceEventRecord :
*/
REPLAY
};
+ static const char *ProvenanceEventTypeStr[REPLAY+1];
public:
// Constructor
/*!
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
index 2b71fd9..7dd1757 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -149,6 +149,34 @@ class ProvenanceRepository : public core::Repository,
void removeEvent(ProvenanceEventRecord *event) {
Delete(event->getEventId());
}
+  //! Append up to maxSize deserialized events to records (the cap applies to records.size()).
+  //! Entries that fail DeSerialize() are skipped; a non-positive maxSize appends nothing.
+  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize)
+  {
+    // Clamp before comparing: a negative int converted to size_t would disable the cap.
+    const size_t limit = maxSize > 0 ? static_cast<size_t>(maxSize) : 0;
+    std::lock_guard<std::mutex> lock(mutex_);
+    // Owning pointer so the iterator is released even if push_back() throws.
+    std::unique_ptr<leveldb::Iterator> it(
+        db_->NewIterator(leveldb::ReadOptions()));
+    // Check the cap in the loop condition so no record is allocated once it is reached.
+    for (it->SeekToFirst(); it->Valid() && records.size() < limit; it->Next()) {
+      std::shared_ptr<ProvenanceEventRecord> eventRead(new ProvenanceEventRecord());
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(),
+          (int) it->value().size())) {
+        records.push_back(eventRead);
+      }
+    }
+  }
+  //! Delete every event listed in records from the repository, keyed by its event id.
+  //! NOTE(review): mutex_ is held across Delete(); confirm Delete() does not lock mutex_ itself.
+  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records)
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    for (const auto &record : records) {
+      Delete(record->getEventId());
+    }
+  }
// destroy
void destroy() {
if (db_) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/include/provenance/ProvenanceTaskReport.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h b/libminifi/include/provenance/ProvenanceTaskReport.h
new file mode 100644
index 0000000..38edeeb
--- /dev/null
+++ b/libminifi/include/provenance/ProvenanceTaskReport.h
@@ -0,0 +1,88 @@
+/**
+ * @file ProvenanceTaskReport.h
+ * ProvenanceTaskReport class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_TASK_REPORT_H__
+#define __PROVENANCE_TASK_REPORT_H__
+
+#include <mutex>
+#include <memory>
+#include <stack>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "Site2SiteClientProtocol.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+//! ProvenanceTaskReport Class
+class ProvenanceTaskReport: public core::Processor {
+public:
+  //! Constructor
+  /*!
+   * Create a new processor; a NULL uuid (the default) leaves protocol_uuid_ cleared.
+   */
+  ProvenanceTaskReport(std::string name, uuid_t uuid = NULL) :
+      core::Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+    // uuid_copy() reads from its source, so it must never be handed the NULL default.
+    if (uuid != NULL)
+      uuid_copy(protocol_uuid_, uuid);
+    else
+      uuid_clear(protocol_uuid_);
+    this->setTriggerWhenEmpty(true);
+  }
+  //! Destructor
+  virtual ~ProvenanceTaskReport() {}
+  //! Processor Name
+  static const std::string ProcessorName;
+  //! Supported Properties
+  static core::Property hostName;
+  static core::Property port;
+  static core::Property batchSize;
+  //! Supported Relationships
+  static core::Relationship relation;
+public:
+  //! OnTrigger method, implemented by NiFi ProvenanceTaskReport
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  //! Initialize, overridden by NiFi ProvenanceTaskReport
+  virtual void initialize(void);
+
+private:
+  std::unique_ptr<Site2SiteClientProtocol> getNextProtocol();
+  void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol);
+  std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_;
+  std::mutex protocol_mutex_;
+  uuid_t protocol_uuid_;
+  //! Logger
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+// Provenance Task Report
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
index 52a0a02..fbde8a0 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -106,39 +106,39 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() {
}
logger_->log_info("status code is %i", statusCode);
switch (statusCode) {
- case RESOURCE_OK:
- logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
- return true;
- case DIFFERENT_RESOURCE_VERSION:
- uint32_t serverVersion;
- ret = peer_->read(serverVersion);
- if (ret <= 0) {
- // tearDown();
- return false;
- }
- logger_->log_info(
- "Site2Site Server Response asked for a different protocol version %d",
- serverVersion);
- for (unsigned int i = (_currentVersionIndex + 1);
- i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
- if (serverVersion >= _supportedVersion[i]) {
- _currentVersion = _supportedVersion[i];
- _currentVersionIndex = i;
- return initiateResourceNegotiation();
- }
- }
- ret = -1;
- // tearDown();
- return false;
- case NEGOTIATED_ABORT:
- logger_->log_info("Site2Site Negotiate protocol response ABORT");
- ret = -1;
+ case RESOURCE_OK:
+ logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+ return true;
+ case DIFFERENT_RESOURCE_VERSION:
+ uint32_t serverVersion;
+ ret = peer_->read(serverVersion);
+ if (ret <= 0) {
// tearDown();
return false;
- default:
- logger_->log_info("Negotiate protocol response unknown code %d",
- statusCode);
- return true;
+ }
+ logger_->log_info(
+ "Site2Site Server Response asked for a different protocol version %d",
+ serverVersion);
+ for (unsigned int i = (_currentVersionIndex + 1);
+ i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+ if (serverVersion >= _supportedVersion[i]) {
+ _currentVersion = _supportedVersion[i];
+ _currentVersionIndex = i;
+ return initiateResourceNegotiation();
+ }
+ }
+ ret = -1;
+ // tearDown();
+ return false;
+ case NEGOTIATED_ABORT:
+ logger_->log_info("Site2Site Negotiate protocol response ABORT");
+ ret = -1;
+ // tearDown();
+ return false;
+ default:
+ logger_->log_info("Negotiate protocol response unknown code %d",
+ statusCode);
+ return true;
}
return true;
@@ -181,38 +181,38 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
}
switch (statusCode) {
- case RESOURCE_OK:
- logger_->log_info("Site2Site Codec Negotiate version OK");
- return true;
- case DIFFERENT_RESOURCE_VERSION:
- uint32_t serverVersion;
- ret = peer_->read(serverVersion);
- if (ret <= 0) {
- // tearDown();
- return false;
- }
- logger_->log_info(
- "Site2Site Server Response asked for a different codec version %d",
- serverVersion);
- for (unsigned int i = (_currentCodecVersionIndex + 1);
- i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
- if (serverVersion >= _supportedCodecVersion[i]) {
- _currentCodecVersion = _supportedCodecVersion[i];
- _currentCodecVersionIndex = i;
- return initiateCodecResourceNegotiation();
- }
- }
- ret = -1;
- // tearDown();
- return false;
- case NEGOTIATED_ABORT:
- logger_->log_info("Site2Site Codec Negotiate response ABORT");
- ret = -1;
+ case RESOURCE_OK:
+ logger_->log_info("Site2Site Codec Negotiate version OK");
+ return true;
+ case DIFFERENT_RESOURCE_VERSION:
+ uint32_t serverVersion;
+ ret = peer_->read(serverVersion);
+ if (ret <= 0) {
// tearDown();
return false;
- default:
- logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
- return true;
+ }
+ logger_->log_info(
+ "Site2Site Server Response asked for a different codec version %d",
+ serverVersion);
+ for (unsigned int i = (_currentCodecVersionIndex + 1);
+ i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+ if (serverVersion >= _supportedCodecVersion[i]) {
+ _currentCodecVersion = _supportedCodecVersion[i];
+ _currentCodecVersionIndex = i;
+ return initiateCodecResourceNegotiation();
+ }
+ }
+ ret = -1;
+ // tearDown();
+ return false;
+ case NEGOTIATED_ABORT:
+ logger_->log_info("Site2Site Codec Negotiate response ABORT");
+ ret = -1;
+ // tearDown();
+ return false;
+ default:
+ logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+ return true;
}
return true;
@@ -241,7 +241,7 @@ bool Site2SiteClientProtocol::handShake() {
return false;
}
- std::map<std::string, std::string> properties;
+ std::map < std::string, std::string > properties;
properties[HandShakePropertyStr[GZIP]] = "false";
properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(
@@ -286,7 +286,7 @@ bool Site2SiteClientProtocol::handShake() {
return false;
}
logger_->log_info("Site2Site Protocol Send handshake properties %s %s",
- it->first.c_str(), it->second.c_str());
+ it->first.c_str(), it->second.c_str());
}
RespondCode code;
@@ -300,28 +300,28 @@ bool Site2SiteClientProtocol::handShake() {
}
switch (code) {
- case PROPERTIES_OK:
- logger_->log_info("Site2Site HandShake Completed");
- _peerState = HANDSHAKED;
- return true;
- case PORT_NOT_IN_VALID_STATE:
- case UNKNOWN_PORT:
- case PORTS_DESTINATION_FULL:
- logger_->log_error(
- "Site2Site HandShake Failed because destination port is either invalid or full");
- ret = -1;
- /*
- peer_->yield();
- tearDown(); */
- return false;
- default:
- logger_->log_info("HandShake Failed because of unknown respond code %d",
- code);
- ret = -1;
- /*
- peer_->yield();
- tearDown(); */
- return false;
+ case PROPERTIES_OK:
+ logger_->log_info("Site2Site HandShake Completed");
+ _peerState = HANDSHAKED;
+ return true;
+ case PORT_NOT_IN_VALID_STATE:
+ case UNKNOWN_PORT:
+ case PORTS_DESTINATION_FULL:
+ logger_->log_error(
+ "Site2Site HandShake Failed because destination port is either invalid or full");
+ ret = -1;
+ /*
+ peer_->yield();
+ tearDown(); */
+ return false;
+ default:
+ logger_->log_info("HandShake Failed because of unknown respond code %d",
+ code);
+ ret = -1;
+ /*
+ peer_->yield();
+ tearDown(); */
+ return false;
}
return false;
@@ -369,7 +369,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) {
}
int Site2SiteClientProtocol::readRespond(RespondCode &code,
- std::string &message) {
+ std::string &message) {
uint8_t firstByte;
int ret = peer_->read(firstByte);
@@ -408,7 +408,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code,
}
int Site2SiteClientProtocol::writeRespond(RespondCode code,
- std::string message) {
+ std::string message) {
RespondCodeContext *resCode = this->getRespondCodeContext(code);
if (resCode == NULL) {
@@ -525,31 +525,31 @@ Transaction* Site2SiteClientProtocol::createTransaction(
org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
peer_.get());
switch (code) {
- case MORE_DATA:
- dataAvailable = true;
- logger_->log_info("Site2Site peer indicates that data is available");
- transaction = new Transaction(direction, crcstream);
- _transactionMap[transaction->getUUIDStr()] = transaction;
- transactionID = transaction->getUUIDStr();
- transaction->setDataAvailable(dataAvailable);
- logger_->log_info("Site2Site create transaction %s",
- transaction->getUUIDStr().c_str());
- return transaction;
- case NO_MORE_DATA:
- dataAvailable = false;
- logger_->log_info("Site2Site peer indicates that no data is available");
- transaction = new Transaction(direction, crcstream);
- _transactionMap[transaction->getUUIDStr()] = transaction;
- transactionID = transaction->getUUIDStr();
- transaction->setDataAvailable(dataAvailable);
- logger_->log_info("Site2Site create transaction %s",
- transaction->getUUIDStr().c_str());
- return transaction;
- default:
- logger_->log_info(
- "Site2Site got unexpected response %d when asking for data", code);
- // tearDown();
- return NULL;
+ case MORE_DATA:
+ dataAvailable = true;
+ logger_->log_info("Site2Site peer indicates that data is available");
+ transaction = new Transaction(direction, crcstream);
+ _transactionMap[transaction->getUUIDStr()] = transaction;
+ transactionID = transaction->getUUIDStr();
+ transaction->setDataAvailable(dataAvailable);
+ logger_->log_info("Site2Site create transaction %s",
+ transaction->getUUIDStr().c_str());
+ return transaction;
+ case NO_MORE_DATA:
+ dataAvailable = false;
+ logger_->log_info("Site2Site peer indicates that no data is available");
+ transaction = new Transaction(direction, crcstream);
+ _transactionMap[transaction->getUUIDStr()] = transaction;
+ transactionID = transaction->getUUIDStr();
+ transaction->setDataAvailable(dataAvailable);
+ logger_->log_info("Site2Site create transaction %s",
+ transaction->getUUIDStr().c_str());
+ return transaction;
+ default:
+ logger_->log_info(
+ "Site2Site got unexpected response %d when asking for data", code);
+ // tearDown();
+ return NULL;
}
} else {
ret = writeRequestType(SEND_FLOWFILES);
@@ -564,14 +564,14 @@ Transaction* Site2SiteClientProtocol::createTransaction(
_transactionMap[transaction->getUUIDStr()] = transaction;
transactionID = transaction->getUUIDStr();
logger_->log_info("Site2Site create transaction %s",
- transaction->getUUIDStr().c_str());
+ transaction->getUUIDStr().c_str());
return transaction;
}
}
}
bool Site2SiteClientProtocol::receive(std::string transactionID,
- DataPacket *packet, bool &eof) {
+ DataPacket *packet, bool &eof) {
int ret;
Transaction *transaction = NULL;
@@ -602,7 +602,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID,
if (transaction->getDirection() != RECEIVE) {
logger_->log_info("Site2Site transaction %s direction is wrong",
- transactionID.c_str());
+ transactionID.c_str());
return false;
}
@@ -687,9 +687,8 @@ bool Site2SiteClientProtocol::receive(std::string transactionID,
}
bool Site2SiteClientProtocol::send(std::string transactionID,
- DataPacket *packet,
- std::shared_ptr<FlowFileRecord> flowFile,
- core::ProcessSession *session) {
+ DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile,
+ core::ProcessSession *session) {
int ret;
Transaction *transaction = NULL;
@@ -720,7 +719,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
if (transaction->getDirection() != SEND) {
logger_->log_info("Site2Site transaction %s direction is wrong",
- transactionID.c_str());
+ transactionID.c_str());
return false;
}
@@ -751,22 +750,38 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
return false;
}
logger_->log_info("Site2Site transaction %s send attribute key %s value %s",
- transactionID.c_str(), itAttribute->first.c_str(),
- itAttribute->second.c_str());
+ transactionID.c_str(), itAttribute->first.c_str(),
+ itAttribute->second.c_str());
}
- uint64_t len = flowFile->getSize();
- ret = transaction->getStream().write(len);
- if (ret != 8) {
- return false;
- }
+ uint64_t len = 0;
+ if (flowFile) {
+ len = flowFile->getSize();
+ ret = transaction->getStream().write(len);
+ if (ret != 8) {
+ return false;
+ }
+ if (flowFile->getSize()) {
+ Site2SiteClientProtocol::ReadCallback callback(packet);
+ session->read(flowFile, &callback);
+ if (flowFile->getSize() != packet->_size) {
+ return false;
+ }
+ }
+ } else if (packet->payload_.length() > 0) {
+ len = packet->payload_.length();
+
+ ret = transaction->getStream().write(len);
+ if (ret != 8) {
+ return false;
+ }
- if (flowFile->getSize()) {
- Site2SiteClientProtocol::ReadCallback callback(packet);
- session->read(flowFile, &callback);
- if (flowFile->getSize() != packet->_size) {
+ ret = transaction->getStream().writeData(
+ reinterpret_cast<uint8_t *> (const_cast<char*> (packet->payload_.c_str())), len);
+ if (ret != len) {
return false;
}
+ packet->_size += len;
}
transaction->_transfers++;
@@ -780,7 +795,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID,
}
void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
- core::ProcessSession *session) {
+ core::ProcessSession *session) {
uint64_t bytes = 0;
int transfers = 0;
Transaction *transaction = NULL;
@@ -793,7 +808,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
context->yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION,
- "Can not establish handshake with peer");
+ "Can not establish handshake with peer");
return;
}
@@ -810,9 +825,10 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
try {
while (true) {
- std::map<std::string, std::string> empty;
+ std::map < std::string, std::string > empty;
uint64_t startTime = getTimeMillis();
- DataPacket packet(this, transaction, empty);
+ std::string payload;
+ DataPacket packet(this, transaction, empty, payload);
bool eof = false;
if (!receive(transactionID, &packet, eof)) {
@@ -823,8 +839,8 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
// transaction done
break;
}
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast
+ < FlowFileRecord > (session->create());
if (!flowFile) {
throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
@@ -853,8 +869,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host="
+ peer_->getHostName();
session->getProvenanceReporter()->receive(flowFile, transitUri,
- sourceIdentifier, details,
- endTime - startTime);
+ sourceIdentifier, details, endTime - startTime);
session->transfer(flowFile, relation);
// receive the transfer for the flow record
bytes += packet._size;
@@ -941,7 +956,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
int64_t crcValue = transaction->getCRC();
std::string crc = std::to_string(crcValue);
logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
- transaction->getCRC(), transactionID.c_str());
+ transaction->getCRC(), transactionID.c_str());
ret = writeRespond(CONFIRM_TRANSACTION, crc);
if (ret <= 0)
return false;
@@ -953,24 +968,24 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
if (code == CONFIRM_TRANSACTION) {
logger_->log_info("Site2Site transaction %s peer confirm transaction",
- transactionID.c_str());
+ transactionID.c_str());
transaction->_state = TRANSACTION_CONFIRMED;
return true;
} else if (code == BAD_CHECKSUM) {
logger_->log_info("Site2Site transaction %s peer indicate bad checksum",
- transactionID.c_str());
+ transactionID.c_str());
/*
transaction->_state = TRANSACTION_CONFIRMED;
return true; */
return false;
} else {
logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
- transactionID.c_str(), code);
+ transactionID.c_str(), code);
return false;
}
} else {
logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
- transactionID.c_str());
+ transactionID.c_str());
ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
if (ret <= 0)
return false;
@@ -990,7 +1005,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
std::string crc = std::to_string(crcValue);
if (message == crc) {
logger_->log_info("Site2Site transaction %s CRC matched",
- transactionID.c_str());
+ transactionID.c_str());
ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
if (ret <= 0)
return false;
@@ -998,7 +1013,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
return true;
} else {
logger_->log_info("Site2Site transaction %s CRC not matched %s",
- transactionID.c_str(), crc.c_str());
+ transactionID.c_str(), crc.c_str());
ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
/*
ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
@@ -1016,7 +1031,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) {
return true;
} else {
logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
- transactionID.c_str(), code);
+ transactionID.c_str(), code);
return false;
}
return false;
@@ -1065,7 +1080,7 @@ void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
}
logger_->log_info("Site2Site delete transaction %s",
- transaction->getUUIDStr().c_str());
+ transaction->getUUIDStr().c_str());
delete transaction;
_transactionMap.erase(transactionID);
}
@@ -1119,7 +1134,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
return true;
} else {
logger_->log_info("Site2Site transaction %s send finished",
- transactionID.c_str());
+ transactionID.c_str());
ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
if (ret <= 0) {
return false;
@@ -1140,21 +1155,21 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) {
if (code == TRANSACTION_FINISHED) {
logger_->log_info("Site2Site transaction %s peer finished transaction",
- transactionID.c_str());
+ transactionID.c_str());
transaction->_state = TRANSACTION_COMPLETED;
return true;
} else {
logger_->log_info("Site2Site transaction %s peer unknown respond code %d",
- transactionID.c_str(), code);
+ transactionID.c_str(), code);
return false;
}
}
}
void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
- core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flow =
- std::static_pointer_cast<FlowFileRecord>(session->get());
+ core::ProcessSession *session) {
+ std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast
+ < FlowFileRecord > (session->get());
Transaction *transaction = NULL;
@@ -1169,7 +1184,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
context->yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION,
- "Can not establish handshake with peer");
+ "Can not establish handshake with peer");
return;
}
@@ -1190,27 +1205,28 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
try {
while (continueTransaction) {
uint64_t startTime = getTimeMillis();
- DataPacket packet(this, transaction, flow->getAttributes());
+ std::string payload;
+ DataPacket packet(this, transaction, flow->getAttributes(), payload);
if (!send(transactionID, &packet, flow, session)) {
throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
return;
}
logger_->log_info("Site2Site transaction %s send flow record %s",
- transactionID.c_str(), flow->getUUIDStr().c_str());
+ transactionID.c_str(), flow->getUUIDStr().c_str());
uint64_t endTime = getTimeMillis();
std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
+ peer_->getHostName();
session->getProvenanceReporter()->send(flow, transitUri, details,
- endTime - startTime, false);
+ endTime - startTime, false);
session->remove(flow);
uint64_t transferNanos = getTimeNano() - startSendingNanos;
if (transferNanos > _batchSendNanos)
break;
- flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+ flow = std::static_pointer_cast < FlowFileRecord > (session->get());
if (!flow) {
continueTransaction = false;
@@ -1250,6 +1266,80 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
return;
}
+void Site2SiteClientProtocol::transferString(core::ProcessContext *context,
+ core::ProcessSession *session, std::string &payload,
+ std::map<std::string, std::string> attributes) {
+ Transaction *transaction = NULL;
+
+ if (payload.length() <= 0)
+ return;
+
+ if (_peerState != READY) {
+ bootstrap();
+ }
+
+ if (_peerState != READY) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION,
+ "Can not establish handshake with peer");
+ return;
+ }
+
+ // Create the transaction
+ std::string transactionID;
+ transaction = createTransaction(transactionID, SEND);
+
+ if (transaction == NULL) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+ return;
+ }
+
+ try {
+ DataPacket packet(this, transaction, attributes, payload);
+
+ if (!send(transactionID, &packet, nullptr, session)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+ return;
+ }
+ logger_->log_info("Site2Site transaction %s send bytes length %d",
+ transactionID.c_str(), payload.length());
+
+ if (!confirm(transactionID)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+ return;
+ }
+ if (!complete(transactionID)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+ return;
+ }
+ logger_->log_info(
+ "Site2Site transaction %s successfully send flow record %d, content bytes %d",
+ transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+ } catch (std::exception &exception) {
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug(
+ "Caught Exception during Site2SiteClientProtocol::transferBytes");
+ throw;
+ }
+
+ deleteTransaction(transactionID);
+
+ return;
+}
+
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 68aaf5c..33f9b43 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -71,6 +71,10 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
== org::apache::nifi::minifi::processors::AppendHostInfo::ProcessorName) {
processor = std::make_shared<
org::apache::nifi::minifi::processors::AppendHostInfo>(name, uuid);
+ } else if (name
+ == org::apache::nifi::minifi::provenance::ProvenanceTaskReport::ProcessorName) {
+ processor = std::make_shared<
+ org::apache::nifi::minifi::provenance::ProvenanceTaskReport>(name, uuid);
} else {
logger_->log_error("No Processor defined for %s", name.c_str());
return nullptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index b1db9a8..083d0b2 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -34,6 +34,11 @@ namespace nifi {
namespace minifi {
namespace provenance {
+const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] =
+{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK",
+ "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE",
+ "ADDINFO", "REPLAY"};
+
// DeSerialize
bool ProvenanceEventRecord::DeSerialize(
const std::shared_ptr<core::Repository> &repo, std::string key) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index 77de5ba..1cf66f6 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -35,6 +35,7 @@ void ProvenanceRepository::run() {
uint64_t curTime = getTimeMillis();
uint64_t size = repoSize();
if (size >= purgeThreshold) {
+ // std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> purgeList;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/libminifi/src/provenance/ProvenanceTaskReport.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp b/libminifi/src/provenance/ProvenanceTaskReport.cpp
new file mode 100644
index 0000000..d4995a0
--- /dev/null
+++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp
@@ -0,0 +1,221 @@
+/**
+ * @file ProvenanceTaskReport.cpp
+ * ProvenanceTaskReport class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "provenance/ProvenanceTaskReport.h"
+#include "../include/io/StreamFactory.h"
+#include "io/ClientSocket.h"
+#include "utils/TimeUtil.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "provenance/Provenance.h"
+#include "FlowController.h"
+
+#include "json/json.h"
+#include "json/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance{
+
+const std::string ProvenanceTaskReport::ProcessorName("ProvenanceTaskReport");
+core::Property ProvenanceTaskReport::hostName("Host Name", "Remote Host Name.", "localhost");
+core::Property ProvenanceTaskReport::port("Port", "Remote Port", "9999");
+core::Property ProvenanceTaskReport::batchSize("Batch Size", "Specifies how many records to send in a single batch, at most.", "100");
+core::Relationship ProvenanceTaskReport::relation;
+
+void ProvenanceTaskReport::initialize()
+{
+ //! Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(hostName);
+ properties.insert(port);
+ properties.insert(batchSize);
+ setSupportedProperties(properties);
+ //! Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(relation);
+ setSupportedRelationships(relationships);
+}
+
+std::unique_ptr<Site2SiteClientProtocol> ProvenanceTaskReport::getNextProtocol()
+{
+ std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
+ if (available_protocols_.empty())
+ return nullptr;
+ std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top());
+ available_protocols_.pop();
+ return std::move(return_pointer);
+}
+
+void ProvenanceTaskReport::returnProtocol(
+ std::unique_ptr<Site2SiteClientProtocol> return_protocol)
+{
+ std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
+ available_protocols_.push(std::move(return_protocol));
+}
+
+void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session)
+{
+ std::string value;
+ int64_t lvalue;
+
+ std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
+
+ if (protocol_ == nullptr)
+ {
+ protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
+ new Site2SiteClientProtocol(0));
+ protocol_->setPortId(protocol_uuid_);
+
+ std::string host = "";
+ uint16_t sport = 0;
+
+ if (context->getProperty(hostName.getName(), value)) {
+ host = value;
+ }
+ if (context->getProperty(port.getName(), value)
+ && core::Property::StringToInt(value, lvalue)) {
+ sport = (uint16_t) lvalue;
+ }
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+ org::apache::nifi::minifi::io::StreamFactory::getInstance()
+ ->createSocket(host, sport));
+
+ std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
+ new Site2SitePeer(std::move(str), host, sport));
+
+ protocol_->setPeer(std::move(peer_));
+ }
+
+ if (!protocol_->bootstrap())
+ {
+ // bootstrap the client protocol if needed
+ context->yield();
+ std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+ context->getProcessorNode().getProcessor());
+ logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
+ processor->getYieldPeriodMsec());
+ return;
+ }
+
+ int batch = 100;
+
+ if (context->getProperty(batchSize.getName(), value) && core::Property::StringToInt(value, lvalue))
+ {
+ batch = (int) lvalue;
+ }
+
+ std::vector<std::shared_ptr<ProvenanceEventRecord>> records;
+ std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast<ProvenanceRepository> (context->getProvenanceRepository());
+
+ repo->getProvenanceRecord(records, batch);
+
+ if (records.size() <= 0)
+ {
+ returnProtocol(std::move(protocol_));
+ return;
+ }
+
+ Json::Value array;
+ for (auto record : records)
+ {
+ Json::Value recordJson;
+ Json::Value updatedAttributesJson;
+ Json::Value parentUuidJson;
+ Json::Value childUuidJson;
+ recordJson["eventId"] = record->getEventId().c_str();
+ recordJson["eventType"] = ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+ recordJson["timestampMillis"] = record->getEventTime();
+ recordJson["durationMillis"] = record->getEventDuration();
+ recordJson["lineageStart"] = record->getlineageStartDate();
+ recordJson["details"] = record->getDetails().c_str();
+ recordJson["componentId"] = record->getComponentId().c_str();
+ recordJson["componentType"] = record->getComponentType().c_str();
+ recordJson["entityId"] = record->getFlowFileUuid().c_str();
+ recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
+ recordJson["entitySize"] = record->getFileSize();
+ recordJson["entityOffset"] = record->getFileOffset();
+
+ for (auto attr : record->getAttributes())
+ {
+ updatedAttributesJson[attr.first] = attr.second;
+ }
+ recordJson["updatedAttributes"] = updatedAttributesJson;
+
+ for (auto parentUUID : record->getParentUuids())
+ {
+ parentUuidJson.append(parentUUID.c_str());
+ }
+ recordJson["parentIds"] = parentUuidJson;
+
+ for (auto childUUID : record->getChildrenUuids())
+ {
+ childUuidJson.append(childUUID.c_str());
+ }
+ recordJson["childIds"] = childUuidJson;
+ recordJson["transitUri"] = record->getTransitUri().c_str();
+ recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str();
+ recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str();
+ recordJson["application"] = "MiNiFi Flow";
+ array.append(recordJson);
+ }
+
+ Json::StyledWriter writer;
+ std::string jsonStr = writer.write(array);
+ uint8_t *payload = (uint8_t *) jsonStr.c_str();
+ int length = jsonStr.length();
+
+ try
+ {
+ std::map<std::string, std::string> attributes;
+ protocol_->transferBytes(context, session, payload, length, attributes);
+ }
+ catch (...)
+ {
+ // if the transfer failed, return without purging the provenance records
+ returnProtocol(std::move(protocol_));
+ return;
+ }
+
+ // the records were transferred; purge them from the DB
+ repo->purgeProvenanceRecord(records);
+
+ returnProtocol(std::move(protocol_));
+
+ return;
+}
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index f7bd6e3..baf4254 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
CMAKE_POLICY(SET CMP0048 OLD)
ENDIF(POLICY CMP0048)
-include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
+include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/)
find_package(Boost REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
@@ -43,8 +43,8 @@ find_package(UUID REQUIRED)
find_package(OpenSSL REQUIRED)
include_directories(${OPENSSL_INCLUDE_DIR})
-# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, and leveldb
-target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES})
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and leveldb
+target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES})
set_target_properties(minifiexe
PROPERTIES OUTPUT_NAME minifi)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/thirdparty/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/AUTHORS b/thirdparty/jsoncpp/AUTHORS
new file mode 100644
index 0000000..c0fbbee
--- /dev/null
+++ b/thirdparty/jsoncpp/AUTHORS
@@ -0,0 +1 @@
+Baptiste Lepilleur <bl...@users.sourceforge.net>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/thirdparty/jsoncpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/CMakeLists.txt b/thirdparty/jsoncpp/CMakeLists.txt
new file mode 100644
index 0000000..a6c0884
--- /dev/null
+++ b/thirdparty/jsoncpp/CMakeLists.txt
@@ -0,0 +1,156 @@
+# vim: et ts=4 sts=4 sw=4 tw=0
+
+CMAKE_MINIMUM_REQUIRED(VERSION 3.1)
+PROJECT(jsoncpp)
+ENABLE_TESTING()
+
+OPTION(JSONCPP_WITH_TESTS "Compile and (for jsoncpp_check) run JsonCpp test executables" OFF)
+OPTION(JSONCPP_WITH_POST_BUILD_UNITTEST "Automatically run unit-tests as a post build step" OFF)
+OPTION(JSONCPP_WITH_WARNING_AS_ERROR "Force compilation to fail if a warning occurs" OFF)
+OPTION(JSONCPP_WITH_STRICT_ISO "Issue all the warnings demanded by strict ISO C and ISO C++" ON)
+OPTION(JSONCPP_WITH_PKGCONFIG_SUPPORT "Generate and install .pc files" ON)
+OPTION(JSONCPP_WITH_CMAKE_PACKAGE "Generate and install cmake package files" OFF)
+OPTION(BUILD_SHARED_LIBS "Build jsoncpp_lib as a shared library." OFF)
+OPTION(BUILD_STATIC_LIBS "Build jsoncpp_lib static library." ON)
+
+# Ensures that CMAKE_BUILD_TYPE is visible in cmake-gui on Unix
+IF(NOT WIN32)
+ IF(NOT CMAKE_BUILD_TYPE)
+ SET(CMAKE_BUILD_TYPE Release CACHE STRING
+ "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel Coverage."
+ FORCE)
+ ENDIF()
+ENDIF()
+
+# Enable runtime search path support for dynamic libraries on OSX
+IF(APPLE)
+ SET(CMAKE_MACOSX_RPATH 1)
+ENDIF()
+
+# Adhere to GNU filesystem layout conventions
+INCLUDE(GNUInstallDirs)
+
+SET(DEBUG_LIBNAME_SUFFIX "" CACHE STRING "Optional suffix to append to the library name for a debug build")
+
+# Set variable named ${VAR_NAME} to value ${VALUE}
+FUNCTION(set_using_dynamic_name VAR_NAME VALUE)
+ SET( "${VAR_NAME}" "${VALUE}" PARENT_SCOPE)
+ENDFUNCTION()
+
+# Extract major, minor, patch from version text
+# Parse a version string "X.Y.Z" and outputs
+# version parts in ${OUPUT_PREFIX}_MAJOR, _MINOR, _PATCH.
+# If parse succeeds then ${OUPUT_PREFIX}_FOUND is TRUE.
+MACRO(jsoncpp_parse_version VERSION_TEXT OUPUT_PREFIX)
+ SET(VERSION_REGEX "[0-9]+\\.[0-9]+\\.[0-9]+(-[a-zA-Z0-9_]+)?")
+ IF( ${VERSION_TEXT} MATCHES ${VERSION_REGEX} )
+ STRING(REGEX MATCHALL "[0-9]+|-([A-Za-z0-9_]+)" VERSION_PARTS ${VERSION_TEXT})
+ LIST(GET VERSION_PARTS 0 ${OUPUT_PREFIX}_MAJOR)
+ LIST(GET VERSION_PARTS 1 ${OUPUT_PREFIX}_MINOR)
+ LIST(GET VERSION_PARTS 2 ${OUPUT_PREFIX}_PATCH)
+ set_using_dynamic_name( "${OUPUT_PREFIX}_FOUND" TRUE )
+ ELSE( ${VERSION_TEXT} MATCHES ${VERSION_REGEX} )
+ set_using_dynamic_name( "${OUPUT_PREFIX}_FOUND" FALSE )
+ ENDIF()
+ENDMACRO()
+
+# Read out version from "version" file
+#FILE(STRINGS "version" JSONCPP_VERSION)
+#SET( JSONCPP_VERSION_MAJOR X )
+#SET( JSONCPP_VERSION_MINOR Y )
+#SET( JSONCPP_VERSION_PATCH Z )
+SET( JSONCPP_VERSION 1.8.0 )
+jsoncpp_parse_version( ${JSONCPP_VERSION} JSONCPP_VERSION )
+#IF(NOT JSONCPP_VERSION_FOUND)
+# MESSAGE(FATAL_ERROR "Failed to parse version string properly. Expect X.Y.Z")
+#ENDIF(NOT JSONCPP_VERSION_FOUND)
+SET( JSONCPP_SOVERSION 11 )
+SET( JSONCPP_USE_SECURE_MEMORY "0" CACHE STRING "-D...=1 to use memory-wiping allocator for STL" )
+
+MESSAGE(STATUS "JsonCpp Version: ${JSONCPP_VERSION_MAJOR}.${JSONCPP_VERSION_MINOR}.${JSONCPP_VERSION_PATCH}")
+# File version.h is only regenerated on CMake configure step
+CONFIGURE_FILE( "${PROJECT_SOURCE_DIR}/src/lib_json/version.h.in"
+ "${PROJECT_SOURCE_DIR}/include/json/version.h"
+ NEWLINE_STYLE UNIX )
+CONFIGURE_FILE( "${PROJECT_SOURCE_DIR}/version.in"
+ "${PROJECT_SOURCE_DIR}/version"
+ NEWLINE_STYLE UNIX )
+
+MACRO(UseCompilationWarningAsError)
+ IF(MSVC)
+ # Only enabled in debug because some old versions of VS STL generate
+ # warnings when compiled in release configuration.
+ SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /WX ")
+ ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
+ IF(JSONCPP_WITH_STRICT_ISO)
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic-errors")
+ ENDIF()
+ ENDIF()
+ENDMACRO()
+
+# Include our configuration header
+INCLUDE_DIRECTORIES( ${jsoncpp_SOURCE_DIR}/include )
+
+IF(MSVC)
+ # Only enabled in debug because some old versions of VS STL generate
+ # unreachable code warning when compiled in release configuration.
+ SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /W4 ")
+ENDIF()
+
+# Require C++11 support, prefer ISO C++ over GNU variants,
+# as relying solely on ISO C++ is more portable.
+SET(CMAKE_CXX_STANDARD 11)
+SET(CMAKE_CXX_STANDARD_REQUIRED ON)
+SET(CMAKE_CXX_EXTENSIONS OFF)
+
+IF(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ # using regular Clang or AppleClang
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Werror=conversion -Werror=sign-compare")
+ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+ # using GCC
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Wextra")
+ # not yet ready for -Wsign-conversion
+
+ IF(JSONCPP_WITH_STRICT_ISO AND NOT JSONCPP_WITH_WARNING_AS_ERROR)
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror=conversion -pedantic")
+ ENDIF()
+ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
+ # using Intel compiler
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Wextra -Werror=conversion")
+
+ IF(JSONCPP_WITH_STRICT_ISO AND NOT JSONCPP_WITH_WARNING_AS_ERROR)
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic")
+ ENDIF()
+ENDIF()
+
+FIND_PROGRAM(CCACHE_FOUND ccache)
+IF(CCACHE_FOUND)
+ SET_PROPERTY(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
+ SET_PROPERTY(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
+ENDIF(CCACHE_FOUND)
+
+IF(JSONCPP_WITH_WARNING_AS_ERROR)
+ UseCompilationWarningAsError()
+ENDIF()
+
+IF(JSONCPP_WITH_PKGCONFIG_SUPPORT)
+ CONFIGURE_FILE(
+ "pkg-config/jsoncpp.pc.in"
+ "pkg-config/jsoncpp.pc"
+ @ONLY)
+ INSTALL(FILES "${CMAKE_CURRENT_BINARY_DIR}/pkg-config/jsoncpp.pc"
+ DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig")
+ENDIF()
+
+IF(JSONCPP_WITH_CMAKE_PACKAGE)
+ INSTALL(EXPORT jsoncpp
+ DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/jsoncpp
+ FILE jsoncppConfig.cmake)
+ENDIF()
+
+# Build the different applications
+ADD_SUBDIRECTORY( src )
+
+#install the includes
+ADD_SUBDIRECTORY( include )
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/thirdparty/jsoncpp/LICENSE
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/LICENSE b/thirdparty/jsoncpp/LICENSE
new file mode 100644
index 0000000..ca2bfe1
--- /dev/null
+++ b/thirdparty/jsoncpp/LICENSE
@@ -0,0 +1,55 @@
+The JsonCpp library's source code, including accompanying documentation,
+tests and demonstration applications, are licensed under the following
+conditions...
+
+The author (Baptiste Lepilleur) explicitly disclaims copyright in all
+jurisdictions which recognize such a disclaimer. In such jurisdictions,
+this software is released into the Public Domain.
+
+In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
+2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is
+released under the terms of the MIT License (see below).
+
+In jurisdictions which recognize Public Domain property, the user of this
+software may choose to accept it either as 1) Public Domain, 2) under the
+conditions of the MIT License (see below), or 3) under the terms of dual
+Public Domain/MIT License conditions described here, as they choose.
+
+The MIT License is about as close to Public Domain as a license can get, and is
+described in clear, concise terms at:
+
+ http://en.wikipedia.org/wiki/MIT_License
+
+The full text of the MIT License follows:
+
+========================================================================
+Copyright (c) 2007-2010 Baptiste Lepilleur
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use, copy,
+modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+========================================================================
+(END LICENSE TEXT)
+
+The MIT license is compatible with both the GPL and commercial
+software, affording one all of the rights of Public Domain with the
+minor nuisance of being required to keep the above copyright notice
+and license text in the source code. Note also that by accepting the
+Public Domain "license" you can re-license your copy using whatever
+license you like.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/thirdparty/jsoncpp/NEWS.txt
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/NEWS.txt b/thirdparty/jsoncpp/NEWS.txt
new file mode 100644
index 0000000..5733fcd
--- /dev/null
+++ b/thirdparty/jsoncpp/NEWS.txt
@@ -0,0 +1,175 @@
+New in SVN
+----------
+
+ * Updated the type system's behavior, in order to better support backwards
+ compatibility with code that was written before 64-bit integer support was
+ introduced. Here's how it works now:
+
+ * isInt, isInt64, isUInt, and isUInt64 return true if and only if the
+ value can be exactly represented as that type. In particular, a value
+ constructed with a double like 17.0 will now return true for all of
+ these methods.
+
+ * isDouble and isFloat now return true for all numeric values, since all
+ numeric values can be converted to a double or float without
+ truncation. Note however that the conversion may not be exact -- for
+ example, doubles cannot exactly represent all integers above 2^53 + 1.
+
+ * isBool, isNull, isString, isArray, and isObject now return true if and
+ only if the value is of that type.
+
+ * isConvertibleTo(fooValue) indicates that it is safe to call asFoo.
+ (For each type foo, isFoo always implies isConvertibleTo(fooValue).)
+ asFoo returns an approximate or exact representation as appropriate.
+ For example, a double value may be truncated when asInt is called.
+
+ * For backwards compatibility with old code, isConvertibleTo(intValue)
+ may return false even if type() == intValue. This is because the value
+ may have been constructed with a 64-bit integer larger than maxInt,
+ and calling asInt() would cause an exception. If you're writing new
+ code, use isInt64 to find out whether the value is exactly
+ representable using an Int64, or asDouble() combined with minInt64 and
+ maxInt64 to figure out whether it is approximately representable.
+
+* Value
+ - Patch #10: BOOST_FOREACH compatibility. Made Json::iterator more
+ standard compliant, added missing iterator_category and value_type
+ typedefs (contributed by Robert A. Iannucci).
+
+* Compilation
+
+ - New CMake based build system. Based in part on contribution from
+ Igor Okulist and Damien Buhl (Patch #14).
+
+ - New header json/version.h now contains version number macros
+ (JSONCPP_VERSION_MAJOR, JSONCPP_VERSION_MINOR, JSONCPP_VERSION_PATCH
+ and JSONCPP_VERSION_HEXA).
+
+ - Patch #11: added missing JSON_API on some classes causing link issues
+ when building as a dynamic library on Windows
+ (contributed by Francis Bolduc).
+
+ - Visual Studio DLL: suppressed warning "C4251: <data member>: <type>
+ needs to have dll-interface to be used by..." via pragma push/pop
+ in json-cpp headers.
+
+ - Added Travis CI integration: https://travis-ci.org/blep/jsoncpp-mirror
+
+* Bug fixes
+ - Patch #15: Copy constructor does not initialize allocated_ for stringValue
+ (contributed by rmongia).
+
+ - Patch #16: Missing field copy in Json::Value::iterator causing infinite
+ loop when using experimental internal map (#define JSON_VALUE_USE_INTERNAL_MAP)
+ (contributed by Ming-Lin Kao).
+
+
+ New in JsonCpp 0.6.0:
+ ---------------------
+
+* Compilation
+
+ - LD_LIBRARY_PATH and LIBRARY_PATH environment variables are now
+ propagated to the build environment as this is required for some
+ compiler installation.
+
+ - Added support for Microsoft Visual Studio 2008 (bug #2930462):
+ The platform "msvc90" has been added.
+
+ Notes: you need to setup the environment by running vcvars32.bat
+ (e.g. MSVC 2008 command prompt in start menu) before running scons.
+
+ - Added support for amalgamated source and header generation (a la sqlite).
+ Refer to README.md section "Generating amalgamated source and header"
+ for detail.
+
+* Value
+
+ - Removed experimental ValueAllocator, it caused static
+ initialization/destruction order issues (bug #2934500).
+ The DefaultValueAllocator has been inlined in code.
+
+ - Added support for 64 bits integer:
+
+ Types Json::Int64 and Json::UInt64 have been added. They are aliased
+ to 64 bits integers on system that support them (based on __int64 on
+ Microsoft Visual Studio platform, and long long on other platforms).
+
+ Types Json::LargestInt and Json::LargestUInt have been added. They are
+ aliased to the largest integer type supported:
+ either Json::Int/Json::UInt or Json::Int64/Json::UInt64 respectively.
+
+ Json::Value::asInt() and Json::Value::asUInt() still returns plain
+ "int" based types, but asserts if an attempt is made to retrieve
+ a 64 bits value that cannot be represented as the return type.
+
+ Json::Value::asInt64() and Json::Value::asUInt64() have been added
+ to obtain the 64 bits integer value.
+
+ Json::Value::asLargestInt() and Json::Value::asLargestUInt() returns
+ the integer as a LargestInt/LargestUInt respectively. Those functions
+ are typically used when implementing a writer.
+
+ The reader attempts to read number as 64 bits integer, and fall back
+ to reading a double if the number is not in the range of 64 bits
+ integer.
+
+ Warning: Json::Value::asInt() and Json::Value::asUInt() now returns
+ long long. This changes break code that was passing the return value
+ to *printf() function.
+
+ Support for 64 bits integer can be disabled by defining the macro
+ JSON_NO_INT64 (uncomment it in json/config.h for example), though
+ it should have no impact on existing usage.
+
+ - The type Json::ArrayIndex is used for indexes of a JSON value array. It
+ is an unsigned int (typically 32 bits).
+
+ - Array index can be passed as int to operator[], allowing use of literal:
+ Json::Value array;
+ array.append( 1234 );
+ int value = array[0].asInt(); // did not compile previously
+
+ - Added float Json::Value::asFloat() to obtain a floating point value as a
+ float (avoids the loss-of-precision warning caused by using asDouble()
+ to initialize a float).
+
+* Reader
+
+ - Renamed Reader::getFormatedErrorMessages() to getFormattedErrorMessages.
+ Bug #3023708 (Formatted has 2 't'). The old member function is deprecated
+ but still present for backward compatibility.
+
+* Tests
+
+ - Added test to ensure that the escape sequence "\/" is correctly handled
+ by the parser.
+
+* Bug fixes
+
+ - Bug #3139677: JSON [1 2 3] was incorrectly parsed as [1, 3]. Error is now
+ correctly detected.
+
+ - Bug #3139678: stack buffer overflow when parsing a double with a
+ length of 32 characters.
+
+ - Fixed Value::operator <= implementation (had the semantic of operator >=).
+ Found when adding unit tests for comparison operators.
+
+ - Value::compare() is now const and has an actual implementation with
+ unit tests.
+
+ - Bug #2407932: strpbrk() can fail for NULL pointer.
+
+ - Bug #3306345: Fixed minor typo in Path::resolve().
+
+ - Bug #3314841/#3306896: errors in amalgamate.py
+
+ - Fixed some Coverity warnings and line-endings.
+
+* License
+
+ - See file LICENSE for details. Basically JsonCpp is now licensed under
+ MIT license, or public domain if desired and recognized in your jurisdiction.
+ Thanks to Stephan G. Beal [http://wanderinghorse.net/home/stephan/] who
+ helped figure out the solution to the public domain issue.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/126c2ed5/thirdparty/jsoncpp/README.md
----------------------------------------------------------------------
diff --git a/thirdparty/jsoncpp/README.md b/thirdparty/jsoncpp/README.md
new file mode 100644
index 0000000..038ccd2
--- /dev/null
+++ b/thirdparty/jsoncpp/README.md
@@ -0,0 +1,225 @@
+# JsonCpp
+
+[JSON][json-org] is a lightweight data-interchange format. It can represent
+numbers, strings, ordered sequences of values, and collections of name/value
+pairs.
+
+[json-org]: http://json.org/
+
+JsonCpp is a C++ library that allows manipulating JSON values, including
+serialization and deserialization to and from strings. It can also preserve
+existing comments in deserialization/serialization steps, making it a convenient
+format to store user input files.
+
+
+## Documentation
+
+[JsonCpp documentation][JsonCpp-documentation] is generated using [Doxygen][].
+
+[JsonCpp-documentation]: http://open-source-parsers.github.io/jsoncpp-docs/doxygen/index.html
+[Doxygen]: http://www.doxygen.org
+
+
+## A note on backward-compatibility
+
+* `1.y.z` is built with C++11.
+* `0.y.z` can be used with older compilers.
+* Major versions maintain binary-compatibility.
+
+
+## Using JsonCpp in your project
+
+The recommended approach to integrating JsonCpp in your project is to include
+the [amalgamated source](#generating-amalgamated-source-and-header) (a single
+`.cpp` file and two `.h` files) in your project, and compile and build as you
+would any other source file. This ensures consistency of compilation flags and
+ABI compatibility, issues which arise when building shared or static
+libraries. See the next section for instructions.
+
+The `include/` should be added to your compiler include path. JsonCpp headers
+should be included as follows:
+
+ #include <json/json.h>
+
+If JsonCpp was built as a dynamic library on Windows, then your project needs to define the macro `JSON_DLL`.
+
+### Generating amalgamated source and header
+
+JsonCpp is provided with a script to generate a single header and a single
+source file to ease inclusion into an existing project. The amalgamated source
+can be generated at any time by running the following command from the
+top-directory (this requires Python 2.6):
+
+ python amalgamate.py
+
+It is possible to specify header name. See the `-h` option for detail.
+
+By default, the following files are generated:
+
+* `dist/jsoncpp.cpp`: source file that needs to be added to your project.
+* `dist/json/json.h`: corresponding header file for use in your project. It is
+ equivalent to including `json/json.h` in non-amalgamated source. This header
+ only depends on standard headers.
+* `dist/json/json-forwards.h`: header that provides forward declaration of all
+ JsonCpp types.
+
+The amalgamated sources are generated by concatenating JsonCpp source in the
+correct order and defining the macro `JSON_IS_AMALGAMATION` to prevent inclusion of other headers.
+
+
+## Contributing to JsonCpp
+
+### Building and testing with CMake
+
+[CMake][] is a C++ Makefiles/Solution generator. It is usually available on most Linux systems as a package. On Ubuntu:
+
+ sudo apt-get install cmake
+
+[CMake]: http://www.cmake.org
+
+Note that Python is also required to run the JSON reader/writer tests. If
+missing, the build will skip running those tests.
+
+When running CMake, a few parameters are required:
+
+* A build directory where the makefiles/solution are generated. It is also used
+ to store objects, libraries and executable files.
+* The generator to use: makefiles or Visual Studio solution? What version of
+ Visual Studio, 32 or 64 bits solution?
+
+Steps for generating solution/makefiles using `cmake-gui`:
+
+* Make "source code" point to the source directory.
+* Make "where to build the binary" point to the directory to use for the build.
+* Click on the "Grouped" check box.
+* Review JsonCpp build options (tick `BUILD_SHARED_LIBS` to build as a
+ dynamic library).
+* Click the configure button at the bottom, then the generate button.
+* The generated solution/makefiles can be found in the binary directory.
+
+Alternatively, from the command-line on Unix in the source directory:
+
+ mkdir -p build/debug
+ cd build/debug
+ cmake -DCMAKE_BUILD_TYPE=debug -DBUILD_STATIC_LIBS=ON -DBUILD_SHARED_LIBS=OFF -DARCHIVE_INSTALL_DIR=. -G "Unix Makefiles" ../..
+ make
+
+For a good pkg-config file, add:
+
+ -DCMAKE_INSTALL_INCLUDEDIR=include/jsoncpp
+
+Running `cmake -h` will display the list of available generators (passed using
+the `-G` option).
+
+By default CMake hides compilation commands. This can be modified by specifying
+`-DCMAKE_VERBOSE_MAKEFILE=true` when generating makefiles.
+
+### Building and testing with SCons
+
+**Note:** The SCons-based build system is deprecated. Please use CMake (see the
+section above).
+
+JsonCpp can use [SCons][] as a build system. Note that SCons requires Python to
+be installed.
+
+[SCons]: http://www.scons.org/
+
+Invoke SCons as follows:
+
+ scons platform=$PLATFORM [TARGET]
+
+where `$PLATFORM` may be one of:
+
+* `suncc`: Sun C++ (Solaris)
+* `vacpp`: Visual Age C++ (AIX)
+* `mingw`
+* `msvc6`: Microsoft Visual Studio 6 service pack 5-6
+* `msvc70`: Microsoft Visual Studio 2002
+* `msvc71`: Microsoft Visual Studio 2003
+* `msvc80`: Microsoft Visual Studio 2005
+* `msvc90`: Microsoft Visual Studio 2008
+* `linux-gcc`: Gnu C++ (linux, also reported to work for Mac OS X)
+
+If you are building with Microsoft Visual Studio 2008, you need to set up the
+environment by running `vcvars32.bat` (e.g. MSVC 2008 command prompt) before
+running SCons.
+
+### Running the tests manually
+
+You need to run tests manually only if you are troubleshooting an issue.
+
+In the instructions below, replace `path/to/jsontest` with the path of the
+`jsontest` executable that was compiled on your platform.
+
+ cd test
+ # This will run the Reader/Writer tests
+ python runjsontests.py path/to/jsontest
+
+ # This will run the Reader/Writer tests, using JSONChecker test suite
+ # (http://www.json.org/JSON_checker/).
+ # Notes: not all tests pass: JsonCpp is too lenient (for example,
+ # it allows an integer to start with '0'). The goal is to improve
+ # strict mode parsing to get all tests to pass.
+ python runjsontests.py --with-json-checker path/to/jsontest
+
+ # This will run the unit tests (mostly Value)
+ python rununittests.py path/to/test_lib_json
+
+ # You can run the tests using valgrind:
+ python rununittests.py --valgrind path/to/test_lib_json
+
+### Running the tests using SCons
+
+Note that tests can be run using SCons using the `check` target:
+
+ scons platform=$PLATFORM check
+
+### Building the documentation
+
+Run the Python script `doxybuild.py` from the top directory:
+
+ python doxybuild.py --doxygen=$(which doxygen) --open --with-dot
+
+See `doxybuild.py --help` for options.
+
+### Adding a reader/writer test
+
+To add a test, you need to create two files in test/data:
+
+* a `TESTNAME.json` file, that contains the input document in JSON format.
+* a `TESTNAME.expected` file, that contains a flattened representation of the
+ input document.
+
+The `TESTNAME.expected` file format is as follows:
+
+* Each line represents a JSON element of the element tree represented by the
+ input document.
+* Each line has two parts: the path to access the element separated from the
+ element value by `=`. Array and object values are always empty (i.e.
+ represented by either `[]` or `{}`).
+* Element path `.` represents the root element, and is used to separate object
+ members. `[N]` is used to specify the value of an array element at index `N`.
+
+See the examples `test_complex_01.json` and `test_complex_01.expected` to better understand element paths.
+
+### Understanding reader/writer test output
+
+When a test is run, output files are generated beside the input test files. Below is a short description of the content of each file:
+
+* `test_complex_01.json`: input JSON document.
+* `test_complex_01.expected`: flattened JSON element tree used to check if
+ parsing was correct.
+* `test_complex_01.actual`: flattened JSON element tree produced by `jsontest`
+ from reading `test_complex_01.json`.
+* `test_complex_01.rewrite`: JSON document written by `jsontest` using the
+ `Json::Value` parsed from `test_complex_01.json` and serialized using
+ `Json::StyledWriter`.
+* `test_complex_01.actual-rewrite`: flattened JSON element tree produced by
+ `jsontest` from reading `test_complex_01.rewrite`.
+* `test_complex_01.process-output`: `jsontest` output, typically useful for
+ understanding parsing errors.
+
+## License
+
+See the `LICENSE` file for details. In summary, JsonCpp is licensed under the
+MIT license, or public domain if desired and recognized in your jurisdiction.