You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/01 20:27:20 UTC

[6/6] nifi-minifi-cpp git commit: MINIFICPP-60: Add initial implementation of Site to Site changes.

MINIFICPP-60: Add initial implementation of Site to Site changes.

MINIFICPP-60: Update interfaces to allow for seamless extension

MINIFICPP-60: Update functions to accommodate changed signatures

MINIFICPP-60: Update per pull request comments. Fix issues with most tests failing in parallel

MINIFICPP-275: Update CompressContent to use signature

MINIFICPP-60: Correct issue with HTTP where we were using the remote port from the JSON response

This closes #158.

Signed-off-by: Bin Qiu <be...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/726dc403
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/726dc403
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/726dc403

Branch: refs/heads/master
Commit: 726dc403d01794030b6de3cc94370a050d520c1a
Parents: 027fc19
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Oct 9 18:42:24 2017 -0400
Committer: Bin Qiu <be...@gmail.com>
Committed: Wed Nov 1 13:24:48 2017 -0700

----------------------------------------------------------------------
 README.md                                       |    7 +-
 cmake/BuildTests.cmake                          |    1 -
 extensions/http-curl/CMakeLists.txt             |   12 +-
 extensions/http-curl/HTTPCurlLoader.cpp         |   29 +
 extensions/http-curl/HTTPCurlLoader.h           |   86 ++
 extensions/http-curl/HttpCurlLoader.cpp         |   30 -
 extensions/http-curl/HttpCurlLoader.h           |   82 --
 extensions/http-curl/client/HTTPCallback.h      |  188 +++
 extensions/http-curl/client/HTTPClient.cpp      |  319 +++++
 extensions/http-curl/client/HTTPClient.h        |   61 +-
 extensions/http-curl/client/HTTPStream.cpp      |  127 ++
 extensions/http-curl/client/HTTPStream.h        |  172 +++
 extensions/http-curl/client/HttpClient.cpp      |  257 ----
 extensions/http-curl/processors/InvokeHTTP.cpp  |    8 +-
 extensions/http-curl/processors/InvokeHTTP.h    |   10 +-
 extensions/http-curl/protocols/RESTProtocol.h   |    3 +-
 extensions/http-curl/protocols/RESTReceiver.h   |    4 +-
 extensions/http-curl/protocols/RESTSender.cpp   |   60 +-
 extensions/http-curl/protocols/RESTSender.h     |   11 +-
 .../http-curl/sitetosite/HTTPProtocol.cpp       |  310 +++++
 extensions/http-curl/sitetosite/HTTPProtocol.h  |  197 +++
 .../http-curl/sitetosite/HTTPTransaction.h      |   70 +
 extensions/http-curl/sitetosite/PeersEntity.h   |  112 ++
 extensions/libarchive/ArchiveLoader.h           |   10 +-
 extensions/libarchive/BinFiles.cpp              |    2 +-
 extensions/libarchive/BinFiles.h                |    2 +-
 extensions/libarchive/CompressContent.cpp       |    5 +-
 extensions/libarchive/CompressContent.h         |    4 +-
 libminifi/CMakeLists.txt                        |    2 +-
 libminifi/include/RemoteProcessorGroupPort.h    |   36 +-
 libminifi/include/SchedulingAgent.h             |    2 +-
 libminifi/include/Site2SiteClientProtocol.h     |  633 ---------
 libminifi/include/Site2SitePeer.h               |  275 ----
 libminifi/include/core/Processor.h              |   12 +-
 .../SiteToSiteProvenanceReportingTask.h         |    9 +-
 libminifi/include/core/yaml/YamlConfiguration.h |    4 +-
 libminifi/include/io/BaseStream.h               |   12 +-
 libminifi/include/io/CRCStream.h                |   32 +-
 libminifi/include/io/DataStream.h               |    1 +
 libminifi/include/io/NonConvertingStream.h      |  200 +++
 libminifi/include/processors/GetTCP.h           |    4 +-
 libminifi/include/properties/Configure.h        |    1 +
 libminifi/include/sitetosite/Peer.h             |  381 ++++++
 .../include/sitetosite/RawSocketProtocol.h      |  211 +++
 libminifi/include/sitetosite/SiteToSite.h       |  416 ++++++
 libminifi/include/sitetosite/SiteToSiteClient.h |  330 +++++
 .../include/sitetosite/SiteToSiteFactory.h      |   89 ++
 libminifi/include/utils/ByteArrayCallback.h     |  143 ++
 libminifi/include/utils/ByteInputCallBack.h     |   85 --
 libminifi/include/utils/HTTPClient.h            |   89 +-
 libminifi/include/utils/StringUtils.h           |   11 +
 libminifi/src/Configure.cpp                     |    1 +
 libminifi/src/RemoteProcessorGroupPort.cpp      |  141 +-
 libminifi/src/SchedulingAgent.cpp               |    6 +-
 libminifi/src/Site2SiteClientProtocol.cpp       | 1261 ------------------
 libminifi/src/Site2SitePeer.cpp                 |   62 -
 libminifi/src/core/Processor.cpp                |    2 +-
 .../SiteToSiteProvenanceReportingTask.cpp       |   39 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp   |    6 +-
 libminifi/src/io/DataStream.cpp                 |    2 +
 libminifi/src/io/FileStream.cpp                 |    4 +-
 libminifi/src/io/NonConvertingStream.cpp        |  190 +++
 libminifi/src/io/Serializable.cpp               |    1 -
 libminifi/src/processors/GetTCP.cpp             |    4 +-
 libminifi/src/sitetosite/Peer.cpp               |   65 +
 libminifi/src/sitetosite/RawSocketProtocol.cpp  |  629 +++++++++
 libminifi/src/sitetosite/SiteToSiteClient.cpp   |  773 +++++++++++
 libminifi/src/utils/ByteArrayCallback.cpp       |  155 +++
 libminifi/test/TestBase.cpp                     |    5 +-
 libminifi/test/TestBase.h                       |   19 +-
 .../test/curl-tests/C2NullConfiguration.cpp     |    2 -
 .../test/curl-tests/C2VerifyServeResults.cpp    |    2 -
 libminifi/test/curl-tests/CMakeLists.txt        |    7 +-
 .../ControllerServiceIntegrationTests.cpp       |    5 -
 .../test/curl-tests/HTTPSiteToSiteTests.cpp     |  262 ++++
 .../curl-tests/sitetositehttp/CivetStream.h     |  138 ++
 .../curl-tests/sitetositehttp/HTTPHandlers.h    |  322 +++++
 libminifi/test/integration/IntegrationBase.h    |   13 +-
 .../integration/ProvenanceReportingTest.cpp     |    2 +-
 .../test/resources/C2VerifyHeartbeatAndStop.yml |   73 +
 .../test/resources/C2VerifyServeResults.yml     |   73 +
 libminifi/test/resources/TestHTTPGetSecure.yml  |    2 +-
 libminifi/test/resources/TestHTTPPost.yml       |    4 +-
 .../resources/TestHTTPPostChunkedEncoding.yml   |    4 +-
 libminifi/test/resources/TestHTTPSiteToSite.yml |   91 ++
 libminifi/test/resources/TestSite2SiteRest.yml  |    4 +-
 .../test/resources/TestSite2SiteRestSecure.yml  |    2 +-
 libminifi/test/resources/ThreadPoolAdjust.yml   |   97 ++
 libminifi/test/unit/GetTCPTests.cpp             |   12 +-
 libminifi/test/unit/LoggerTests.cpp             |    1 -
 libminifi/test/unit/ProcessorTests.cpp          |   15 +-
 libminifi/test/unit/SerializationTests.cpp      |    2 -
 libminifi/test/unit/Site2SiteTests.cpp          |   35 +-
 93 files changed, 6682 insertions(+), 3008 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 413333b..cc833e0 100644
--- a/README.md
+++ b/README.md
@@ -307,7 +307,7 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
                 max concurrent tasks: 1
                 Properties:
 
-### Site2Site Security Configuration
+### SiteToSite Security Configuration
 
     in minifi.properties
 
@@ -327,6 +327,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
     if you do not want to enable client certificate base authorization
     nifi.security.need.ClientAuth=false
     
+### HTTP SiteToSite Configuration
+To enable HTTPSiteToSite you must set the following flag to true
+	
+    nifi.remote.input.http.enabled=true
+    
 ### Command and Control Configuration
 For more more insight into the API used within the C2 agent, please visit:
 https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 30542ca..99e980b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -86,7 +86,6 @@ FOREACH(testfile ${INTEGRATION_TESTS})
 ENDFOREACH()
 message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...")
 
-
 get_property(extensions GLOBAL PROPERTY EXTENSION-TESTS)
 foreach(EXTENSION ${extensions})
 	message("Adding ${EXTENSION} ? ")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index d851aff..cd9f3ba 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -21,11 +21,12 @@ find_package(CURL REQUIRED)
 
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
+set(CMAKE_CXX_VISIBILITY_PRESET default)
 
 include_directories(../../libminifi/include  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include  ../../thirdparty/)
