You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2018/01/08 18:27:02 UTC

[5/5] nifi-minifi-cpp git commit: MINIFICPP-251 Move Civet implementations to an extension.

MINIFICPP-251 Move Civet implementations to an extension.

This closed #203.

Signed-off-by: Bin Qiu <be...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0981f9ac
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0981f9ac
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0981f9ac

Branch: refs/heads/master
Commit: 0981f9acf9243886015aed503f01a5c3bd300b0c
Parents: fb8573f
Author: Caleb Johnson <me...@calebj.io>
Authored: Thu Nov 16 23:06:22 2017 +0000
Committer: Bin Qiu <be...@gmail.com>
Committed: Mon Jan 8 10:25:00 2018 -0800

----------------------------------------------------------------------
 CMakeLists.txt                                  |  14 +-
 cmake/BuildTests.cmake                          |   4 +-
 .../civet_curl_tests/C2NullConfiguration.cpp    | 137 ++++++++
 extensions/civet_curl_tests/C2UpdateTest.cpp    | 183 ++++++++++
 .../C2VerifyHeartbeatAndStop.cpp                | 156 +++++++++
 .../civet_curl_tests/C2VerifyServeResults.cpp   | 131 ++++++++
 extensions/civet_curl_tests/CMakeLists.txt      |  82 +++++
 .../ControllerServiceIntegrationTests.cpp       | 160 +++++++++
 extensions/civet_curl_tests/GetFileNoData.cpp   | 184 ++++++++++
 .../civet_curl_tests/HTTPSiteToSiteTests.cpp    | 262 +++++++++++++++
 .../civet_curl_tests/HttpGetIntegrationTest.cpp | 162 +++++++++
 .../HttpPostIntegrationTest.cpp                 | 114 +++++++
 .../civet_curl_tests/SiteToSiteRestTest.cpp     | 145 ++++++++
 .../civet_curl_tests/ThreadPoolAdjust.cpp       | 115 +++++++
 .../civet_curl_tests/include/TestServer.h       | 117 +++++++
 .../include/integration/HTTPIntegrationBase.h   |  75 +++++
 .../include/sitetositehttp/HTTPHandlers.h       | 320 ++++++++++++++++++
 extensions/civet_curl_tests/unit/CMakeLists.txt |  76 +++++
 .../civet_curl_tests/unit/InvokeHTTPTests.cpp   | 315 ++++++++++++++++++
 extensions/civetweb/CMakeLists.txt              |  70 ++++
 extensions/civetweb/CivetLoader.cpp             |  29 ++
 extensions/civetweb/ListenHTTP.cpp              | 333 +++++++++++++++++++
 extensions/civetweb/RESTReceiver.cpp            | 147 ++++++++
 extensions/civetweb/include/CivetLoader.h       |  70 ++++
 .../include/c2/protocols/RESTReceiver.h         | 110 ++++++
 extensions/civetweb/include/io/CivetStream.h    | 138 ++++++++
 .../civetweb/include/processors/ListenHTTP.h    | 121 +++++++
 extensions/http-curl/CMakeLists.txt             |   4 +-
 extensions/http-curl/HTTPCurlLoader.h           |  10 +-
 extensions/http-curl/RESTSender.cpp             | 140 ++++++++
 extensions/http-curl/c2/protocols/RESTSender.h  |  80 +++++
 extensions/http-curl/protocols/RESTProtocol.cpp | 177 ----------
 extensions/http-curl/protocols/RESTProtocol.h   |  75 -----
 extensions/http-curl/protocols/RESTReceiver.cpp | 147 --------
 extensions/http-curl/protocols/RESTReceiver.h   | 111 -------
 extensions/http-curl/protocols/RESTSender.cpp   | 140 --------
 extensions/http-curl/protocols/RESTSender.h     |  81 -----
 extensions/libarchive/CMakeLists.txt            |   2 +-
 extensions/rocksdb-repos/CMakeLists.txt         |   2 +-
 extensions/script/CMakeLists.txt                |   2 +-
 libminifi/CMakeLists.txt                        |   3 +-
 libminifi/include/c2/protocols/RESTProtocol.h   |  74 +++++
 libminifi/include/core/FlowConfiguration.h      |   1 +
 libminifi/include/processors/ListenHTTP.h       | 121 -------
 libminifi/src/c2/protocols/RESTProtocol.cpp     | 177 ++++++++++
 libminifi/src/processors/ListenHTTP.cpp         | 333 -------------------
 libminifi/test/TestServer.h                     | 117 -------
 .../test/curl-tests/C2NullConfiguration.cpp     | 135 --------
 libminifi/test/curl-tests/C2UpdateTest.cpp      | 183 ----------
 .../curl-tests/C2VerifyHeartbeatAndStop.cpp     | 155 ---------
 .../test/curl-tests/C2VerifyServeResults.cpp    | 131 --------
 libminifi/test/curl-tests/CMakeLists.txt        |  76 -----
 .../ControllerServiceIntegrationTests.cpp       | 160 ---------
 libminifi/test/curl-tests/GetFileNoData.cpp     | 184 ----------
 .../test/curl-tests/HTTPSiteToSiteTests.cpp     | 262 ---------------
 .../test/curl-tests/HttpGetIntegrationTest.cpp  | 162 ---------
 .../test/curl-tests/HttpPostIntegrationTest.cpp | 114 -------
 .../test/curl-tests/SiteToSiteRestTest.cpp      | 145 --------
 libminifi/test/curl-tests/ThreadPoolAdjust.cpp  | 114 -------
 .../curl-tests/sitetositehttp/CivetStream.h     | 138 --------
 .../curl-tests/sitetositehttp/HTTPHandlers.h    | 321 ------------------
 .../test/curl-tests/unit/InvokeHTTPTests.cpp    | 314 -----------------
 libminifi/test/integration/IntegrationBase.h    |  55 +--
 libminifi/test/pcap-tests/CMakeLists.txt        |   1 +
 libminifi/test/unit/ClassLoaderTests.cpp        |   1 -
 libminifi/test/unit/GetTCPTests.cpp             |   1 -
 libminifi/test/unit/ProcessorTests.cpp          |   1 -
 libminifi/test/unit/PutFileTests.cpp            |   1 -
 main/CMakeLists.txt                             |   2 +-
 69 files changed, 4254 insertions(+), 3969 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c7c4d12..e6f89a8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -116,10 +116,10 @@ set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
 SET(WITH_TOOLS OFF CACHE BOOL "Do not build RocksDB tools")
 SET(WITH_TESTS OFF CACHE BOOL "Build RocksDB library (not repo) tests")
 set(CIVET_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb root")
+set(CIVET_BINARY_ROOT "${CMAKE_BINARY_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb binary output")
 set(ROCKSDB_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/rocksdb/" CACHE STRING "Path to RocksDB root")
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-20171024)
-set(BUILD_CIVET_TESTING OFF)
-add_subdirectory("${CIVET_THIRDPARTY_ROOT}" EXCLUDE_FROM_ALL)
+
 include_directories(thirdparty/concurrentqueue)
 include_directories(thirdparty/yaml-cpp-yaml-cpp-20171024/include)
 
@@ -140,7 +140,12 @@ add_subdirectory(libminifi)
 #### EXTENSION
 option(DISABLE_CURL "Disables libCurl Properties." OFF)
 if (NOT DISABLE_CURL)
-	createExtension(HTTP-CURL "HTTP CURL" "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site" "extensions/http-curl" "${TEST_DIR}/curl-tests")
+	createExtension(HTTP-CURL "HTTP CURL" "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site" "extensions/http-curl")
+endif()
+
+option(DISABLE_CIVET "Disables CivetWeb components." OFF)
+if (NOT DISABLE_CIVET)
+createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP and several cURL tests" "extensions/civetweb")
 endif()
 
 ## Add the rocks DB extension
@@ -250,6 +255,9 @@ include(CPack)
 
 if (NOT SKIP_TESTS)
 	include(BuildTests)
+    if (NOT (DISABLE_CURL OR DISABLE_CIVET))
+            add_subdirectory(extensions/civet_curl_tests)
+    endif()
 endif()
 
 include(BuildDocs)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 4499305..1f6fc8a 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -39,7 +39,7 @@ function(createTests testName)
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/jsoncpp/include")
-    target_include_directories(${testName} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/libarchive-3.3.2/libarchive")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/")
@@ -59,7 +59,7 @@ function(createTests testName)
         target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}")
     endif()
     target_link_libraries(${testName} ${SPD_LIB} ${TEST_BASE_LIB})
