You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:41 UTC

[2/6] nifi-minifi-cpp git commit: MINIFI-249: Update prov repo to better abstract deser.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpGetIntegrationTest.cpp b/libminifi/test/integration/HttpGetIntegrationTest.cpp
index ae60dc1..a235759 100644
--- a/libminifi/test/integration/HttpGetIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpGetIntegrationTest.cpp
@@ -26,6 +26,7 @@
 #include <thread>
 #include <type_traits>
 #include <vector>
+#include "../TestServer.h"
 #include "../TestBase.h"
 #include "utils/StringUtils.h"
 #include "core/Core.h"
@@ -41,9 +42,23 @@ void waitToVerifyProcessor() {
   std::this_thread::sleep_for(std::chrono::seconds(10));
 }
 
+int log_message(const struct mg_connection *conn, const char *message) {
+  puts(message);
+  return 1;
+}
+
+int ssl_enable(void *ssl_context, void *user_data) {
+  struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;
+  return 0;
+}
+
 int main(int argc, char **argv) {
-  LogTestController::getInstance().setInfo<minifi::processors::InvokeHTTP>();
-  LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+  init_webserver();
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
   std::string key_dir, test_file_location;
   if (argc > 1) {
     test_file_location = argv[1];
@@ -59,27 +74,61 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
 
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  content_repo->initialize(configuration);
+
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME,
-  true);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
+                                                                                                true);
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
+  std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke");
+  assert(proc != nullptr);
 
+  std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+  assert(inv != nullptr);
+  std::string url = "";
+  inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  ptr.release();
+  std::string port, scheme, path;
+  parse_http_components(url, port, scheme, path);
+  struct mg_callbacks callback;
+  if (url.find("localhost") != std::string::npos) {
+    if (scheme == "https") {
+      std::string cert = "";
+      cert = key_dir + "nifi-cert.pem";
+      memset(&callback, 0, sizeof(callback));
+      callback.init_ssl = ssl_enable;
+      callback.log_message = log_message;
+      std::cout << cert << std::endl;
+      start_webserver(port, path, "hi this is a get test", &callback, cert);
+    } else {
+      start_webserver(port, path, "hi this is a get test");
+    }
+  }
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
+  if (url.find("localhost") != std::string::npos) {
+    stop_webserver();
+  }
   std::string logs = LogTestController::getInstance().log_output.str();
+
   assert(logs.find("key:filename value:") != std::string::npos);
-  assert(logs.find("key:invokehttp.request.url value:https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c") != std::string::npos);
-  assert(logs.find("Size:3734 Offset:0") != std::string::npos);
+  assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
   assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
   std::string stringtofind = "Resource Claim created ./content_repository/";
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpPostIntegrationTest.cpp b/libminifi/test/integration/HttpPostIntegrationTest.cpp
index dfa284f..9a46574 100644
--- a/libminifi/test/integration/HttpPostIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpPostIntegrationTest.cpp
@@ -28,6 +28,7 @@
 #include <vector>
 #include "utils/StringUtils.h"
 #include "core/Core.h"
+#include "../TestServer.h"
 #include "../include/core/logging/Logger.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