-include_directories(protocols client processors)
+include_directories(protocols client processors sitetosite)
 
-file(GLOB SOURCES  "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp")
+file(GLOB SOURCES  "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp" "sitetosite/*.cpp")
 
 add_library(minifi-http-curl STATIC ${SOURCES})
 set_property(TARGET minifi-http-curl PROPERTY POSITION_INDEPENDENT_CODE ON)
@@ -43,8 +44,8 @@ endif(CURL_FOUND)
 
 # Include UUID
 find_package(UUID REQUIRED)
-target_link_libraries(minifi-http-curl ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB})
-add_dependencies(minifi-http-curl jsoncpp_project)
+#set(LINK_FLAGS ${LINK_FLAGS} "-Wl,-whole-archive")
+# Include OpenSSL
 find_package(OpenSSL REQUIRED)
 include_directories(${OPENSSL_INCLUDE_DIR})
 target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} )
@@ -66,5 +67,4 @@ else ()
 endif ()
 
 SET (HTTP-CURL minifi-http-curl PARENT_SCOPE)
-
-register_extension(minifi-http-curl)
+register_extension(minifi-http-curl)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HTTPCurlLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.cpp b/extensions/http-curl/HTTPCurlLoader.cpp
new file mode 100644
index 0000000..010888a
--- /dev/null
+++ b/extensions/http-curl/HTTPCurlLoader.cpp
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPCurlLoader.h"
+
+#include "core/FlowConfiguration.h"
+
+bool HttpCurlObjectFactory::added = core::FlowConfiguration::add_static_func("createHttpCurlFactory");  // runs during static initialization; hands the entry-point name below to FlowConfiguration
+extern "C" {
+// Returns a heap-allocated HttpCurlObjectFactory as an untyped pointer; nothing here retains or frees it.
+void *createHttpCurlFactory(void) {
+  return new HttpCurlObjectFactory();
+}
+
+}  // extern "C"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HTTPCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h
new file mode 100644
index 0000000..797aebf
--- /dev/null
+++ b/extensions/http-curl/HTTPCurlLoader.h
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTPCURLLOADER_H_
+#define EXTENSIONS_HTTPCURLLOADER_H_
+
+#include "protocols/RESTProtocol.h"
+#include "protocols/RESTSender.h"
+#include "protocols/RESTReceiver.h"
+#include "processors/InvokeHTTP.h"
+#include "client/HTTPClient.h"
+#include "core/ClassLoader.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "utils/StringUtils.h"
+
+// Object factory for the http-curl extension: advertises the class names it serves and
+// returns a per-class factory for each name that assign() recognizes.
+class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory {
+ public:
+  HttpCurlObjectFactory() {
+  }
+
+  /**
+   * @return the fixed name of this factory.
+   */
+  virtual std::string getName() override {
+    return "HttpCurlObjectFactory";
+  }
+
+  /**
+   * @return the fixed class name of this factory.
+   */
+  virtual std::string getClassName() override {
+    return "HttpCurlObjectFactory";
+  }
+
+  /**
+   * @return the class names advertised by this factory, in a fixed order.
+   */
+  virtual std::vector<std::string> getClassNames() override {
+    return {"RESTProtocol", "HttpProtocol", "RESTReceiver", "RESTSender", "InvokeHTTP", "HTTPClient", "HttpSiteToSiteClient"};
+  }
+
+  /**
+   * Selects the factory for a class name; names are compared ignoring case.
+   * @param class_name name of the class a factory is wanted for.
+   * @return the matching factory, or nullptr when the name is not handled here.
+   */
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
+    typedef std::unique_ptr<ObjectFactory> FactoryPtr;
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver"))
+      return FactoryPtr(new core::DefautObjectFactory<minifi::c2::RESTReceiver>());
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTSender"))
+      return FactoryPtr(new core::DefautObjectFactory<minifi::c2::RESTSender>());
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "InvokeHTTP"))
+      return FactoryPtr(new core::DefautObjectFactory<processors::InvokeHTTP>());
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "HTTPClient"))
+      return FactoryPtr(new core::DefautObjectFactory<utils::HTTPClient>());
+    // both spellings below map to the HTTP site-to-site client
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "HttpProtocol") || utils::StringUtils::equalsIgnoreCase(class_name, "HttpSiteToSiteClient"))
+      return FactoryPtr(new core::DefautObjectFactory<minifi::sitetosite::HttpSiteToSiteClient>());
+    return nullptr;
+  }
+
+  // Defined out of line from the result of core::FlowConfiguration::add_static_func.
+  static bool added;
+};
+
+extern "C" {
+void *createHttpCurlFactory(void);
+}
+#endif /* EXTENSIONS_HTTPCURLLOADER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HttpCurlLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HttpCurlLoader.cpp b/extensions/http-curl/HttpCurlLoader.cpp
deleted file mode 100644
index b1f3eb5..0000000
--- a/extensions/http-curl/HttpCurlLoader.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "HttpCurlLoader.h"
-
-#include "core/FlowConfiguration.h"
-
-bool HttpCurlObjectFactory::added = core::FlowConfiguration::add_static_func("createHttpCurlFactory");
-
-extern "C" {
-
-void *createHttpCurlFactory(void) {
-  return new HttpCurlObjectFactory();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HttpCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HttpCurlLoader.h b/extensions/http-curl/HttpCurlLoader.h
deleted file mode 100644
index cb9a30d..0000000
--- a/extensions/http-curl/HttpCurlLoader.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EXTENSIONS_HTTPCURLLOADER_H
-#define EXTENSIONS_HTTPCURLLOADER_H
-
-#include "protocols/RESTProtocol.h"
-#include "protocols/RESTSender.h"
-#include "protocols/RESTReceiver.h"
-#include "processors/InvokeHTTP.h"
-#include "client/HTTPClient.h"
-#include "core/ClassLoader.h"
-
-class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory {
- public:
-  HttpCurlObjectFactory() {
-
-  }
-
-  /**
-   * Gets the name of the object.
-   * @return class name of processor
-   */
-  virtual std::string getName() {
-    return "HttpCurlObjectFactory";
-  }
-
-  virtual std::string getClassName() {
-    return "HttpCurlObjectFactory";
-  }
-  /**
-   * Gets the class name for the object
-   * @return class name for the processor.
-   */
-  virtual std::vector<std::string> getClassNames() {
-    std::vector<std::string> class_names;
-    class_names.push_back("RESTProtocol");
-    class_names.push_back("RESTReceiver");
-    class_names.push_back("RESTSender");
-    class_names.push_back("InvokeHTTP");
-    class_names.push_back("HTTPClient");
-    return class_names;
-  }
-
-  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
-    if (class_name == "RESTReceiver") {
-      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>());
-    } else if (class_name == "RESTSender") {
-      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTSender>());
-    } else if (class_name == "InvokeHTTP") {
-      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::InvokeHTTP>());
-    } else if (class_name == "HTTPClient") {
-      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<utils::HTTPClient>());
-    } else {
-      return nullptr;
-    }
-  }
-
-  static bool added;
-
-
-
-};
-
-extern "C" {
-void *createHttpCurlFactory(void);
-}
-#endif /* EXTENSIONS_HTTPCURLLOADER_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPCallback.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
new file mode 100644
index 0000000..aeca2a9
--- /dev/null
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -0,0 +1,188 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include <thread>
+#include <mutex>
+#include <vector>
+#include <condition_variable>
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Upload callback that streams queued byte buffers. Producers add buffers
+ * with process() or write(); the reader pulls them through getBuffer(),
+ * which blocks until a buffer is queued or close() has been called.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+  HttpStreamingCallback()
+      : is_alive_(true),
+        rolling_count_(0),
+        previous_pos_(0),
+        current_pos_(0),  // read by getBufferSize(), so give it a defined start value
+        ptr(nullptr) {
+  }
+
+  virtual ~HttpStreamingCallback() {
+  }
+
+  // Marks the stream as finished and wakes every thread waiting for data.
+  void close() {
+    is_alive_ = false;
+    cv.notify_all();
+  }
+
+  /**
+   * Loads the next queued buffer when pos (an absolute stream offset) lies beyond the current one.
+   */
+  virtual void seek(size_t pos) {
+    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
+      load_buffer();
+  }
+
+  /**
+   * Reads stream->getSize() bytes from stream and queues them as one buffer (an empty one when the size is 0).
+   * @return number of bytes queued.
+   */
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    std::vector<char> vec;
+    if (stream->getSize() > 0) {
+      vec.resize(stream->getSize());
+      stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
+    }
+    size_t added_size = vec.size();
+    byte_arrays_.enqueue(std::move(vec));
+    cv.notify_all();
+    return added_size;
+  }
+
+  /**
+   * Copies size bytes starting at vector and queues them as one buffer; nothing is queued when size is 0.
+   * @return number of bytes queued.
+   */
+  virtual int64_t process(uint8_t *vector, size_t size) {
+    if (size == 0)
+      return 0;
+    std::vector<char> vec(size);
+    memcpy(vec.data(), vector, size);
+    byte_arrays_.enqueue(std::move(vec));
+    cv.notify_all();
+    return size;
+  }
+
+  /**
+   * Queues a copy of content as one buffer and wakes waiting readers.
+   */
+  virtual void write(std::string content) {
+    std::vector<char> vec(content.begin(), content.end());
+    byte_arrays_.enqueue(std::move(vec));
+    // notify like both process() overloads do; otherwise a reader already
+    // waiting in load_buffer() is not woken for this buffer
+    cv.notify_all();
+  }
+
+  /**
+   * @param pos absolute offset into the stream.
+   * @return pointer into the current buffer for pos, or nullptr when no
+   *         buffer is loaded. May block while waiting for the next buffer.
+   */
+  virtual char *getBuffer(size_t pos) {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    // pos is past the current buffer (or none is loaded): move to the next
+    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
+      load_buffer();
+    if (ptr == nullptr)
+      return nullptr;
+    // load_buffer() may have advanced previous_pos_, so compute this afterwards
+    size_t absolute_position = pos - previous_pos_;
+    current_pos_ = pos;
+    return ptr + absolute_position;
+  }
+
+  // Returns the size of the current buffer; pos is not used.
+  virtual const size_t getRemaining(size_t pos) {
+    return current_vec_.size();
+  }
+
+  /**
+   * @return total bytes loaded so far; first loads another buffer if none is loaded or the read position has reached that total.
+   */
+  virtual const size_t getBufferSize() {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    if (ptr == nullptr || current_pos_ >= rolling_count_) {
+      load_buffer();
+    }
+    return rolling_count_;
+  }
+
+ private:
+  /**
+   * Waits until a buffer is queued or the stream is closed, then makes the
+   * next queued buffer current. Nothing changes when closed with no data.
+   */
+  inline void load_buffer() {
+    std::unique_lock<std::recursive_mutex> lock(mutex_);
+    cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
+    if (!is_alive_ && byte_arrays_.size_approx() == 0) {
+      return;
+    }
+    try {
+      if (byte_arrays_.try_dequeue(current_vec_)) {
+        ptr = current_vec_.data();  // unlike &current_vec_[0], valid for an empty buffer
+        previous_pos_.store(rolling_count_.load());
+        current_pos_ = 0;
+        rolling_count_ += current_vec_.size();
+      } else {
+        ptr = nullptr;
+      }
+    } catch (...) {
+      // any exception is swallowed here; unique_lock releases the mutex on every path
+    }
+  }
+
+  std::atomic<bool> is_alive_;
+  std::atomic<size_t> rolling_count_;
+  std::condition_variable_any cv;
+  std::atomic<size_t> previous_pos_;
+  std::atomic<size_t> current_pos_;
+
+  std::recursive_mutex mutex_;
+
+  moodycamel::ConcurrentQueue<std::vector<char>> byte_arrays_;
+
+  char *ptr;
+
+  std::vector<char> current_vec_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
new file mode 100644
index 0000000..2d5d1e3
--- /dev/null
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -0,0 +1,319 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPClient.h"
+#include <memory>
+#include <climits>
+#include <map>
+#include <vector>
+#include <string>
+#include <algorithm>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// Creates a client for url with the given SSL context service and opens a curl easy session.
+HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
+    : core::Connectable("HTTPClient", 0),
+      ssl_context_service_(ssl_context_service),
+      url_(url),
+      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+      connect_timeout_(0), read_timeout_(0),
+      content_type(nullptr),
+      headers_(nullptr),
+      callback(nullptr),
+      http_code(0),
+      read_callback_(INT_MAX),
+      header_response_(-1),
+      res(CURLE_OK) {
+  HTTPClientInitializer::getInstance();  // only the call matters here; the returned pointer is not used
+  http_session_ = curl_easy_init();
+}
+
+// Creates a client with the given name and uuid, no URL and no SSL context service, and opens a curl easy session.
+HTTPClient::HTTPClient(std::string name, uuid_t uuid)
+    : core::Connectable(name, uuid),
+      ssl_context_service_(nullptr),
+      url_(),
+      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+      connect_timeout_(0), read_timeout_(0),
+      content_type(nullptr),
+      headers_(nullptr),
+      callback(nullptr),
+      http_code(0),
+      read_callback_(INT_MAX),
+      header_response_(-1),
+      res(CURLE_OK) {
+  HTTPClientInitializer::getInstance();  // only the call matters here; the returned pointer is not used
+  http_session_ = curl_easy_init();
+}
+
+// Creates a client named "HTTPClient" with no URL and no SSL context service, and opens a curl easy session.
+HTTPClient::HTTPClient()
+    : core::Connectable("HTTPClient", 0),
+      ssl_context_service_(nullptr),
+      url_(),
+      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+      connect_timeout_(0), read_timeout_(0),
+      content_type(nullptr),
+      headers_(nullptr),
+      callback(nullptr),
+      http_code(0),
+      read_callback_(INT_MAX),
+      header_response_(-1),
+      res(CURLE_OK) {
+  HTTPClientInitializer::getInstance();  // only the call matters here; the returned pointer is not used
+  http_session_ = curl_easy_init();
+}
+
+HTTPClient::~HTTPClient() {  // releases the header list and the curl session through forceClose()
+  forceClose();
+}
+
+void HTTPClient::forceClose(){  // frees the header list and the curl session; both pointers are reset, so a repeated call does nothing
+  if (nullptr != headers_) {
+    curl_slist_free_all(headers_);
+    headers_ = nullptr;
+  }
+  if (http_session_ != nullptr){
+    curl_easy_cleanup(http_session_);
+    http_session_ = nullptr;
+  }
+}
+
+void HTTPClient::setVerbose() {  // sets CURLOPT_VERBOSE to 1 on this client's curl session
+  curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L);
+}
+
+void HTTPClient::initialize(const std::string &method, const std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {  // stores the request method and, when provided, the URL and SSL context service
+  method_ = method;
+  set_request_method(method_);
+  if (ssl_context_service != nullptr) {  // a null argument keeps the service already stored
+    ssl_context_service_ = ssl_context_service;
+  }
+  if (!url.empty()) {  // an empty argument keeps the URL already stored
+    url_ = url;
+  }
+  if (isSecure(url_) && ssl_context_service_ != nullptr) {  // secure setup needs both isSecure(url_) and a stored service
+    configure_secure_connection(http_session_);
+  }
+}
+
+void HTTPClient::setDisablePeerVerification() {  // logs the change, then sets CURLOPT_SSL_VERIFYPEER to 0 on this session
+  logger_->log_info("Disabling peer verification");
+  curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
+}
+
+void HTTPClient::setConnectionTimeout(int64_t timeout) {  // stores the timeout; submit() hands it to libcurl when it is greater than 0
+  connect_timeout_ = timeout;
+  curl_easy_setopt(http_session_, CURLOPT_NOSIGNAL, 1L);  // libcurl reads this option as a long, so pass 1L rather than an int
+}
+
+void HTTPClient::setReadTimeout(int64_t timeout) {  // stores the value in read_timeout_; no curl option is set here
+  read_timeout_ = timeout;
+}
+
+void HTTPClient::setReadCallback(HTTPReadCallback *callbackObj) {  // installs callbackObj as the data handed to libcurl's write callback
+  callback = callbackObj;  // submit() installs its own default sink only while this is nullptr
+  curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(callbackObj));
+}
+
+void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {  // installs callbackObj as the data handed to libcurl's read (upload) callback
+  logger_->log_info("Setting callback");
+  if (method_ == "PUT" || method_ == "put") {  // only for PUT: tell libcurl the upload size up front
+    curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(callbackObj->ptr->getBufferSize()));
+  }
+  curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
+  curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+}
+
+struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {  // appends "name:value" for every attribute whose name satisfies matches(name, regex)
+  if (http_session_) {
+    for (const auto &attribute : attributes) {  // bind by reference; iterating by value copies both strings of every entry
+      if (matches(attribute.first, regex)) {
+        std::string attr = attribute.first + ":" + attribute.second;
+        headers_ = curl_slist_append(headers_, attr.c_str());
+      }
+    }
+  }
+  return headers_;  // the accumulated list; unchanged when there is no session
+}
+
+void HTTPClient::setContentType(std::string content_type) {  // stores "Content-Type: <content_type>" and appends it to the header list
+  content_type_ = "Content-Type: " + content_type;
+  headers_ = curl_slist_append(headers_, content_type_.c_str());
+}
+
+std::string HTTPClient::escape(std::string string_to_escape) {  // URL-encodes the input with libcurl; returns an empty string when libcurl returns NULL
+  std::unique_ptr<char, decltype(&curl_free)> escaped(curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length()), &curl_free);  // the returned buffer must be released with curl_free
+  return escaped ? std::string(escaped.get()) : std::string();
+}
+void HTTPClient::setPostFields(std::string input) {  // sets the POST body; libcurl keeps its own copy of input
+  curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, static_cast<long>(input.length()));  // the size is read as a long and must be set before the copy is requested
+  curl_easy_setopt(http_session_, CURLOPT_COPYPOSTFIELDS, input.c_str());  // CURLOPT_POSTFIELDS only stores the pointer, which would dangle once input is destroyed on return
+}
+
+/**
+ * Replaces the client's outgoing header list pointer.
+ * NOTE(review): a previously held list that differs from the new one is not
+ * released here - confirm whether callers only ever pass back the list
+ * returned by build_header_list().
+ * @param list header list the client should use from now on.
+ */
+void HTTPClient::setHeaders(struct curl_slist *list) {
+  this->headers_ = list;
+}
+
+/**
+ * Adds a complete, pre-formatted header line to the outgoing header list.
+ * @param new_header header line, passed to libcurl unchanged.
+ */
+void HTTPClient::appendHeader(const std::string &new_header) {
+  struct curl_slist *extended = curl_slist_append(headers_, new_header.c_str());
+  headers_ = extended;
+}
+
+/**
+ * Adds a header built as "key: value" to the outgoing header list.
+ * @param key header name.
+ * @param value header value.
+ */
+void HTTPClient::appendHeader(const std::string &key, const std::string &value) {
+  const std::string header_line = key + ": " + value;
+  headers_ = curl_slist_append(headers_, header_line.c_str());
+}
+
+/**
+ * Adds the "Transfer-Encoding: chunked" header to the outgoing header list.
+ */
+void HTTPClient::setUseChunkedEncoding() {
+  static const char *const kChunkedHeader = "Transfer-Encoding: chunked";
+  headers_ = curl_slist_append(headers_, kChunkedHeader);
+}
+
+/**
+ * Performs the configured request and records the outcome.
+ *
+ * The response code and content type are captured whether or not the
+ * transfer succeeded. On success the raw response header lines are split
+ * into name/value pairs and stored through header_response_.append(); a
+ * line without a colon is stored under the previous header name, or under
+ * "HEADER" when no name has been seen yet.
+ *
+ * @return true when curl_easy_perform reported CURLE_OK, false otherwise.
+ */
+bool HTTPClient::submit() {
+  if (connect_timeout_ > 0) {
+    // libcurl reads this option through varargs as a long; passing the
+    // int64_t member directly is wrong wherever long is narrower.
+    curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, static_cast<long>(connect_timeout_));
+  }
+
+  if (headers_ != nullptr) {
+    // NOTE(review): this adds another "Expect:" entry on every call - confirm
+    // a client is never submitted more than once.
+    headers_ = curl_slist_append(headers_, "Expect:");
+    curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
+  }
+
+  curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
+  logger_->log_info("Submitting to %s", url_);
+  if (callback == nullptr) {
+    // No external receiver was registered: collect the body internally so
+    // getResponseBody() can return it.
+    content_.ptr = &read_callback_;
+    curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
+    curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(&content_));
+  }
+  curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
+  curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast<void*>(&header_response_));
+  curl_easy_setopt(http_session_, CURLOPT_TCP_KEEPALIVE, 0L);
+  res = curl_easy_perform(http_session_);
+  curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
+  curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type);
+  if (res != CURLE_OK) {
+    logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
+    return false;
+  }
+
+  logger_->log_info("Finished with %s", url_);
+  std::string key = "";
+  for (auto header_line : header_response_.header_tokens_) {
+    const std::string::size_type colon = header_line.find(':');
+    if (colon == std::string::npos) {
+      // No colon: treat the line as belonging to the previous header.
+      if (key.empty())
+        header_response_.append("HEADER", header_line);
+      else
+        header_response_.append(key, header_line);
+    } else {
+      key = header_line.substr(0, colon);
+      // The value starts after the colon and the single character following it.
+      const std::string::size_type value_start = colon + 2;
+      if (header_line.length() <= value_start) {
+        continue;
+      }
+      std::string value = header_line.substr(value_start);
+      // Drop trailing control characters and spaces (e.g. the CRLF), always
+      // keeping at least the first character.
+      std::string::size_type last = value.size() - 1;
+      while (last > 0 && !(value.at(last) > 32)) {
+        --last;
+      }
+      value.resize(last + 1);
+      header_response_.append(key, value);
+    }
+  }
+  return true;
+}
+
+/**
+ * @return the libcurl result code stored by the most recent submit().
+ */
+CURLcode HTTPClient::getResponseResult() {
+  return this->res;
+}
+
+/**
+ * @return a reference to the HTTP status code member that submit() fills in.
+ */
+int64_t &HTTPClient::getResponseCode() {
+  return this->http_code;
+}
+
+/**
+ * @return the content type pointer that submit() obtained from libcurl via
+ *         CURLINFO_CONTENT_TYPE.
+ */
+const char *HTTPClient::getContentType() {
+  return this->content_type;
+}
+
+/**
+ * Returns the response body collected by the client's internal buffer.
+ * The buffer is converted on first use and cached; it is converted again on
+ * a later call only while the cached copy is still empty.
+ * @return reference to the cached body bytes.
+ */
+const std::vector<char> &HTTPClient::getResponseBody() {
+  const bool not_cached_yet = response_body_.empty();
+  if (not_cached_yet) {
+    response_body_ = std::move(read_callback_.to_string());
+  }
+  return response_body_;
+}
+
+/**
+ * Configures the HTTP method on the curl session. The name is compared
+ * case-insensitively: POST and PUT use libcurl's dedicated options, GET
+ * leaves the session untouched, anything else is sent as a custom request.
+ * @param method HTTP method name.
+ */
+void HTTPClient::set_request_method(const std::string method) {
+  std::string my_method = method;
+  // Go through unsigned char: handing a negative char to toupper is undefined.
+  std::transform(my_method.begin(), my_method.end(), my_method.begin(), [](unsigned char c) {
+    return static_cast<char>(::toupper(c));
+  });
+  if (my_method == "POST") {
+    curl_easy_setopt(http_session_, CURLOPT_POST, 1L);
+  } else if (my_method == "PUT") {
+    // The previous CURLOPT_PUT with 0L did not switch PUT on. CURLOPT_UPLOAD
+    // is libcurl's replacement for the deprecated CURLOPT_PUT and makes an
+    // HTTP transfer use PUT.
+    curl_easy_setopt(http_session_, CURLOPT_UPLOAD, 1L);
+  } else if (my_method == "GET") {
+    // GET needs no extra option.
+  } else {
+    curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str());
+  }
+}
+
+/**
+ * Tests a value against a POSIX regular expression compiled with no flags.
+ * The pattern ".*" is accepted immediately without being compiled.
+ * @param value text to test.
+ * @param sregex pattern to compile.
+ * @return true on a match; false when there is no match or the pattern
+ *         fails to compile.
+ */
+bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
+  if (sregex == ".*") {
+    return true;
+  }
+
+  regex_t compiled;
+  if (regcomp(&compiled, sregex.c_str(), 0) != 0) {
+    return false;
+  }
+  const int exec_status = regexec(&compiled, value.c_str(), (size_t) 0, NULL, 0);
+  regfree(&compiled);
+  return exec_status == 0;
+}
+
+/**
+ * Installs the SSL context hook on a curl session, passing the client's SSL
+ * context service to configure_ssl_context as its user data.
+ * @param http_session curl session to configure.
+ */
+void HTTPClient::configure_secure_connection(CURL *http_session) {
+  void *ssl_service = static_cast<void*>(ssl_context_service_.get());
+  logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile());
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, ssl_service);
+}
+
+/**
+ * Decides whether a URL uses the https scheme.
+ * @param url URL to inspect.
+ * @return true when the URL starts with "https://".
+ */
+bool HTTPClient::isSecure(const std::string &url) {
+  // Only the scheme counts: searching the whole URL also matched "https"
+  // inside a host name, path or query string of a plain http URL.
+  if (url.compare(0, 8, "https://") == 0) {
+    logger_->log_debug("%s is a secure url", url);
+    return true;
+  }
+  return false;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPClient.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 609a052..99200d3 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -1,5 +1,4 @@
 /**
- * HTTPUtils class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -28,8 +27,9 @@
 #include <uuid/uuid.h>
 #include <regex.h>
 #include <vector>
+
+#include "utils/ByteArrayCallback.h"
 #include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "properties/Configure.h"
@@ -78,53 +78,63 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   ~HTTPClient();
 
-  void setVerbose();
+  virtual void setVerbose() override ;
+
+  void forceClose();
 
-  void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
+  virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) override;
 
-  void setConnectionTimeout(int64_t timeout);
+  virtual void setConnectionTimeout(int64_t timeout) override;
 
-  void setReadTimeout(int64_t timeout);
+  virtual void setReadTimeout(int64_t timeout) override;
 
-  void setUploadCallback(HTTPUploadCallback *callbackObj);
+  virtual void setUploadCallback(HTTPUploadCallback *callbackObj) override;
+
+  virtual void setReadCallback(HTTPReadCallback *callbackObj) ;
 
   struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
 
-  void setContentType(std::string content_type);
+  virtual void setContentType(std::string content_type) override;
 
-  std::string escape(std::string string_to_escape);
+  virtual std::string escape(std::string string_to_escape) override;
 
-  void setPostFields(std::string input);
+  virtual void setPostFields(std::string input) override;
 
   void setHeaders(struct curl_slist *list);
 
-  void appendHeader(const std::string &new_header);
+  virtual void appendHeader(const std::string &new_header) override;
+
+  void appendHeader(const std::string &key, const std::string &value);
 
-  bool submit();
+  bool submit() override;
 
   CURLcode getResponseResult();
 
-  int64_t &getResponseCode();
+  int64_t &getResponseCode() override;
 
-  const char *getContentType();
+  const char *getContentType() override;
 
-  const std::vector<char> &getResponseBody();
+  const std::vector<char> &getResponseBody() override;
 
-  void set_request_method(const std::string method);
+  void set_request_method(const std::string method) override;
 
-  void setUseChunkedEncoding();
+  void setUseChunkedEncoding() override;
 
-  void setDisablePeerVerification();
+  void setDisablePeerVerification() override;
 
-  const std::vector<std::string> &getHeaders() {
+  const std::vector<std::string> &getHeaders() override{
     return header_response_.header_tokens_;
 
   }
 
+  virtual const std::map<std::string, std::string> &getParsedHeaders() override{
+    return header_response_.header_mapping_;
+  }
+
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  virtual bool isRunning() override{
     return true;
   }
 
@@ -135,7 +145,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
   void waitForWork(uint64_t timeoutMs) {
   }
 
-  virtual void yield() {
+  virtual void yield() override{
 
   }
 
@@ -143,13 +153,13 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  virtual bool isWorkAvailable() override{
     return true;
   }
 
  protected:
 
-  inline bool matches(const std::string &value, const std::string &sregex);
+  inline bool matches(const std::string &value, const std::string &sregex) override;
 
   static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param) {
     minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
@@ -161,9 +171,12 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   void configure_secure_connection(CURL *http_session);
 
+  HTTPReadCallback *callback;
+
   bool isSecure(const std::string &url);
   struct curl_slist *headers_;
-  utils::HTTPRequestResponse content_;
+  HTTPReadCallback content_;
+  ByteOutputCallback read_callback_;
   utils::HTTPHeaderResponse header_response_;
   CURLcode res;
   int64_t http_code;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPStream.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
new file mode 100644
index 0000000..8a5a4b9
--- /dev/null
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HTTPStream.h"
+
+#include <fstream>
+#include <vector>
+#include <memory>
+#include <string>
+
+#include "HTTPCallback.h"
+#include "io/validation.h"
+#include "io/NonConvertingStream.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Builds a stream on top of an existing HTTP client.
+ * @param client client used for the request once reading or writing begins.
+ */
+HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client)
+    : http_client_(client),
+      written(0),
+      started_(false),
+      http_read_callback_(16384),
+      logger_(logging::LoggerFactory<HttpStream>::getLogger()) {
+  // Nothing is submitted here; the request is started by the first
+  // writeData()/readData() call.
+}
+
+/**
+ * Closes both the upload-side and the download-side callback buffers.
+ */
+void HttpStream::closeStream() {
+  // Upload side first, then the download side.
+  this->http_callback_.close();
+  this->http_read_callback_.close();
+}
+
+/**
+ * Seeking is not supported on an HTTP stream.
+ * @param offset ignored.
+ * @throws std::runtime_error always.
+ */
+void HttpStream::seek(uint64_t offset) {
+  // seek is an unnecessary part of this implementation
+  // Throw a std::exception subclass that carries a message, matching getBuffer().
+  throw std::runtime_error("Stream does not support this operation");
+}
+
+/**
+ * Writes the first buflen bytes of a vector to the stream.
+ * @param buf source bytes.
+ * @param buflen number of bytes to write.
+ * @return the result of the pointer overload, or -1 when buflen is negative
+ *         or exceeds the number of elements in buf.
+ */
+int HttpStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+  // Compare against size(), not capacity(): elements between size() and
+  // capacity() do not exist and must not be read.
+  if (buflen < 0 || buf.size() < static_cast<size_t>(buflen)) {
+    return -1;
+  }
+  // data() is valid on an empty vector, unlike &buf[0].
+  return writeData(buf.data(), buflen);
+}
+
+// data stream overrides
+
+/**
+ * Hands raw bytes to the upload buffer. If the request has not been started
+ * yet, this call registers the upload callback with the client and launches
+ * submit() in the background before queuing the data.
+ * @param value bytes to send.
+ * @param size number of bytes in value.
+ * @return size on success, -1 when value is rejected by IsNullOrEmpty.
+ */
+int HttpStream::writeData(uint8_t *value, int size) {
+  if (IsNullOrEmpty(value)) {
+    return -1;
+  }
+  if (!started_) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    // Check again while holding the lock so only one caller starts the request.
+    if (!started_) {
+      callback_.ptr = &http_callback_;
+      callback_.pos = 0;
+      http_client_->setUploadCallback(&callback_);
+      // Ask for asynchronous execution explicitly: with the default policy an
+      // implementation may defer the call until the future is waited on, and
+      // nothing would consume the queued data in the meantime.
+      http_client_future_ = std::async(std::launch::async, submit_client, http_client_);
+      started_ = true;
+    }
+  }
+  http_callback_.process(value, size);
+  return size;
+}
+
+/**
+ * Reads sizeof(T) bytes from the stream into a freshly sized vector.
+ * The result of the underlying read is not inspected.
+ * @param t object whose size determines how many bytes are requested.
+ * @return vector of sizeof(T) bytes.
+ */
+template<typename T>
+inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) {
+  std::vector<uint8_t> bytes(sizeof t);
+  uint8_t *destination = reinterpret_cast<uint8_t *>(&bytes[0]);
+  readData(destination, sizeof(t));
+  return bytes;
+}
+
+/**
+ * Reads up to buflen bytes into a vector. The vector is grown to hold buflen
+ * elements if needed and shrunk to the number of bytes actually read when
+ * the read comes up short.
+ * @param buf destination vector.
+ * @param buflen number of bytes requested.
+ * @return the result of the pointer overload, or -1 when buflen is negative.
+ *         On a negative result the vector keeps its pre-read size.
+ */
+int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (buflen < 0) {
+    return -1;
+  }
+  // Size, not capacity, decides which elements exist: a vector with spare
+  // capacity but a smaller size previously received data past its end.
+  if (buf.size() < static_cast<size_t>(buflen)) {
+    buf.resize(buflen);
+  }
+  int ret = readData(buf.data(), buflen);
+
+  // Only shrink on a genuine short read; a negative error code must never
+  // reach resize(), where it would convert to an enormous size.
+  if (ret >= 0 && ret < buflen) {
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+/**
+ * Reads response bytes into a raw buffer. If the request has not been
+ * started yet, this call registers the read callback with the client and
+ * launches the request in the background; if an earlier call already started
+ * it, no read callback is registered here.
+ * @param buf destination buffer.
+ * @param buflen number of bytes requested.
+ * @return the result of the read buffer's readFully(), or -1 when buf is
+ *         rejected by IsNullOrEmpty.
+ */
+int HttpStream::readData(uint8_t *buf, int buflen) {
+  if (IsNullOrEmpty(buf)) {
+    return -1;
+  }
+  if (!started_) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    // Check again while holding the lock so only one caller starts the request.
+    if (!started_) {
+      read_callback_.ptr = &http_read_callback_;
+      read_callback_.pos = 0;
+      http_client_->setReadCallback(&read_callback_);
+      // Ask for asynchronous execution explicitly: with the default policy an
+      // implementation may defer the call until the future is waited on, and
+      // nothing would fill the read buffer in the meantime.
+      http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, &http_read_callback_);
+      started_ = true;
+    }
+  }
+
+  return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPStream.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
new file mode 100644
index 0000000..de7c7d8
--- /dev/null
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <future>
+#include <vector>
+
+#include "HTTPCallback.h"
+#include "io/BaseStream.h"
+#include "HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Stream backed by an HTTP request. The request is submitted on a background
+ * task the first time data is written to or read from the stream; written
+ * data is fed to the request body and read data comes from the response body.
+ */
+class HttpStream : public io::BaseStream {
+ public:
+  /**
+   * Creates a stream that uses the given client for its request.
+   * @param http_client_ client the request is submitted through.
+   */
+  explicit HttpStream(std::shared_ptr<utils::HTTPClient> http_client_);
+
+  /**
+   * Closes the stream and waits for a started request to finish.
+   */
+  virtual ~HttpStream() {
+    closeStream();
+    // The future only has state once a read or write started the request,
+    // and calling get()/wait() on a future without state is undefined.
+    // wait() is used instead of get() so nothing can be rethrown from the
+    // destructor.
+    if (http_client_future_.valid()) {
+      http_client_future_.wait();
+    }
+  }
+
+  virtual void closeStream() override;
+
+  /**
+   * Waits for a started request to complete and returns the client.
+   * May be called repeatedly.
+   * NOTE(review): an exception stored by the background task is no longer
+   * rethrown here - confirm no caller relied on that.
+   * @return the client this stream was built with.
+   */
+  const std::shared_ptr<utils::HTTPClient> &getClient() {
+    // get() releases the future's state, so a second getClient() call or the
+    // destructor would then touch a future without state. wait() blocks the
+    // same way but leaves the state in place.
+    if (http_client_future_.valid()) {
+      http_client_future_.wait();
+    }
+    return http_client_;
+  }
+
+  /**
+   * Closes the stream buffers and asks the client to force the connection closed.
+   */
+  void forceClose(){
+    closeStream();
+    http_client_->forceClose();
+  }
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  virtual void seek(uint64_t offset) override;
+
+  /**
+   * @return the value of the written counter.
+   */
+  virtual const uint64_t getSize() const override{
+    return written;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen number of bytes requested
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen) override;
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen number of bytes requested
+   */
+  virtual int readData(uint8_t *buf, int buflen) override;
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen number of bytes to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size) override;
+
+  /**
+   * Not supported by this stream.
+   * @throws std::runtime_error always.
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+  /**
+   * Background task body for uploads: submits the request.
+   * @param client client to submit.
+   * @return the result of client->submit().
+   */
+  static bool submit_client(std::shared_ptr<utils::HTTPClient> client) {
+    bool submit_status = client->submit();
+
+    return submit_status;
+  }
+
+  /**
+   * Background task body for downloads: submits the request and closes the
+   * read buffer once the request has returned.
+   * @param client client to submit.
+   * @param callback read buffer to close afterwards.
+   * @return the result of client->submit().
+   */
+  static bool submit_read_client( std::shared_ptr<utils::HTTPClient> client, utils::ByteOutputCallback *callback){
+    bool submit_status = client->submit();
+    callback->close();
+
+    return submit_status;
+  }
+
+  /**
+   * Reports whether the background request has completed and the read buffer
+   * has nothing left to hand out.
+   * @return false when no request was started or it is still running.
+   */
+  inline bool isFinished() {
+    // Polling a future without state is undefined; a request that was never
+    // started cannot be finished.
+    if (!http_client_future_.valid()) {
+      return false;
+    }
+    if (http_client_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready && (http_read_callback_.getSize() == 0 && http_read_callback_.waitingOps())) {
+     // http_read_callback_.close();
+      return true;
+    }
+    else{
+      return false;
+    }
+  }
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+
+  void reset();
+
+  std::vector<uint8_t> array;
+
+  // client used to perform the request
+  std::shared_ptr<utils::HTTPClient> http_client_;
+  // result of the background submit; has state only once a request was started
+  std::future<bool> http_client_future_;
+
+  // value reported by getSize()
+  size_t written;
+
+  // guards the one-time start of the request
+  std::mutex mutex_;
+  // set once the background request has been launched
+  std::atomic<bool> started_;
+
+  // upload side: buffer fed by writeData() and its libcurl adapter
+  utils::HttpStreamingCallback http_callback_;
+
+  utils::HTTPUploadCallback callback_;
+
+  // download side: buffer drained by readData() and its libcurl adapter
+  utils::ByteOutputCallback http_read_callback_;
+
+  utils::HTTPReadCallback read_callback_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HttpClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HttpClient.cpp b/extensions/http-curl/client/HttpClient.cpp
deleted file mode 100644
index f2bc0e1..0000000
--- a/extensions/http-curl/client/HttpClient.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "HTTPClient.h"
-#include <memory>
-#include <map>
-#include <vector>
-#include <string>
-#include <algorithm>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-
-HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
-    : core::Connectable("HTTPClient", 0),
-      ssl_context_service_(ssl_context_service),
-      url_(url),
-      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
-      connect_timeout_(0),
-      read_timeout_(0),
-      content_type(nullptr),
-      headers_(nullptr),
-      http_code(0),
-      header_response_(1),
-      res(CURLE_OK) {
-  HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
-  http_session_ = curl_easy_init();
-}
-
-HTTPClient::HTTPClient(std::string name, uuid_t uuid)
-    : core::Connectable(name, uuid),
-      ssl_context_service_(nullptr),
-      url_(),
-      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
-      connect_timeout_(0),
-      read_timeout_(0),
-      content_type(nullptr),
-      headers_(nullptr),
-      http_code(0),
-      header_response_(1),
-      res(CURLE_OK) {
-  HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
-  http_session_ = curl_easy_init();
-}
-
-HTTPClient::HTTPClient()
-    : core::Connectable("HTTPClient", 0),
-      ssl_context_service_(nullptr),
-      url_(),
-      logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
-      connect_timeout_(0),
-      read_timeout_(0),
-      content_type(nullptr),
-      headers_(nullptr),
-      http_code(0),
-      header_response_(1),
-      res(CURLE_OK) {
-  HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
-  http_session_ = curl_easy_init();
-}
-
-HTTPClient::~HTTPClient() {
-  if (nullptr != headers_) {
-    curl_slist_free_all(headers_);
-  }
-  curl_easy_cleanup(http_session_);
-}
-
-void HTTPClient::setVerbose() {
-  curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L);
-}
-
-void HTTPClient::initialize(const std::string &method, const std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
-  method_ = method;
-  set_request_method(method_);
-  if (ssl_context_service != nullptr) {
-    ssl_context_service_ = ssl_context_service;
-  }
-  if (!url.empty()) {
-    url_ = url;
-  }
-  if (isSecure(url_) && ssl_context_service_ != nullptr) {
-    configure_secure_connection(http_session_);
-  }
-}
-
-void HTTPClient::setDisablePeerVerification() {
-  logger_->log_info("Disabling peer verification");
-  curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
-}
-
-void HTTPClient::setConnectionTimeout(int64_t timeout) {
-  connect_timeout_ = timeout;
-}
-
-void HTTPClient::setReadTimeout(int64_t timeout) {
-  read_timeout_ = timeout;
-}
-
-void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
-  logger_->log_info("Setting callback");
-  if (method_ == "put" || method_ == "PUT") {
-    curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize());
-  } else {
-    curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, (curl_off_t ) callbackObj->ptr->getBufferSize());
-  }
-  curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
-  curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
-}
-
-struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
-  if (http_session_) {
-    for (auto attribute : attributes) {
-      if (matches(attribute.first, regex)) {
-        std::string attr = attribute.first + ":" + attribute.second;
-        headers_ = curl_slist_append(headers_, attr.c_str());
-      }
-    }
-  }
-  return headers_;
-}
-
-void HTTPClient::setContentType(std::string content_type) {
-  content_type_ = "Content-Type: " + content_type;
-  headers_ = curl_slist_append(headers_, content_type_.c_str());
-}
-
-std::string HTTPClient::escape(std::string string_to_escape) {
-  return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length());
-}
-
-void HTTPClient::setPostFields(std::string input) {
-  curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length());
-  curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str());
-}
-
-void HTTPClient::setHeaders(struct curl_slist *list) {
-  headers_ = list;
-}
-
-void HTTPClient::appendHeader(const std::string &new_header) {
-  headers_ = curl_slist_append(headers_, new_header.c_str());
-}
-
-void HTTPClient::setUseChunkedEncoding() {
-  headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked");
-}
-
-bool HTTPClient::submit() {
-  if (connect_timeout_ > 0) {
-    curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_);
-  }
-
-  if (headers_ != nullptr) {
-    headers_ = curl_slist_append(headers_, "Expect:");
-    curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
-  }
-
-  curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
-  logger_->log_info("Submitting to %s", url_);
-  curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
-  curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(&content_));
-
-  curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
-  curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast<void*>(&header_response_));
-
-  res = curl_easy_perform(http_session_);
-  curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
-  curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type);
-  if (res != CURLE_OK) {
-    logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
-    return false;
-  }
-  return true;
-}
-
-CURLcode HTTPClient::getResponseResult() {
-  return res;
-}
-
-int64_t &HTTPClient::getResponseCode() {
-  return http_code;
-}
-
-const char *HTTPClient::getContentType() {
-  return content_type;
-}
-
-const std::vector<char> &HTTPClient::getResponseBody() {
-  return content_.data;
-}
-
-void HTTPClient::set_request_method(const std::string method) {
-  std::string my_method = method;
-  std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
-  if (my_method == "POST") {
-    curl_easy_setopt(http_session_, CURLOPT_POST, 1L);
-  } else if (my_method == "PUT") {
-    curl_easy_setopt(http_session_, CURLOPT_PUT, 0L);
-  } else if (my_method == "GET") {
-  } else {
-    curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str());
-  }
-}
-
-bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
-  if (sregex == ".*")
-    return true;
-
-  regex_t regex;
-  int ret = regcomp(&regex, sregex.c_str(), 0);
-  if (ret)
-    return false;
-  ret = regexec(&regex, value.c_str(), (size_t) 0, NULL, 0);
-  regfree(&regex);
-  if (ret)
-    return false;
-
-  return true;
-}
-
-void HTTPClient::configure_secure_connection(CURL *http_session) {
-  logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile());
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
-}
-
-bool HTTPClient::isSecure(const std::string &url) {
-  if (url.find("https") != std::string::npos) {
-    logger_->log_debug("%s is a secure url", url);
-    return true;
-  }
-  return false;
-}
-
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 8e16122..71e8cda 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -33,6 +33,7 @@
 #include <utility>
 #include <vector>
 