-    target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB} dl)
+    target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp ${JSON_CPP_LIB})
     if (APPLE)
 		target_link_libraries (${testName} -Wl,-all_load minifi)
 	else ()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/C2NullConfiguration.cpp b/extensions/civet_curl_tests/C2NullConfiguration.cpp
new file mode 100644
index 0000000..c68e047
--- /dev/null
+++ b/extensions/civet_curl_tests/C2NullConfiguration.cpp
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "c2/protocols/RESTReceiver.h"
+#include "c2/protocols/RESTSender.h"
+#include "c2/C2Agent.h"
+#include "c2/protocols/RESTReceiver.h"
+#include "processors/LogAttribute.h"
+#include "integration/HTTPIntegrationBase.h"
+
+class VerifyC2Server : public HTTPIntegrationBase {
+ public:
+  explicit VerifyC2Server(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+    LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true);
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
+    assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
+  }
+
+  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+
+    std::string port, scheme, path;
+    parse_http_components(url, port, scheme, path);
+    configuration->set("c2.agent.protocol.class", "null");
+    configuration->set("c2.rest.url", "");
+    configuration->set("c2.rest.url.ack", "");
+    configuration->set("c2.agent.heartbeat.reporter.classes", "null");
+    configuration->set("c2.rest.listener.port", "null");
+    configuration->set("c2.agent.heartbeat.period", "null");
+    configuration->set("c2.rest.listener.heartbeat.rooturi", "null");
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Server harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/C2UpdateTest.cpp b/extensions/civet_curl_tests/C2UpdateTest.cpp
new file mode 100644
index 0000000..edc92f0
--- /dev/null
+++ b/extensions/civet_curl_tests/C2UpdateTest.cpp
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "c2/protocols/RESTSender.h"
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+static std::vector<std::string> responses;
+
+class ConfigHandler : public CivetHandler {
+ public:
+  ConfigHandler() {
+    calls_ = 0;
+  }
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    calls_++;
+    if (responses.size() > 0) {
+      std::string top_str = responses.back();
+      responses.pop_back();
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                top_str.length());
+      mg_printf(conn, "%s", top_str.c_str());
+    } else {
+      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+    }
+
+    return true;
+  }
+
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::ifstream myfile(test_file_location_.c_str());
+
+    if (myfile.is_open()) {
+      std::stringstream buffer;
+      buffer << myfile.rdbuf();
+      std::string str = buffer.str();
+      myfile.close();
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                str.length());
+      mg_printf(conn, "%s", str.c_str());
+    } else {
+      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+    }
+
+    return true;
+  }
+  std::string test_file_location_;
+  std::atomic<size_t> calls_;
+};
+
+int main(int argc, char **argv) {
+  mg_init_library(0);
+  LogTestController::getInstance().setInfo<minifi::FlowController>();
+  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+  LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+
+  const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+
+  CivetServer server(cpp_options);
+  ConfigHandler h_ex;
+  server.addHandler("/update", h_ex);
+  std::string key_dir, test_file_location;
+  if (argc > 1) {
+    h_ex.test_file_location_ = test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+  std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+      "\"operation\" : \"update\", "
+      "\"operationid\" : \"8675309\", "
+      "\"name\": \"configuration\""
+      "}]}";
+
+  responses.push_back(heartbeat_response);
+
+  std::ifstream myfile(test_file_location.c_str());
+
+  if (myfile.is_open()) {
+    std::stringstream buffer;
+    buffer << myfile.rdbuf();
+    std::string str = buffer.str();
+    myfile.close();
+    std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+        "\"operation\" : \"update\", "
+        "\"operationid\" : \"8675309\", "
+        "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
+    responses.push_back(response);
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+
+  configuration->set("c2.rest.url", "http://localhost:9090/update");
+  configuration->set("c2.agent.heartbeat.period", "1000");
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+  true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+  ptr.release();
+  auto start = std::chrono::system_clock::now();
+
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  auto then = std::chrono::system_clock::now();
+
+  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
+  std::string logs = LogTestController::getInstance().log_output.str();
+  assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+  LogTestController::getInstance().reset();
+  rmdir("./content_repository");
+  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp b/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp
new file mode 100644
index 0000000..f447d69
--- /dev/null
+++ b/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "c2/C2Agent.h"
+#include "c2/protocols/RESTReceiver.h"
+#include "c2/protocols/RESTSender.h"
+#include "integration/HTTPIntegrationBase.h"
+#include "processors/LogAttribute.h"
+
+class Responder : public CivetHandler {
+ public:
+  explicit Responder(bool isSecure)
+      : isSecure(isSecure) {
+  }
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    std::string resp =
+        "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              resp.length());
+    mg_printf(conn, "%s", resp.c_str());
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
+class VerifyC2Heartbeat : public HTTPIntegrationBase {
+ public:
+  explicit VerifyC2Heartbeat(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<LogTestController>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void cleanup() {
+    LogTestController::getInstance().reset();
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
+
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
+
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
+  }
+
+  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+    configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat");
+    configuration->set("c2.agent.heartbeat.period", "1000");
+    configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat");
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:8888/api/heartbeat";
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Heartbeat harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  Responder responder(isSecure);
+
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/C2VerifyServeResults.cpp b/extensions/civet_curl_tests/C2VerifyServeResults.cpp
new file mode 100644
index 0000000..d99a2d8
--- /dev/null
+++ b/extensions/civet_curl_tests/C2VerifyServeResults.cpp
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "processors/InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "c2/C2Agent.h"
+#include "c2/protocols/RESTReceiver.h"
+#include "integration/HTTPIntegrationBase.h"
+#include "processors/LogAttribute.h"
+
+class VerifyC2Server : public HTTPIntegrationBase {
+ public:
+  explicit VerifyC2Server(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+      LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("Import offset 0") == true);
+
+    assert(LogTestController::getInstance().contains("Outputting success and response") == true);
+  }
+
+  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+
+    std::string port, scheme, path;
+    parse_http_components(url, port, scheme, path);
+    configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
+    configuration->set("c2.rest.listener.port", port);
+    configuration->set("c2.agent.heartbeat.period", "10");
+    configuration->set("c2.rest.listener.heartbeat.rooturi", path);
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Server harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/CMakeLists.txt b/extensions/civet_curl_tests/CMakeLists.txt
new file mode 100644
index 0000000..727c0ab
--- /dev/null
+++ b/extensions/civet_curl_tests/CMakeLists.txt
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB CURL_UNIT_TESTS  "unit/*.cpp")
+file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
+
+SET(CURL_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${CURL_UNIT_TESTS})
+  	get_filename_component(testfilename "${testfile}" NAME_WE)
+  	add_executable("${testfilename}" "${testfile}")
+  	target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+  	target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/client/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/processors/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/protocols/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/sitetosite/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../civetweb/include/")
+	target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    if (APPLE)
+    	target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+	else ()
+  		target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+  	endif()
+  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+  	get_filename_component(testfilename "${testfile}" NAME_WE)
+  	add_executable("${testfilename}" "${testfile}")
+  	target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+  	target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/client/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/processors/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/protocols/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/sitetosite/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../civetweb/include/")
+	target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+    createTests("${testfilename}")
+    if (APPLE)
+    	target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+	else ()
+  		target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+  	endif()
+  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")
+
+add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
+add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
+add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
+add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp b/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
new file mode 100644
index 0000000..612603a
--- /dev/null
+++ b/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+#include "core/controller/ControllerServiceMap.h"
+#include "core/controller/StandardControllerServiceNode.h"
+#include "core/controller/StandardControllerServiceProvider.h"
+#include "controllers/SSLContextService.h"
+#include "core/Core.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessGroup.h"
+#include "core/Resource.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/MockClasses.h"
+#include "unit/ProvenanceTestHelper.h"
+
+REGISTER_RESOURCE(MockControllerService);
+REGISTER_RESOURCE(MockProcessor);
+
+std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) {
+  std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>();
+  std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id,
+                                                                                                                                                std::make_shared<minifi::Configure>());
+  return testNode;
+}
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+}
+
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  std::string key_dir;
+
+  if (argc > 2) {
+    test_file_location = argv[1];
+    key_dir = argv[1];
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+
+  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+  std::string client_cert = "cn.crt.pem";
+  std::string priv_key_file = "cn.ckey.pem";
+  std::string passphrase = "cn.pass";
+  std::string ca_cert = "nifi-cert.pem";
+  configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location);
+  configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file);
+  configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
+                                                                                                true);
+
+  disabled = false;
+  std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+  ptr.release();
+
+  std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>());
+  std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
+  assert(mockNode != nullptr);
+  mockNode->enable();
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode> > linkedNodes = mockNode->getLinkedControllerServices();
+  assert(linkedNodes.size() == 1);
+
+  std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong");
+  assert(notexistNode == nullptr);
+
+  std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr;
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr;
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->load();
+    controller->start();
+    ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest");
+    ssl_client_cont->enable();
+    assert(ssl_client_cont != nullptr);
+    assert(ssl_client_cont->getControllerServiceImplementation() != nullptr);
+    ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation());
+  }
+  assert(ssl_client->getCACertificate().length() > 0);
+  // now let's disable one of the controller services.
+  std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
+  assert(cs_id != nullptr);
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->disableControllerService(cs_id);
+    disabled = true;
+    waitToVerifyProcessor();
+  }
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->enableControllerService(cs_id);
+    disabled = false;
+    waitToVerifyProcessor();
+  }
+  std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
+  assert(cs_id->enabled());
+{
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->disableReferencingServices(mock_cont);
+    disabled = true;
+    waitToVerifyProcessor();
+  }
+    assert(cs_id->enabled() == false);
+{
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->enableReferencingServices(mock_cont);
+    disabled = false;
+    waitToVerifyProcessor();
+  }
+  assert(cs_id->enabled() == true);
+
+  controller->waitUnload(60000);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/GetFileNoData.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/GetFileNoData.cpp b/extensions/civet_curl_tests/GetFileNoData.cpp
new file mode 100644
index 0000000..f475f48
--- /dev/null
+++ b/extensions/civet_curl_tests/GetFileNoData.cpp
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "c2/protocols/RESTSender.h"
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+static std::vector<std::string> responses;
+
+class ConfigHandler : public CivetHandler {
+ public:
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    if (responses.size() > 0) {
+      std::string top_str = responses.back();
+      responses.pop_back();
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                top_str.length());
+      mg_printf(conn, "%s", top_str.c_str());
+    } else {
+      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+    }
+
+    return true;
+  }
+
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::ifstream myfile(test_file_location_.c_str());
+
+    if (myfile.is_open()) {
+      std::stringstream buffer;
+      buffer << myfile.rdbuf();
+      std::string str = buffer.str();
+      myfile.close();
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                str.length());
+      mg_printf(conn, "%s", str.c_str());
+    } else {
+      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+    }
+
+    return true;
+  }
+  std::string test_file_location_;
+};
+
+int main(int argc, char **argv) {
+  mg_init_library(0);
+  LogTestController::getInstance().setInfo<minifi::FlowController>();
+  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+  LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+
+  const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+
+  CivetServer server(cpp_options);
+  ConfigHandler h_ex;
+  server.addHandler("/update", h_ex);
+  std::string key_dir, test_file_location;
+  if (argc > 1) {
+    h_ex.test_file_location_ = test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+  std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+      "\"operation\" : \"update\", "
+      "\"operationid\" : \"8675309\", "
+      "\"name\": \"configuration\""
+      "}]}";
+
+  responses.push_back(heartbeat_response);
+
+  std::ifstream myfile(test_file_location.c_str());
+
+  if (myfile.is_open()) {
+    std::stringstream buffer;
+    buffer << myfile.rdbuf();
+    std::string str = buffer.str();
+    myfile.close();
+    std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+        "\"operation\" : \"update\", "
+        "\"operationid\" : \"8675309\", "
+        "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
+    responses.push_back(response);
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+      minifi::Configure>();
+
+  configuration->set("c2.rest.url",
+                     "http://localhost:9090/update");
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+      TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file,
+                     test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
+      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
+      <core::YamlConfiguration
+      >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory,
+                                    configuration,
+                                    test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      <TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller =
+      std::make_shared<minifi::FlowController
+      >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory,
+                                      configuration,
+                                      test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+                                                                test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
+      >(ptr.get());
+  ptr.release();
+
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  std::string logs = LogTestController::getInstance().log_output.str();
+  assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+  LogTestController::getInstance().reset();
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp b/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..01d7231
--- /dev/null
+++ b/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "TestServer.h"
+#include "integration/HTTPIntegrationBase.h"
+#include "sitetositehttp/HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+class SiteToSiteTestHarness : public HTTPIntegrationBase {
+ public:
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.c2.enable", "false");
+    configuration->set("nifi.remote.input.http.enabled", "true");
+    configuration->set("nifi.remote.input.socket.port", "8082");
+  }
+
+  virtual void waitToVerifyProcessor() {
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+struct test_profile {
+  test_profile()
+      : flow_url_broken(false),
+        transaction_url_broken(false),
+        empty_transaction_url(false),
+        no_delete(false),
+        invalid_checksum(false) {
+  }
+
+  bool allFalse() const {
+    return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum;
+  }
+  // tests for a broken flow file url
+  bool flow_url_broken;
+  // transaction url will return incorrect information
+  bool transaction_url_broken;
+  // Location will be absent within the
+  bool empty_transaction_url;
+  // delete url is not supported.
+  bool no_delete;
+  // invalid checksum error
+  bool invalid_checksum;
+};
+
+void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
+  SiteToSiteTestHarness harness(isSecure);
+
+  SiteToSiteLocationResponder responder(isSecure);
+
+  TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+
+  std::string transaction_id = transaction_response.getTransactionId();
+
+  harness.setKeyDir("");
+
+  std::string controller_loc = url + "/controller";
+
+  harness.setUrl(controller_loc, &responder);
+
+  std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+
+  std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+  std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+
+  harness.setUrl(transaction_url, &transaction_response);
+
+  std::string peer_url = url + "/site-to-site/peers";
+
+  PeerResponder peer_response(url);
+
+  harness.setUrl(peer_url, &peer_response);
+
+  std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+  FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum);
+  flowResponder.setFlowUrl(flow_url);
+  auto producedFlows = flowResponder.getFlows();
+
+  TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+  std::string transaction_output_id = transaction_response_output.getTransactionId();
+  transaction_response_output.setFeed(producedFlows);
+
+  harness.setUrl(transaction_output_url, &transaction_response_output);
+
+  std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files";
+
+  FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum);
+  flowOutputResponder.setFlowUrl(flow_output_url);
+  flowOutputResponder.setFeed(producedFlows);
+
+  harness.setUrl(flow_url, &flowResponder);
+  harness.setUrl(flow_output_url, &flowOutputResponder);
+
+  if (!profile.no_delete) {
+    std::string delete_url = transaction_url + "/" + transaction_id;
+    DeleteTransactionResponder deleteResponse(delete_url, "201 OK", 12);
+    harness.setUrl(delete_url, &deleteResponse);
+
+    std::string delete_output_url = transaction_output_url + "/" + transaction_output_id;
+    DeleteTransactionResponder deleteOutputResponse(delete_output_url, "201 OK", producedFlows);
+    harness.setUrl(delete_output_url, &deleteOutputResponse);
+  }
+
+  harness.run(test_file_location);
+
+  std::stringstream assertStr;
+  if (profile.allFalse()) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.empty_transaction_url) {
+    assert(LogTestController::getInstance().contains("Location is empty") == true);
+  } else if (profile.transaction_url_broken) {
+    assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true);
+  } else if (profile.invalid_checksum) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site transaction " << transaction_id << " CRC not matched";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site delete transaction " << transaction_id;
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.no_delete) {
+    assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true);
+  } else {
+    assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  }
+  LogTestController::getInstance().reset();
+}
+
+int main(int argc, char **argv) {
+  transaction_id = 0;
+  transaction_id_output = 0;
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+    url = argv[3];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  {
+    struct test_profile profile;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.flow_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.empty_transaction_url = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.transaction_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.no_delete = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.invalid_checksum = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp b/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp
new file mode 100644
index 0000000..df40497
--- /dev/null
+++ b/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestServer.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+int log_message(const struct mg_connection *conn, const char *message) {
+  puts(message);
+  return 1;
+}
+
+int ssl_enable(void *ssl_context, void *user_data) {
+  struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;
+  return 0;
+}
+
+class HttpResponder : public CivetHandler {
+ public:
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    static const std::string site2site_rest_resp = "hi this is a get test";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              site2site_rest_resp.length());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+};
+
+int main(int argc, char **argv) {
+  init_webserver();
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<utils::HTTPClient>();
+  LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>();
+  LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+  std::string key_dir, test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  content_repo->initialize(configuration);
+
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+                                                                                                content_repo,
+                                                                                                DEFAULT_ROOT_GROUP_NAME,
+                                                                                                true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+  std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke");
+  assert(proc != nullptr);
+
+  std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+  assert(inv != nullptr);
+  std::string url = "";
+  inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  ptr.release();
+  HttpResponder h_ex;
+  std::string port, scheme, path;
+  CivetServer *server = nullptr;
+
+  parse_http_components(url, port, scheme, path);
+  struct mg_callbacks callback;
+  if (url.find("localhost") != std::string::npos) {
+    if (scheme == "https") {
+      std::string cert = "";
+      cert = key_dir + "nifi-cert.pem";
+      memset(&callback, 0, sizeof(callback));
+      callback.init_ssl = ssl_enable;
+      port +="s";
+      callback.log_message = log_message;
+      server = start_webserver(port, path, &h_ex, &callback, cert, cert);
+    } else {
+      server = start_webserver(port, path, &h_ex);
+    }
+  }
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  if (url.find("localhost") == std::string::npos) {
+    stop_webserver(server);
+    exit(1);
+  }
+  std::string logs = LogTestController::getInstance().log_output.str();
+
+  assert(logs.find("key:filename value:") != std::string::npos);
+  assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
+  assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
+
+  LogTestController::getInstance().reset();
+  rmdir("./content_repository");
+  stop_webserver(server);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp b/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp
new file mode 100644
index 0000000..853fdc6
--- /dev/null
+++ b/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+#include <sstream>
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "integration/HTTPIntegrationBase.h"
+
+class HttpTestHarness : public HTTPIntegrationBase {
+ public:
+  HttpTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<core::ProcessContext>();
+    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+    configuration->set("nifi.flow.engine.threads", "8");
+    configuration->set("nifi.c2.enable", "false");
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("curl performed") == true);
+    assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true);
+    assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false);
+  }
+
+ protected:
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  HttpTestHarness harness;
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/SiteToSiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/SiteToSiteRestTest.cpp b/extensions/civet_curl_tests/SiteToSiteRestTest.cpp
new file mode 100644
index 0000000..f235be1
--- /dev/null
+++ b/extensions/civet_curl_tests/SiteToSiteRestTest.cpp
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "integration/HTTPIntegrationBase.h"
+
+class Responder : public CivetHandler {
+ public:
+  explicit Responder(bool isSecure)
+      : isSecure(isSecure) {
+  }
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "{"
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"remoteSiteListeningPort\": 10001,"
+        "\"siteToSiteSecure\": ";
+    site2site_rest_resp += (isSecure ? "true" : "false");
+    site2site_rest_resp += "}}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              site2site_rest_resp.length());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
+class SiteToSiteTestHarness : public HTTPIntegrationBase {
+ public:
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    if (isSecure) {
+      assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true);
+    } else {
+      assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true);
+    }
+  }
+
+ protected:
+  bool isSecure;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+    url = argv[3];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  SiteToSiteTestHarness harness(isSecure);
+
+  Responder responder(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/ThreadPoolAdjust.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/ThreadPoolAdjust.cpp b/extensions/civet_curl_tests/ThreadPoolAdjust.cpp
new file mode 100644
index 0000000..2785117
--- /dev/null
+++ b/extensions/civet_curl_tests/ThreadPoolAdjust.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "integration/HTTPIntegrationBase.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+class HttpTestHarness : public IntegrationBase {
+ public:
+  HttpTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<core::ProcessContext>();
+    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>();
+    LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+    configuration->set("nifi.flow.engine.threads", "1");
+  }
+
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  void runAssertions() {
+    assert(LogTestController::getInstance().contains("curl performed") == true);
+    assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true);
+    assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false);
+  }
+
+ protected:
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  HttpTestHarness harness;
+
+  harness.setKeyDir(key_dir);
+
+  harness.run(test_file_location);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/include/TestServer.h
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/include/TestServer.h b/extensions/civet_curl_tests/include/TestServer.h
new file mode 100644
index 0000000..06f996c
--- /dev/null
+++ b/extensions/civet_curl_tests/include/TestServer.h
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_TESTSERVER_H_
+#define LIBMINIFI_TEST_TESTSERVER_H_
+#include <regex.h>
+#include <string>
+#include <iostream>
+#include "civetweb.h"
+#include "CivetServer.h"
+
+
+/* Server context handle */
+static std::string resp_str;
+
+void init_webserver() {
+  mg_init_library(0);
+}
+
+
+CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
+  const char *options[] = { "listening_ports", port.c_str(), "error_log_file",
+      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list",
+      "ALL", "ssl_verify_peer", "no", 0 };
+
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+  CivetServer *server = new CivetServer(cpp_options);
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+
+}
+
+CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
+  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
+
+  std::vector<std::string> cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+  CivetServer *server = new CivetServer(cpp_options);
+
+  server->addHandler(rooturi, handler);
+
+  return server;
+
+}
+
+bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+  regex_t regex;
+
+  const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+
+  int ret = regcomp(&regex, regexstr, REG_EXTENDED);
+  if (ret) {
+    return false;
+  }
+
+  size_t potentialGroups = regex.re_nsub + 1;
+  regmatch_t groups[potentialGroups];
+  if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) {
+    for (int i = 0; i < potentialGroups; i++) {
+      if (groups[i].rm_so == -1)
+        break;
+
+      std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
+      switch (i) {
+        case 1:
+          scheme = str;
+          break;
+        case 3:
+          port = str;
+          break;
+        case 4:
+          path = str;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+  if (path.empty() || scheme.empty() || port.empty())
+    return false;
+
+  regfree(&regex);
+
+  return true;
+
+}
+
+static void stop_webserver(CivetServer *server) {
+  if (server != nullptr)
+    delete server;
+
+  /* Un-initialize the library */
+  mg_exit_library();
+}
+
+#endif