@@ -42,6 +43,7 @@ void waitToVerifyProcessor() {
 }
 
 int main(int argc, char **argv) {
+  init_webserver();
   LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
   LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
   std::string test_file_location;
@@ -51,7 +53,6 @@ int main(int argc, char **argv) {
   mkdir("/tmp/aljr39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
   std::ofstream myfile;
   myfile.open("/tmp/aljr39/example.txt");
-  myfile << "Hello world" << std::endl;
   myfile.close();
   mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 
@@ -62,31 +63,44 @@ int main(int argc, char **argv) {
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
-  DEFAULT_ROOT_GROUP_NAME,
-                                                                                                true);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+  true);
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
+  std::shared_ptr<core::Processor> proc = ptr->findProcessor("OhJeez");
+  assert(proc != nullptr);
+
+  std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
 
+  assert(inv != nullptr);
+  std::string url = "";
+  inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  ptr.release();
+  std::string port, scheme, path;
+  parse_http_components(url, port, scheme, path);
+  start_webserver(port, path, "hi this is a post test");
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
+  std::string logs = LogTestController::getInstance().log_output.str();
+  // stop webserver
+  stop_webserver();
   assert(LogTestController::getInstance().contains("curl performed") == true);
-  assert(LogTestController::getInstance().contains("Import offset 0 length 12") == true);
+  assert(LogTestController::getInstance().contains("Import offset 0 length 22") == true);
 
   std::string stringtofind = "Resource Claim created ./content_repository/";
 
-  std::string logs = LogTestController::getInstance().log_output.str();
   size_t loc = logs.find(stringtofind);
   while (loc > 0 && loc != std::string::npos) {
     std::string id = logs.substr(loc + stringtofind.size(), 36);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index a7bcc2b..a6dc377 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -53,21 +53,20 @@ int main(int argc, char **argv) {
   LogTestController::getInstance().setDebug<core::ProcessGroup>();
 
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
   std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
   std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
-  DEFAULT_ROOT_GROUP_NAME,
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
                                                                                                 true);
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/Site2SiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp
index 01aa7a8..1773cdb 100644
--- a/libminifi/test/integration/Site2SiteRestTest.cpp
+++ b/libminifi/test/integration/Site2SiteRestTest.cpp
@@ -45,22 +45,22 @@ void waitToVerifyProcessor() {
   std::this_thread::sleep_for(std::chrono::seconds(10));
 }
 
-class ConfigHandler: public CivetHandler {
+class ConfigHandler : public CivetHandler {
  public:
   bool handleGet(CivetServer *server, struct mg_connection *conn) {
     static const std::string site2site_rest_resp = "{"
-           "\"revision\": {"
-           "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
-           "},"
-           "\"controller\": {"
-           "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
-           "\"name\": \"NiFi Flow\","
-           "\"remoteSiteListeningPort\": 10001,"
-           "\"siteToSiteSecure\": false"
-           "}}";
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"remoteSiteListeningPort\": 10001,"
+        "\"siteToSiteSecure\": false"
+        "}}";
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-        "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-        site2site_rest_resp.length());
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              site2site_rest_resp.length());
     mg_printf(conn, "%s", site2site_rest_resp.c_str());
     return true;
   }
@@ -71,7 +71,7 @@ int main(int argc, char **argv) {
   LogTestController::getInstance().setInfo<minifi::FlowController>();
 
   const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 };
-  std::vector < std::string > cpp_options;
+  std::vector<std::string> cpp_options;
   for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
     cpp_options.push_back(options[i]);
   }
@@ -106,28 +106,31 @@ int main(int argc, char **argv) {
       TestFlowRepository>();
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file,
-      test_file_location);
+                     test_file_location);
 
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      < minifi::io::StreamFactory > (configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
-      < core::YamlConfiguration
-      > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
-          configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
-      < TestRepository > (test_repo);
+      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
 
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared < minifi::FlowController
-          > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
+  content_repo->initialize(configuration);
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
-      configuration, test_file_location);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
+                                                                                                true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory,
+                                      configuration,
+                                      test_file_location);
 
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-      test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
-      > (ptr.get());
+                                                                test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
+      >(ptr.get());
   ptr.release();
 
   controller->load();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/TestExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/TestExecuteProcess.cpp b/libminifi/test/integration/TestExecuteProcess.cpp
index ef0d113..5506c32 100644
--- a/libminifi/test/integration/TestExecuteProcess.cpp
+++ b/libminifi/test/integration/TestExecuteProcess.cpp
@@ -27,7 +27,7 @@
 #include <memory>
 #include <vector>
 #include <fstream>
-
+#include "core/repository/VolatileContentRepository.h"
 #include "../unit/ProvenanceTestHelper.h"
 #include "FlowController.h"
 #include "processors/GetFile.h"
@@ -47,15 +47,17 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
   processor->setMaxConcurrentTasks(1);
 
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo, content_repo);
 
   uuid_t processoruuid;
   assert(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "executeProcessConnection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "executeProcessConnection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -79,7 +81,7 @@ int main(int argc, char **argv) {
 
   core::ProcessorNode node2(processor);
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo);
+  std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo, test_repo);
   core::ProcessSessionFactory factory(contextset.get());
   processor->onSchedule(contextset.get(), &factory);
 
@@ -87,7 +89,7 @@ int main(int argc, char **argv) {
     processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() {
       core::ProcessorNode node(processor);
       std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-      std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo);
+      std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo, test_repo);
       context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
       std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get());
       while (!is_ready.load(std::memory_order_relaxed)) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPGet.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGet.yml b/libminifi/test/resources/TestHTTPGet.yml
index 2f64f2a..58f95d9 100644
--- a/libminifi/test/resources/TestHTTPGet.yml
+++ b/libminifi/test/resources/TestHTTPGet.yml
@@ -32,7 +32,7 @@ Processors:
       auto-terminated relationships list:
       Properties:
           HTTP Method: GET
-          Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c
+          Remote URL: http://localhost:10003/geturl
     - name: OhJeez
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.LogAttribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPGetSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml
index f3a23e5..9d19632 100644
--- a/libminifi/test/resources/TestHTTPGetSecure.yml
+++ b/libminifi/test/resources/TestHTTPGetSecure.yml
@@ -33,7 +33,7 @@ Processors:
       Properties:
           SSL Context Service: SSLContextService
           HTTP Method: GET
-          Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c
+          Remote URL: https://raw.githubusercontent.com/apache/nifi-minifi-cpp/master/docs/minifi-logo.png
     - name: OhJeez
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.LogAttribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPPost.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml
index 837194d..c76069a 100644
--- a/libminifi/test/resources/TestHTTPPost.yml
+++ b/libminifi/test/resources/TestHTTPPost.yml
@@ -46,7 +46,7 @@ Processors:
       auto-terminated relationships list: response
       Properties:
           HTTP Method: POST
-          Remote URL: http://requestb.in/u8ax9uu8
+          Remote URL: http://localhost:10003/urlofchampions
           
     - name: Loggit
       id: 2438e3c8-015a-1000-79ca-83af40ec1993

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/cn.ckey.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/cn.ckey.pem b/libminifi/test/resources/cn.ckey.pem
index 23017fa..fc42f06 100644
--- a/libminifi/test/resources/cn.ckey.pem
+++ b/libminifi/test/resources/cn.ckey.pem
@@ -1,5 +1,4 @@
 Bag Attributes
-    friendlyName: nifi-key
     localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C 
 Key Attributes: <No Attributes>
 -----BEGIN RSA PRIVATE KEY-----

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/cn.crt.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/cn.crt.pem b/libminifi/test/resources/cn.crt.pem
index 3a786db..60a38ac 100644
--- a/libminifi/test/resources/cn.crt.pem
+++ b/libminifi/test/resources/cn.crt.pem
@@ -1,5 +1,4 @@
 Bag Attributes
-    friendlyName: nifi-key
     localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C 
 subject=/OU=NIFI/CN=test
 issuer=/OU=NIFI/CN=localhost

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/nifi-cert.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/nifi-cert.pem b/libminifi/test/resources/nifi-cert.pem
index 4e404cd..0c3b7da 100644
--- a/libminifi/test/resources/nifi-cert.pem
+++ b/libminifi/test/resources/nifi-cert.pem
@@ -18,3 +18,30 @@ lvrRtWOqyGHiRoaRE5+VUjyO+0ToEgj9E+3rV8JL66BT7SWQusLGqbX1OoANCMTj
 BRYeqB0g0PrXU+6chh6StpNSnYzkQdoxLUIDYYZx2XGsbkjDh/k6ni6bgJEKEOCu
 T3Z2tyvGpc+PjLRXW/WyXCpg/xfr3+GSVKI6ark=
 -----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAwCF6Tchue7tR66BPg886WOYNPgSwNaq1KJQSuGcEHK2wlAEu
+YfiYz9LbjFLZRLRY2CF9mIGb683byrnvOMcq6a+YdXDaOHZnkKBSsI/xTzScXTv3
+EKSueZ0sMuD7L0y/2Cs2lf8heBUEUqmNe15J9yvEQ1GpJ0j7iCCneKYjjezFWglR
+Sv/9suvqVCxIxr4j9gXODgyU3wdwIxkQUBJXk4GtDp03Rxcx6Ch0VBwjcGkYHhcs
+GHRzg6dcr795tLfOQNA/Vlje0+RtH/KU/WXgzl9nKtxD7XUwZyhoElzNcehN0WmK
+DgAmASncvy7+YYzKU69H14Q+2n/apdoqx/kTQQIDAQABAoIBAQCz7eY69+y4BXo3
+nz84Ipby8CcQoJVg/QiBAwLxHNCWBvdp9B069PQvFLo1FNWSaQ8XAW48p4yc7YHb
+vftRgfwnMyIlQdWrsP9WSz6FSZhkY9HX4rODK6aWD+J3l4jFCCxVxkpteKwgaBZP
+T6hHE8tTJfK8VLqEJu4g0uvjqjt7ydJT69lThdyf3VE0v6ZeSjsya5qqw+9RK+uC
+q5T/8FxeFZgpfR6UXXnoLAmAkfcMZNIBo6cOJWi/BQHjZdpCOVXUBtu0/lC8bffa
+4/ESaxRS8kOp+WEb64pT7u6F7yhD/kve6ZnJj/SX1EvN+RzB3zoVG42WUs/+/SwN
+dU1ERz+tAoGBAPbgZPDnWuKxW7Cam/Aqmvux624C1lNfhfXEGURhyc+wHWjjhWRe
+2vEPJOVxG5pN/FAo+lFoGiLe3QsLRLPlQrGfT/92W28QEcRrRSutjRZOL3wKezQA
+DkAPU9HX3lACR5yQD6+a0HHgMr1MqeNFPi9MPPjywGywTyWzHd4WQqvTAoGBAMc7
+J4fpr5uPVq9mKemK67i7meJ8AxjjU7oNe8EN+2XfCYcQUmgIo+dLzV9+DTrYkoTz
+iqjA6Ph2DNs6YHI/JNwsdSbAz6KVDteimt3t+uyNpiMGuyLmfOgpYEMJcHp+q6I6
+7PGKVS4c5iPFiYuIo23Is9ZMxOVQp76+UOy09rwbAoGBAOM5Za7VQjGkTGAf7ab/
+j+ZZu/dlZR8XrJSoCRmHZ9hgoLEJuJzJMXruFWeY028SmEivbrW+u0+dEJY5qOJr
+ARe7KkZXCZEPmUrP8Lpi4pjFHa9tdjhGVNdhRCTAKz442vCfJ9DZDUHCuPDCvxsP
+gEzIPtZjl/hxzmdElRj0JClBAoGAaXmfzAyjs6+HLQThW4r4kKyBI66T1TFEulM5
+GVPVrHEQEjlJ51nrrCAtckjBqE3QBCMLXZwDusaEt+uH8/QKB6Zhv0qEooZXfUHQ
+y32aQnIbap+9oxRzPFXraJIuwisdop2fo6Cgx/D0xitmTkDghNaknue1tdGlfQ40
+uZx0o9ECgYBeKeNbMnWoO46ZOrhaz8On+fIY7xtboV2bALy7lvUbWd9B41ntqYUm
+NHlYXDDU+Izs5wnNJnNnx4vECuUzYbpeY82dvMewlQwfl5aiyKrjo7VxLm//2U/K
+hlID6DU5wi9O+TAQ319DhxT7Ja+AQxO/OFS/mfrtwJEevxXqJLu55Q==
+-----END RSA PRIVATE KEY-----

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/FileStreamTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
new file mode 100644
index 0000000..5c86f19
--- /dev/null
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -0,0 +1,210 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include "io/FileStream.h"
+#include <string>
+#include <vector>
+#include <uuid/uuid.h>
+#include "../TestBase.h"
+
+TEST_CASE("TestFileOverWrite", "[TestFiles]") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::io::FileStream stream(path, 0, true);
+  std::vector<uint8_t> readBuffer;
+  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+  uint8_t* data = readBuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
+
+  stream.seek(4);
+
+  stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 4);
+
+  stream.seek(0);
+
+  std::vector<uint8_t> verifybuffer;
+
+  REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+  data = verifybuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempfile");
+
+  unlink(ss.str().c_str());
+}
+
+TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::io::FileStream stream(path, 0, true);
+  std::vector<uint8_t> readBuffer;
+  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+  uint8_t* data = readBuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
+
+  stream.seek(4);
+
+  stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 0);
+
+  stream.seek(0);
+
+  std::vector<uint8_t> verifybuffer;
+
+  REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+  data = verifybuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");
+
+  unlink(ss.str().c_str());
+}
+
+TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::io::FileStream stream(path, 0, true);
+  std::vector<uint8_t> readBuffer;
+  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+  uint8_t* data = readBuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
+
+  stream.seek(4);
+
+  stream.write(nullptr, 0);
+
+  stream.seek(0);
+
+  std::vector<uint8_t> verifybuffer;
+
+  REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+  data = verifybuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");
+
+  unlink(ss.str().c_str());
+}
+
+TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::io::FileStream stream(path, 0, true);
+  std::vector<uint8_t> readBuffer;
+  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+  uint8_t* data = readBuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
+
+  stream.seek(4);
+
+  stream.write(nullptr, 0);
+
+  stream.seek(0);
+
+  std::vector<uint8_t> verifybuffer;
+
+  REQUIRE(stream.readData(nullptr, stream.getSize()) == -1);
+
+  data = verifybuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+
+  unlink(ss.str().c_str());
+}
+
+
+TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  std::string path = ss.str();
+  file.open(path, std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  minifi::io::FileStream stream(path, 0, true);
+  std::vector<uint8_t> readBuffer;
+  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+  uint8_t* data = readBuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
+
+  stream.seek(0);
+
+  std::vector<uint8_t> verifybuffer;
+
+  REQUIRE(stream.readData(verifybuffer, 8192) == 8);
+
+  data = verifybuffer.data();
+
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");
+
+  unlink(ss.str().c_str());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp
index 705ac84..2ef3c17 100644
--- a/libminifi/test/unit/InvokeHTTPTests.cpp
+++ b/libminifi/test/unit/InvokeHTTPTests.cpp
@@ -25,6 +25,7 @@
 #include <string>
 #include <set>
 #include "FlowController.h"
+#include "io/BaseStream.h"
 #include "../TestBase.h"
 #include "processors/GetFile.h"
 #include "core/Core.h"
@@ -35,105 +36,9 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessorNode.h"
 
-TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
-  TestController testController;
-  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
-  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
-
-  std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  uuid_t invokehttp_uuid;
-  REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
-
-  connection2->setRelationship(core::Relationship("No Retry", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(invokehttp);
-
-  connection2->setSource(invokehttp);
-
-  connection2->setSourceUUID(invokehttp_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(invokehttp_uuid);
-
-  processor->addConnection(connection);
-  invokehttp->addConnection(connection);
-  invokehttp->addConnection(connection2);
-
-  core::ProcessorNode node(processor);
-  core::ProcessorNode node2(invokehttp);
-
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, repo);
-  core::ProcessContext context2(node2, controller_services_provider, repo);
-  context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8685");
-  context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
-
-  context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
-  context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8685/testytesttest");
-  core::ProcessSession session(&context);
-  core::ProcessSession session2(&context2);
-
-  REQUIRE(processor->getName() == "listenhttp");
-
-  core::ProcessSessionFactory factory(&context);
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onSchedule(&context, &factory);
-  processor->onTrigger(&context, &session);
-
-  invokehttp->incrementActiveTasks();
-  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
-  core::ProcessSessionFactory factory2(&context2);
-  invokehttp->onSchedule(&context2, &factory2);
-  invokehttp->onTrigger(&context2, &session2);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-
-  reporter = session.getProvenanceReporter();
-
-  records = reporter->getEvents();
-  session.commit();
-
-  invokehttp->incrementActiveTasks();
-  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
-  invokehttp->onTrigger(&context2, &session2);
-
-  session2.commit();
-  records = reporter->getEvents();
-
-  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == processor->getName());
-  }
-  std::shared_ptr<core::FlowFile> ffr = session2.get();
-  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
-  LogTestController::getInstance().reset();
-}
-
 TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   TestController testController;
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
 
   std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
@@ -154,16 +59,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   uuid_t invokehttp_uuid;
   REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
 
-  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
   gcConnection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute");
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
   laConnection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
 
   connection2->setRelationship(core::Relationship("No Retry", "description"));
 
@@ -181,8 +86,8 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   core::ProcessorNode node(listenhttp);
   core::ProcessorNode node2(invokehttp);
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, repo);
-  core::ProcessContext context2(node2, controller_services_provider, repo);
+  core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+  core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo);
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
 
@@ -242,9 +147,10 @@ class CallBack : public minifi::OutputStreamCallback {
   }
   virtual ~CallBack() {
   }
-  virtual void process(std::ofstream *stream) {
+  virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+    // leaving the typo for posterity sake
     std::string st = "we're gnna write some test stuff";
-    stream->write(st.c_str(), st.length());
+    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
   }
 };
 
@@ -270,16 +176,18 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   uuid_t invokehttp_uuid;
   REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
 
-  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
   gcConnection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute");
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
   laConnection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
 
   connection2->setRelationship(core::Relationship("No Retry", "description"));
 
@@ -299,8 +207,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   core::ProcessorNode node(invokehttp);
   core::ProcessorNode node2(listenhttp);
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, repo);
-  core::ProcessContext context2(node2, controller_services_provider, repo);
+  core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+  core::ProcessContext context2(node2, controller_services_provider, repo, repo,  content_repo);
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
   context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
 
@@ -317,14 +225,9 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
 
   CallBack callback;
 
-  /*
-   explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
-   std::map<std::string, std::string> attributes,
-   std::shared_ptr<ResourceClaim> claim = nullptr);
-   */
   std::map<std::string, std::string> attributes;
   attributes["testy"] = "test";
-  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, attributes);
+  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes);
   session2.write(flow, &callback);
 
   invokehttp->incrementActiveTasks();
@@ -368,3 +271,39 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
+  TestController testController;
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>();
+  LogTestController::getInstance().setInfo<core::Processor>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
+
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
+  plan->reset();
+  testController.runSession(plan, true);
+
+  std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  plan->reset();
+  testController.runSession(plan, true);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == processor->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 9e2d50c..7f34ba4 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -23,11 +23,12 @@
 #include <vector>
 #include <set>
 #include <fstream>
-#include "../unit/ProvenanceTestHelper.h"
+
 #include "../TestBase.h"
 #include "processors/ListenHTTP.h"
 #include "processors/LogAttribute.h"
 #include "processors/GetFile.h"
+#include "../unit/ProvenanceTestHelper.h"
 #include "core/Core.h"
 #include "core/FlowFile.h"
 #include "core/Processor.h"
@@ -42,131 +43,12 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   REQUIRE(processor->getName() == "processorname");
 }
 
-TEST_CASE("Test Find file", "[getfileCreate2]") {
-  TestController testController;
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-  std::shared_ptr<org::apache::nifi::minifi::Configure> configure = std::make_shared<org::apache::nifi::minifi::Configure>();
-
-  std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-      std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(configure), configure);
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-  connection->setDestination(processor);
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, test_repo);
-  core::ProcessSessionFactory factory(&context);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
-  core::ProcessSession session(&context);
-
-  processor->onSchedule(&context, &factory);
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << dir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  records = reporter->getEvents();
-
-  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == processor->getName());
-  }
-  session.commit();
-  std::shared_ptr<core::FlowFile> ffr = session.get();
-
-  ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-  REQUIRE(2 == repo->getRepoMap().size());
-
-  for (auto entry : repo->getRepoMap()) {
-    provenance::ProvenanceEventRecord newRecord;
-    newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
-
-    bool found = false;
-    for (auto provRec : records) {
-      if (provRec->getEventId() == newRecord.getEventId()) {
-        REQUIRE(provRec->getEventId() == newRecord.getEventId());
-        REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
-        REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
-        REQUIRE(provRec->getDetails() == newRecord.getDetails());
-        REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
-        found = true;
-        break;
-      }
-    }
-    if (!found) {
-      throw std::runtime_error("Did not find record");
-    }
-  }
-
-  core::ProcessorNode nodeReport(processorReport);
-  core::ProcessContext contextReport(nodeReport, controller_services_provider, test_repo);
-  core::ProcessSessionFactory factoryReport(&contextReport);
-  core::ProcessSession sessionReport(&contextReport);
-  processorReport->onSchedule(&contextReport, &factoryReport);
-  std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast<
-      org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport);
-  taskReport->setBatchSize(1);
-  std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport;
-  processorReport->incrementActiveTasks();
-  processorReport->setScheduledState(core::ScheduledState::RUNNING);
-  std::string jsonStr;
-  repo->getProvenanceRecord(recordsReport, 1);
-  taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
-  REQUIRE(recordsReport.size() == 1);
-  REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
-  REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
-}
-
 TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   TestController testController;
-
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
 
   std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
@@ -175,7 +57,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
+
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -190,7 +73,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
 
   core::ProcessorNode node(processor);
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, test_repo);
+  core::ProcessContext context(node, controller_services_provider, test_repo, test_repo);
   core::ProcessSessionFactory factory(&context);
   context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
   // replicate 10 threads
@@ -245,71 +128,59 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   TestController testController;
   LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
 
-  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
 
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
 
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  uuid_t logattribute_uuid;
-  REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "logattribute");
-  connection2->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(logAttribute);
-
-  connection2->setSource(logAttribute);
-
-  connection2->setSourceUUID(logattribute_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(logattribute_uuid);
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  testController.runSession(plan, false);
+  std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
 
-  processor->addConnection(connection);
-  logAttribute->addConnection(connection);
-  logAttribute->addConnection(connection2);
-  REQUIRE(dir != NULL);
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+  plan->reset();
+  testController.runSession(plan, false);
 
-  core::ProcessorNode node(processor);
-  core::ProcessorNode node2(logAttribute);
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-  core::ProcessContext context(node, controller_services_provider, repo);
-  core::ProcessContext context2(node2, controller_services_provider, repo);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
-  core::ProcessSession session(&context);
-  core::ProcessSession session2(&context2);
+  unlink(ss.str().c_str());
 
-  REQUIRE(processor->getName() == "getfileCreate2");
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  testController.runSession(plan, false);
 
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
 
-  core::ProcessSessionFactory factory(&context);
-  processor->onSchedule(&context, &factory);
-  processor->onTrigger(&context, &session);
+  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
+  REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
+  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
+  LogTestController::getInstance().reset();
+}
 
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  core::ProcessSessionFactory factory2(&context2);
-  logAttribute->onSchedule(&context2, &factory2);
-  logAttribute->onTrigger(&context2, &session2);
+TEST_CASE("Test Find file", "[getfileCreate3]") {
+  TestController testController;
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2");
+  std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+      std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
+  plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false);
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
 
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
+  plan->setProperty(processor, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  testController.runSession(plan, false);
+  std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
   REQUIRE(record == nullptr);
   REQUIRE(records.size() == 0);
 
@@ -319,26 +190,58 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   file.open(ss.str(), std::ios::out);
   file << "tempFile";
   file.close();
+  plan->reset();
+  testController.runSession(plan, false);
 
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  records = reporter->getEvents();
-  session.commit();
-
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(&context2, &session2);
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == processor->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+  REQUIRE(ffr != nullptr);
+  ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+  auto repo = std::static_pointer_cast<TestRepository>(plan->getProvenanceRepo());
+  REQUIRE(2 == repo->getRepoMap().size());
 
-  records = reporter->getEvents();
+  for (auto entry : repo->getRepoMap()) {
+    provenance::ProvenanceEventRecord newRecord;
+    newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
 
-  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
-  REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
-  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
-  LogTestController::getInstance().reset();
+    bool found = false;
+    for (auto provRec : records) {
+      if (provRec->getEventId() == newRecord.getEventId()) {
+        REQUIRE(provRec->getEventId() == newRecord.getEventId());
+        REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
+        REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
+        REQUIRE(provRec->getDetails() == newRecord.getDetails());
+        REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      throw std::runtime_error("Did not find record");
+    }
+  }
+  std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast<
+      org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport);
+  taskReport->setBatchSize(1);
+  std::vector<std::shared_ptr<core::SerializableComponent>> recordsReport;
+  recordsReport.push_back(std::make_shared<provenance::ProvenanceEventRecord>());
+  processorReport->incrementActiveTasks();
+  processorReport->setScheduledState(core::ScheduledState::RUNNING);
+  std::string jsonStr;
+  std::size_t deserialized = 0;
+  repo->DeSerialize(recordsReport, deserialized);
+  std::function<void(core::ProcessContext*, core::ProcessSession*)> verifyReporter = [&](core::ProcessContext *context, core::ProcessSession *session) {
+    taskReport->getJsonReport(context, session, recordsReport, jsonStr);
+    REQUIRE(recordsReport.size() == 1);
+    REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
+    REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
+  };
+
+  testController.runSession(plan, false, verifyReporter);
 }
 
 int fileSize(const char *add) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 17e6078..1b39700 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -18,11 +18,22 @@
 #ifndef LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_
 #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_
 
-#include "provenance/Provenance.h"
-#include "FlowController.h"
-#include "core/Repository.h"
-#include "core/repository/FlowFileRepository.h"
-#include "core/Core.h"
+#include <atomic>
+#include <cstdint>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "core/repository/VolatileContentRepository.h"
+#include "../../include/core/Processor.h"
+#include "../../include/core/repository/FlowFileRepository.h"
+#include "../../include/Connection.h"
+#include "../../include/FlowController.h"
+#include "../../include/properties/Configure.h"
+#include "../../include/provenance/Provenance.h"
+
 /**
  * Test repository
  */
@@ -41,17 +52,22 @@ class TestRepository : public core::Repository {
 
   }
 
-  bool Put(std::string key, uint8_t *buf, int bufLen) {
+  bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
     return true;
   }
+
+  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+    return Put(key, buffer, bufferSize);
+  }
+
   // Delete
   bool Delete(std::string key) {
     repositoryResults.erase(key);
     return true;
   }
   // Get
-  bool Get(std::string key, std::string &value) {
+  bool Get(const std::string &key, std::string &value) {
     auto result = repositoryResults.find(key);
     if (result != repositoryResults.end()) {
       value = result->second;
@@ -61,6 +77,39 @@ class TestRepository : public core::Repository {
     }
   }
 
+  virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) {
+    return false;
+  }
+
+  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+    max_size = 0;
+    for (auto entry : repositoryResults) {
+      std::shared_ptr<core::SerializableComponent> eventRead = store.at(max_size);
+
+      if (eventRead->DeSerialize((uint8_t*) entry.second.data(), entry.second.length())) {
+      }
+      if (+max_size >= store.size()) {
+        break;
+      }
+    }
+    return true;
+  }
+
+  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) {
+    return false;
+  }
+
+  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
+    std::string value;
+    Get(store->getUUIDStr(), value);
+    store->DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(value.c_str())), value.size());
+    return true;
+  }
+
+  virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
+    return false;
+  }
+
   const std::map<std::string, std::string> &getRepoMap() const {
     return repositoryResults;
   }
@@ -134,6 +183,9 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
       }
     }
   }
+  
+  void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+  }
 
   void run() {
     // do nothing
@@ -145,8 +197,8 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
 class TestFlowController : public minifi::FlowController {
 
  public:
-  TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo)
-      : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, "", true) {
+  TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo)
+      : minifi::FlowController(repo, flow_file_repo,std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) {
   }
   ~TestFlowController() {
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp
index 6a58597..97cb646 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -26,8 +26,8 @@
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
 #include "core/Core.h"
-#include "core/repository/FlowFileRepository.h"
-#include "core/repository/VolatileRepository.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/repository/VolatileProvenanceRepository.h"
 
 TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
   provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
@@ -49,7 +49,8 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
 
   record1.Serialize(testRepository);
   provenance::ProvenanceEventRecord record2;
-  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
   REQUIRE(record2.getEventId() == record1.getEventId());
   REQUIRE(record2.getComponentId() == record1.getComponentId());
   REQUIRE(record2.getComponentType() == record1.getComponentType());
@@ -60,12 +61,13 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
 
 TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
   provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::string eventId = record1.getEventId();
   std::map<std::string, std::string> attributes;
   attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
   attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
   std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("ff", "./content_repository", 0, 0, 0);
-  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes);
+  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
 
   record1.addChildFlowFile(ffr1);
 
@@ -75,7 +77,8 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
 
   record1.Serialize(testRepository);
   provenance::ProvenanceEventRecord record2;
-  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
   REQUIRE(record1.getChildrenUuids().size() == 1);
   REQUIRE(record2.getChildrenUuids().size() == 1);
   std::string childId = record2.getChildrenUuids().at(0);
@@ -94,13 +97,14 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
 
   uint64_t sample = 65555;
 
-  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>();
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(0);
   record1.setEventDuration(sample);
 
   record1.Serialize(testRepository);
   provenance::ProvenanceEventRecord record2;
-  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
   REQUIRE(record2.getEventId() == record1.getEventId());
   REQUIRE(record2.getComponentId() == record1.getComponentId());
   REQUIRE(record2.getComponentType() == record1.getComponentType());
@@ -111,24 +115,26 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
 
 TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
   provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::string eventId = record1.getEventId();
   std::map<std::string, std::string> attributes;
   attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
   attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
-  std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileRepository>();
+  std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileProvenanceRepository>();
   frepo->initialize(0);
-  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes);
+  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
 
   record1.addChildFlowFile(ffr1);
 
   uint64_t sample = 65555;
-  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>();
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(0);
   record1.setEventDuration(sample);
 
   record1.Serialize(testRepository);
   provenance::ProvenanceEventRecord record2;
-  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == true);
   REQUIRE(record1.getChildrenUuids().size() == 1);
   REQUIRE(record2.getChildrenUuids().size() == 1);
   std::string childId = record2.getChildrenUuids().at(0);
@@ -151,7 +157,8 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena
   testRepository->initialize(0);
   record1.setEventDuration(sample);
 
-  record1.Serialize(testRepository);
+  REQUIRE(record1.Serialize(testRepository) == true);
   provenance::ProvenanceEventRecord record2;
-  REQUIRE(record2.DeSerialize(testRepository, eventId) == false);
+  record2.setEventId(eventId);
+  REQUIRE(record2.DeSerialize(testRepository) == false);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp
index 4424a93..3b18310 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -23,7 +23,7 @@
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
 #include "core/Core.h"
-#include "core/repository/FlowFileRepository.h"
+#include "../../include/core/repository/AtomicRepoEntries.h"
 #include "properties/Configure.h"
 
 TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
@@ -34,7 +34,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
 
   repository->initialize(std::make_shared<minifi::Configure>());
 
-  minifi::FlowFileRecord record(repository);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
 
   record.addAttribute("keyA", "");
 
@@ -50,8 +51,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
   std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
 
   repository->initialize(std::make_shared<minifi::Configure>());
-
-  minifi::FlowFileRecord record(repository);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
 
   record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
 
@@ -70,9 +71,10 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
 
   repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
 
-  minifi::FlowFileRecord record(repository);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  minifi::FlowFileRecord record(repository, content_repo);
 
-  minifi::FlowFileRecord record2(repository);
+  minifi::FlowFileRecord record2(repository, content_repo);
 
   std::string uuid = record.getUUIDStr();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/TailFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
index e800b4c..eb33f8c 100644
--- a/libminifi/test/unit/TailFileTests.cpp
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -42,130 +42,137 @@ static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
 static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
 
 TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
-    try {
-        // Create and write to the test file
-        std::ofstream tmpfile;
-        tmpfile.open(TMP_FILE);
-        tmpfile << NEWLINE_FILE;
-        tmpfile.close();
+  try {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
 
-        TestController testController;
-        LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+    TestController testController;
+    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
 
-        std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-        std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
-        std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-        uuid_t processoruuid;
-        REQUIRE(true == processor->getUUID(processoruuid));
-        uuid_t logAttributeuuid;
-        REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
 
-        std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
-        connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
 
-        // link the connections so that we can test results at the end for this
-        connection->setDestination(connection);
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
 
-        connection->setSourceUUID(processoruuid);
+    connection->setSourceUUID(processoruuid);
 
-        processor->addConnection(connection);
+    processor->addConnection(connection);
 
-        core::ProcessorNode node(processor);
+    core::ProcessorNode node(processor);
 
-        std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-        core::ProcessContext context(node, controller_services_provider, repo);
-        context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
-        context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
-        context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
 
-        core::ProcessSession session(&context);
+    core::ProcessSession session(&context);
 
-        REQUIRE(processor->getName() == "tailfile");
+    REQUIRE(processor->getName() == "tailfile");
 
-        core::ProcessSessionFactory factory(&context);
+    core::ProcessSessionFactory factory(&context);
 
-        std::shared_ptr<core::FlowFile> record;
-        processor->setScheduledState(core::ScheduledState::RUNNING);
-        processor->onSchedule(&context, &factory);
-        processor->onTrigger(&context, &session);
+    std::shared_ptr<core::FlowFile> record;
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
 
-        provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-        std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
-        record = session.get();
-        REQUIRE(record == nullptr);
-        std::shared_ptr<core::FlowFile> ff = session.get();
-        REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    std::shared_ptr<core::FlowFile> ff = session.get();
+    REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
 
-        LogTestController::getInstance().reset();
-    } catch (...) { }
+    LogTestController::getInstance().reset();
+  } catch (...) {
+  }
 
-    // Delete the test and state file.
-    std::remove(TMP_FILE);
-    std::remove(STATE_FILE);
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
 }
 
-
 TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
-    try {
-        // Create and write to the test file
-        std::ofstream tmpfile;
-        tmpfile.open(TMP_FILE);
-        tmpfile << NEWLINE_FILE;
-        tmpfile.close();
+  try {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
 
-        TestController testController;
-        LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
 
-        std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-        std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
-        std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-        uuid_t processoruuid;
-        REQUIRE(true == processor->getUUID(processoruuid));
-        uuid_t logAttributeuuid;
-        REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
 
-        std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
-        connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
 
-        // link the connections so that we can test results at the end for this
-        connection->setDestination(connection);
-        connection->setSourceUUID(processoruuid);
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
 
-        processor->addConnection(connection);
+    processor->addConnection(connection);
 
-        core::ProcessorNode node(processor);
+    core::ProcessorNode node(processor);
 
-        std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-        core::ProcessContext context(node, controller_services_provider, repo);
-        context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
-        context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
 
-        core::ProcessSession session(&context);
+    core::ProcessSession session(&context);
 
-        REQUIRE(processor->getName() == "tailfile");
+    REQUIRE(processor->getName() == "tailfile");
 
-        core::ProcessSessionFactory factory(&context);
+    core::ProcessSessionFactory factory(&context);
 
-        std::shared_ptr<core::FlowFile> record;
-        processor->setScheduledState(core::ScheduledState::RUNNING);
-        processor->onSchedule(&context, &factory);
-        processor->onTrigger(&context, &session);
+    std::shared_ptr<core::FlowFile> record;
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
 
-        provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-        std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
-        record = session.get();
-        REQUIRE(record == nullptr);
-        std::shared_ptr<core::FlowFile> ff = session.get();
-        REQUIRE(provRecords.size() == 2);
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    std::shared_ptr<core::FlowFile> ff = session.get();
+    REQUIRE(provRecords.size() == 2);
 
-        LogTestController::getInstance().reset();
-    } catch (...) { }
+    LogTestController::getInstance().reset();
+  } catch (...) {
+  }
 
-    // Delete the test and state file.
-    std::remove(TMP_FILE);
-    std::remove(STATE_FILE);
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
 }