+#include "utils/ByteArrayCallback.h"
 #include "core/FlowFile.h"
 #include "core/logging/Logger.h"
 #include "core/ProcessContext.h"
@@ -41,7 +42,6 @@
 #include "io/StreamFactory.h"
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
-#include "utils/ByteInputCallBack.h"
 
 namespace org {
 namespace apache {
@@ -140,7 +140,7 @@ void InvokeHTTP::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!context->getProperty(Method.getName(), method_)) {
     logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str());
     return;
@@ -240,7 +240,7 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) {
   return ("POST" == method || "PUT" == method || "PATCH" == method);
 }
 
-void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_info("onTrigger InvokeHTTP with %s to %s", method_, url_);
 
   std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
@@ -352,7 +352,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
   }
 }
 
-void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess,
+void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::ProcessContext> &context, bool isSuccess,
                        int statusCode) {
   // check if we should yield the processor
   if (!isSuccess && request == nullptr) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 2a00cef..1aa3faa 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -23,6 +23,7 @@
 #include <regex>
 
 #include <curl/curl.h>
+#include "utils/ByteArrayCallback.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -30,7 +31,6 @@
 #include "core/Property.h"
 #include "core/Resource.h"
 #include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 #include "../client/HTTPClient.h"
@@ -104,9 +104,9 @@ class InvokeHTTP : public core::Processor {
   static core::Relationship RelNoRetry;
   static core::Relationship RelFailure;
 
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  virtual void initialize();
-  virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  virtual void initialize() override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   /**
    * Provides a reference to the URL.
    */
@@ -131,7 +131,7 @@ class InvokeHTTP : public core::Processor {
    * @param isSuccess success code or not
    * @param statuscode http response code.
    */
-  void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode);
+  void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int statusCode);
   /**
    * Determine if we should emit a new flowfile based on our activity
    * @param method method type

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h
index ed62d9c..4767e77 100644
--- a/extensions/http-curl/protocols/RESTProtocol.h
+++ b/extensions/http-curl/protocols/RESTProtocol.h
@@ -22,11 +22,12 @@
 #include "json/writer.h"
 #include <string>
 #include <mutex>
+
+#include "utils/ByteArrayCallback.h"
 #include "CivetServer.h"
 #include "c2/C2Protocol.h"
 #include "c2/HeartBeatReporter.h"
 #include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
 #include "utils/HTTPClient.h"
 
 namespace org {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
index f4e4a3c..77cfc90 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -51,8 +51,8 @@ class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
  public:
   RESTReceiver(std::string name, uuid_t uuid = nullptr);
 
-  void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
-  virtual int16_t heartbeat(const C2Payload &heartbeat);
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+  virtual int16_t heartbeat(const C2Payload &heartbeat) override;
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index e67661c..9959814 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -98,42 +98,38 @@ void RESTSender::update(const std::shared_ptr<Configure> &configure) {
 }
 
 const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
-  if (!url.empty()) {
-    utils::HTTPClient client(url, ssl_context_service_);
-    client.setConnectionTimeout(2);
-
-    std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
-    std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
-    if (direction == Direction::TRANSMIT) {
-      input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
-      callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
-      input->write(outputConfig);
-      callback->ptr = input.get();
-      callback->pos = 0;
-      client.set_request_method("POST");
-      client.setUploadCallback(callback.get());
-    } else {
-      // we do not need to set the uplaod callback
-      // since we are not uploading anything on a get
-      client.set_request_method("GET");
-    }
-    client.setContentType("application/json");
-    bool isOkay = client.submit();
-    int64_t respCode = client.getResponseCode();
+  utils::HTTPClient client(url, ssl_context_service_);
+  client.setConnectionTimeout(2);
+
+  std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+  std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+  if (direction == Direction::TRANSMIT) {
+    input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+    callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+    input->write(outputConfig);
+    callback->ptr = input.get();
+    callback->pos = 0;
+    client.set_request_method("POST");
+    client.setUploadCallback(callback.get());
+  } else {
+    // we do not need to set the upload callback
+    // since we are not uploading anything on a get
+    client.set_request_method("GET");
+  }
+  client.setContentType("application/json");
+  bool isOkay = client.submit();
+  int64_t respCode = client.getResponseCode();
 
-    if (isOkay && respCode) {
-      if (payload.isRaw()) {
-        C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+  if (isOkay && respCode) {
+    if (payload.isRaw()) {
+      C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
 
-        response_payload.setRawData(client.getResponseBody());
-        return std::move(response_payload);
-      }
-      return parseJsonResponse(payload, client.getResponseBody());
-    } else {
-      return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+      response_payload.setRawData(client.getResponseBody());
+      return std::move(response_payload);
     }
+    return parseJsonResponse(payload, client.getResponseBody());
   } else {
-    return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 09a1a4d..e4c1e5e 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -22,12 +22,13 @@
 #include "json/writer.h"
 #include <string>
 #include <mutex>
+
+#include "utils/ByteArrayCallback.h"
 #include "CivetServer.h"
 #include "c2/C2Protocol.h"
 #include "RESTProtocol.h"
 #include "c2/HeartBeatReporter.h"
 #include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
 #include "../client/HTTPClient.h"
 
 namespace org {
@@ -49,13 +50,13 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   explicit RESTSender(std::string name, uuid_t uuid = nullptr);
 
-  virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async);
+  virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
 
-  virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async);
+  virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
 
-  virtual void update(const std::shared_ptr<Configure> &configure);
+  virtual void update(const std::shared_ptr<Configure> &configure) override;
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/sitetosite/HTTPProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
new file mode 100644
index 0000000..940d3e3
--- /dev/null
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -0,0 +1,310 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPProtocol.h"
+
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <random>
+#include <iostream>
+#include <vector>
+
+#include "PeersEntity.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+std::shared_ptr<utils::IdGenerator> HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();  // definition of the class-level id generator member
+
+/**
+ * Returns everything after the last '/' of uri; the whole uri when it has no '/' (rfind gives npos and
+ * npos + 1 wraps to 0), and an empty string when uri ends with '/'.
+ * @param uri URL whose final path segment is the transaction id.
+ */
+const std::string HttpSiteToSiteClient::parseTransactionId(const std::string &uri) {
+  return uri.substr(uri.rfind('/') + 1);
+}
+
+/**
+ * POSTs to the port's "transactions" resource; on a 201 reply carrying a transaction URL it attaches a
+ * data connection for direction to the peer, registers the transaction and stores its id in transactionID.
+ * @return the new transaction, or nullptr when it could not be created.
+ */
+std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string &transactionID, TransferDirection direction) {
+  std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
+  std::stringstream uri;
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions";
+  auto client = create_http_client(uri.str(), "POST");
+  client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+  client->setConnectionTimeout(5);
+  client->setContentType("application/json");
+  client->appendHeader("Accept: application/json");
+  client->setUseChunkedEncoding();
+  client->setPostFields("");
+  client->submit();
+  peer_->setStream(nullptr);
+  if (client->getResponseCode() == 201) {
+    // parse the headers
+    auto headers = client->getParsedHeaders();
+    auto intent_name = headers["x-location-uri-intent"];
+    if (intent_name == "transaction-url") {
+      auto url = headers["Location"];
+      if (IsNullOrEmpty(&url)) {
+        logger_->log_debug("Location is empty");
+      } else {
+        org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(peer_.get());
+        auto transaction = std::make_shared<HttpTransaction>(direction, crcstream);
+        transaction->initialize(this, url);
+        auto transactionId = parseTransactionId(url);
+        if (IsNullOrEmpty(transactionId))
+          return nullptr;
+        transaction->setTransactionId(transactionId);
+        std::shared_ptr<minifi::utils::HTTPClient> data_client;
+        if (transaction->getDirection() == SEND) {
+          data_client = openConnectionForSending(transaction);
+        } else {
+          data_client = openConnectionForReceive(transaction);
+          transaction->setDataAvailable(true);
+          // a 201 tells us that data is available. A 200 would mean that nothing is available.
+        }
+        data_client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+        peer_->setStream(std::unique_ptr<io::DataStream>(new io::HttpStream(data_client)));
+        transactionID = transaction->getUUIDStr();
+        logger_->log_debug("Created transaction id -%s-", transactionID.c_str());
+        known_transactions_[transaction->getUUIDStr()] = transaction;
+        return transaction;
+      }
+    } else {
+      logger_->log_debug("Could not create transaction, intent is %s", intent_name.c_str());
+    }
+  } else {
+    // NOTE(review): the response code is kept in an int64_t elsewhere; narrowed here to match %d
+    logger_->log_debug("Could not create transaction, received %d", static_cast<int>(client->getResponseCode()));
+  }
+  return nullptr;
+}
+
+/**
+ * Derives code from current_code, the transaction's direction/state and the peer's HttpStream,
+ * returning 1; defers to SiteToSiteClient::readResponse when none of the cases below applies.
+ */
+int HttpSiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
+  if (current_code == FINISH_TRANSACTION) {
+    // a finished send closes the stream; a 202 reply confirms it and its body becomes the message
+    if (transaction->getDirection() == SEND) {
+      auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+      stream->closeStream();
+      auto client = stream->getClient();
+      if (client->getResponseCode() == 202) {
+        code = CONFIRM_TRANSACTION;
+        message = std::string(client->getResponseBody().data(), client->getResponseBody().size());
+      } else {
+        code = UNRECOGNIZED_RESPONSE_CODE;
+      }
+      return 1;
+    } else {
+      return 1;
+    }
+  } else if (transaction->getDirection() == RECEIVE) {
+    if (transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED) {
+      if (current_code == CONFIRM_TRANSACTION && transaction->getState() == DATA_EXCHANGED) {
+        auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+        if (!stream->isFinished()) {
+          logger_->log_info("confirm read for %s, but not finished ", transaction->getUUIDStr());
+        }
+        closeTransaction(transaction->getUUIDStr());
+        code = CONFIRM_TRANSACTION;
+      } else {
+        auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+        if (stream->isFinished()) {
+          code = FINISH_TRANSACTION;
+          current_code = FINISH_TRANSACTION;
+        } else {
+          code = CONTINUE_TRANSACTION;
+        }
+      }
+    } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
+      closeTransaction(transaction->getUUIDStr());
+      code = CONFIRM_TRANSACTION;
+    } else {
+      // any other state leaves code untouched
+    }
+    return 1;
+  } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
+    closeTransaction(transaction->getUUIDStr());
+    code = TRANSACTION_FINISHED;
+    return 1;
+  }
+  return SiteToSiteClient::readResponse(transaction, code, message);
+}
+/**
+ * Records code in current_code; CONFIRM/FINISH/CONTINUE_TRANSACTION return 1 without further work
+ * (CONTINUE is logged), every other code is forwarded to SiteToSiteClient::writeResponse.
+ */
+int HttpSiteToSiteClient::writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
+  current_code = code;
+  if (code == CONTINUE_TRANSACTION) {
+    logger_->log_info("Continuing HTTP transaction");
+    return 1;
+  }
+  return (code == CONFIRM_TRANSACTION || code == FINISH_TRANSACTION) ? 1 : SiteToSiteClient::writeResponse(transaction, code, message);
+}
+
+/**
+ * Requests the "site-to-site/peers" resource and parses a 200 response body into peers.
+ * @return true only when the request answered 200 and PeersEntity::parse accepted the body.
+ */
+bool HttpSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
+  std::stringstream peers_uri;
+  peers_uri << getBaseURI() << "site-to-site/peers";
+  auto http = create_http_client(peers_uri.str(), "GET");
+  http->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+  http->submit();
+  if (http->getResponseCode() != 200) {
+    return false;
+  }
+  // the raw response bytes are handed to the parser as a std::string
+  const std::string body(http->getResponseBody().data(), http->getResponseBody().size());
+  return sitetosite::PeersEntity::parse(logger_, body, port_id_, peers);
+}
+/** Builds (without submitting) a chunked POST client for "<transaction url>/flow-files". */
+std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForSending(const std::shared_ptr<HttpTransaction> &transaction) {
+  std::stringstream uri;
+  uri << transaction->getTransactionUrl() << "/flow-files";
+  std::shared_ptr<minifi::utils::HTTPClient> client = create_http_client(uri.str(), "POST");
+  client->setContentType("application/octet-stream");
+  client->appendHeader("Accept", "text/plain");
+  client->setUseChunkedEncoding();
+  return client;
+}
+
+/** Builds (without submitting) a GET client for "<transaction url>/flow-files". */
+std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForReceive(const std::shared_ptr<HttpTransaction> &transaction) {
+  std::stringstream uri;
+  uri << transaction->getTransactionUrl() << "/flow-files";
+  return create_http_client(uri.str(), "GET");
+}
+
+//! Transfer string for the process session. This implementation ignores its arguments and always returns false.
+bool HttpSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
+                                           std::map<std::string, std::string> attributes) {
+  return false;
+}
+
+// Logs the tear-down when the peer state is at least ESTABLISHED, then clears the known
+// transactions, closes the peer and sets the peer state back to IDLE.
+void HttpSiteToSiteClient::tearDown() {
+  if (peer_state_ >= ESTABLISHED) {
+    logger_->log_info("Site2Site Protocol tearDown");
+  }
+  known_transactions_.clear();
+  peer_->Close();
+  peer_state_ = IDLE;
+}
+
+/**
+ * Ends a known, not yet closed transaction by issuing a DELETE on its transaction resource.
+ * The responseCode query parameter is CONFIRM_TRANSACTION for a confirmed transaction or a confirmed
+ * receive that carried bytes, CANCEL_TRANSACTION when nothing was transferred, and
+ * UNRECOGNIZED_RESPONSE_CODE in every other case. Unknown or already closed ids are ignored.
+ * The transaction is marked closed and current_transfers_ is decremented whatever the reply is.
+ * @param transactionID key of the transaction in known_transactions_; also used in the request URI.
+ */
+void HttpSiteToSiteClient::closeTransaction(const std::string &transactionID) {
+  auto it = known_transactions_.find(transactionID);
+  if (it == known_transactions_.end()) {
+    return;
+  }
+  std::shared_ptr<Transaction> transaction = it->second;
+  if (transaction->closed_) {
+    return;
+  }
+
+  logger_->log_info("Site2Site close transaction %s", transaction->getUUIDStr().c_str());
+
+  // pick the response code that is put into the request URI
+  int code = UNRECOGNIZED_RESPONSE_CODE;
+  if (transaction->getState() == TRANSACTION_CONFIRMED) {
+    code = CONFIRM_TRANSACTION;
+  } else if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION) {
+    // a confirmed receive that carried no bytes is cancelled instead of confirmed
+    code = transaction->_bytes > 0 ? CONFIRM_TRANSACTION : CANCEL_TRANSACTION;
+  } else if (transaction->current_transfers_ == 0 && !transaction->isDataAvailable()) {
+    code = CANCEL_TRANSACTION;
+  }
+
+  std::stringstream uri;
+  std::string dir_str = transaction->getDirection() == SEND ? "input-ports" : "output-ports";
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions/" << transactionID << "?responseCode=" << code;
+
+  // the checksum is only appended for a confirmed receive that actually transferred data
+  if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION && transaction->_bytes > 0) {
+    uri << "&checksum=" << transaction->getCRC();
+  }
+
+  auto client = create_http_client(uri.str(), "DELETE");
+  client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+  client->setConnectionTimeout(5);
+  client->appendHeader("Accept", "application/json");
+  client->submit();
+
+  // NOTE(review): getResponseCode() is stored in an int64_t elsewhere in this code base, so it is
+  // narrowed explicitly to match the %d conversion.
+  logger_->log_debug("Received %d response code from delete", static_cast<int>(client->getResponseCode()));
+
+  // bookkeeping happens regardless of the DELETE outcome
+  transaction->closed_ = true;
+  transaction->current_transfers_--;
+}
+
+/**
+ * Closes the transaction via closeTransaction and then drops it from known_transactions_.
+ * Does nothing when transactionID is not a known transaction.
+ * @param transactionID key of the transaction in known_transactions_.
+ */
+void HttpSiteToSiteClient::deleteTransaction(std::string transactionID) {
+  auto it = known_transactions_.find(transactionID);
+  if (it == known_transactions_.end()) {
+    return;
+  }
+  std::shared_ptr<Transaction> transaction = it->second;
+  logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str());
+
+  // closeTransaction returns early for a transaction that is already closed
+  closeTransaction(transactionID);
+
+  // the entry is removed unconditionally: closeTransaction reports no result
+  known_transactions_.erase(transactionID);
+}
+
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */