You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/01 20:27:20 UTC
[6/6] nifi-minifi-cpp git commit: MINIFICPP-60: Add initial
implementation of Site to Site changes.
MINIFICPP-60: Add initial implementation of Site to Site changes.
MINIFICPP-60: Update interfaces to allow for seamless extension
MINIFICPP-60: Update functions to accomodate changed signatures
MINIFICPP-60: Update per pull request comments. Fix issues with most tests failing in parallel
MINIFICPP-275: Update CompressContent to use signature
MINIFICPP-60: Correct issue with HTTP where we were using the remote port from the JSON response
This closes #158.
Signed-off-by: Bin Qiu <be...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/726dc403
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/726dc403
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/726dc403
Branch: refs/heads/master
Commit: 726dc403d01794030b6de3cc94370a050d520c1a
Parents: 027fc19
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Oct 9 18:42:24 2017 -0400
Committer: Bin Qiu <be...@gmail.com>
Committed: Wed Nov 1 13:24:48 2017 -0700
----------------------------------------------------------------------
README.md | 7 +-
cmake/BuildTests.cmake | 1 -
extensions/http-curl/CMakeLists.txt | 12 +-
extensions/http-curl/HTTPCurlLoader.cpp | 29 +
extensions/http-curl/HTTPCurlLoader.h | 86 ++
extensions/http-curl/HttpCurlLoader.cpp | 30 -
extensions/http-curl/HttpCurlLoader.h | 82 --
extensions/http-curl/client/HTTPCallback.h | 188 +++
extensions/http-curl/client/HTTPClient.cpp | 319 +++++
extensions/http-curl/client/HTTPClient.h | 61 +-
extensions/http-curl/client/HTTPStream.cpp | 127 ++
extensions/http-curl/client/HTTPStream.h | 172 +++
extensions/http-curl/client/HttpClient.cpp | 257 ----
extensions/http-curl/processors/InvokeHTTP.cpp | 8 +-
extensions/http-curl/processors/InvokeHTTP.h | 10 +-
extensions/http-curl/protocols/RESTProtocol.h | 3 +-
extensions/http-curl/protocols/RESTReceiver.h | 4 +-
extensions/http-curl/protocols/RESTSender.cpp | 60 +-
extensions/http-curl/protocols/RESTSender.h | 11 +-
.../http-curl/sitetosite/HTTPProtocol.cpp | 310 +++++
extensions/http-curl/sitetosite/HTTPProtocol.h | 197 +++
.../http-curl/sitetosite/HTTPTransaction.h | 70 +
extensions/http-curl/sitetosite/PeersEntity.h | 112 ++
extensions/libarchive/ArchiveLoader.h | 10 +-
extensions/libarchive/BinFiles.cpp | 2 +-
extensions/libarchive/BinFiles.h | 2 +-
extensions/libarchive/CompressContent.cpp | 5 +-
extensions/libarchive/CompressContent.h | 4 +-
libminifi/CMakeLists.txt | 2 +-
libminifi/include/RemoteProcessorGroupPort.h | 36 +-
libminifi/include/SchedulingAgent.h | 2 +-
libminifi/include/Site2SiteClientProtocol.h | 633 ---------
libminifi/include/Site2SitePeer.h | 275 ----
libminifi/include/core/Processor.h | 12 +-
.../SiteToSiteProvenanceReportingTask.h | 9 +-
libminifi/include/core/yaml/YamlConfiguration.h | 4 +-
libminifi/include/io/BaseStream.h | 12 +-
libminifi/include/io/CRCStream.h | 32 +-
libminifi/include/io/DataStream.h | 1 +
libminifi/include/io/NonConvertingStream.h | 200 +++
libminifi/include/processors/GetTCP.h | 4 +-
libminifi/include/properties/Configure.h | 1 +
libminifi/include/sitetosite/Peer.h | 381 ++++++
.../include/sitetosite/RawSocketProtocol.h | 211 +++
libminifi/include/sitetosite/SiteToSite.h | 416 ++++++
libminifi/include/sitetosite/SiteToSiteClient.h | 330 +++++
.../include/sitetosite/SiteToSiteFactory.h | 89 ++
libminifi/include/utils/ByteArrayCallback.h | 143 ++
libminifi/include/utils/ByteInputCallBack.h | 85 --
libminifi/include/utils/HTTPClient.h | 89 +-
libminifi/include/utils/StringUtils.h | 11 +
libminifi/src/Configure.cpp | 1 +
libminifi/src/RemoteProcessorGroupPort.cpp | 141 +-
libminifi/src/SchedulingAgent.cpp | 6 +-
libminifi/src/Site2SiteClientProtocol.cpp | 1261 ------------------
libminifi/src/Site2SitePeer.cpp | 62 -
libminifi/src/core/Processor.cpp | 2 +-
.../SiteToSiteProvenanceReportingTask.cpp | 39 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 6 +-
libminifi/src/io/DataStream.cpp | 2 +
libminifi/src/io/FileStream.cpp | 4 +-
libminifi/src/io/NonConvertingStream.cpp | 190 +++
libminifi/src/io/Serializable.cpp | 1 -
libminifi/src/processors/GetTCP.cpp | 4 +-
libminifi/src/sitetosite/Peer.cpp | 65 +
libminifi/src/sitetosite/RawSocketProtocol.cpp | 629 +++++++++
libminifi/src/sitetosite/SiteToSiteClient.cpp | 773 +++++++++++
libminifi/src/utils/ByteArrayCallback.cpp | 155 +++
libminifi/test/TestBase.cpp | 5 +-
libminifi/test/TestBase.h | 19 +-
.../test/curl-tests/C2NullConfiguration.cpp | 2 -
.../test/curl-tests/C2VerifyServeResults.cpp | 2 -
libminifi/test/curl-tests/CMakeLists.txt | 7 +-
.../ControllerServiceIntegrationTests.cpp | 5 -
.../test/curl-tests/HTTPSiteToSiteTests.cpp | 262 ++++
.../curl-tests/sitetositehttp/CivetStream.h | 138 ++
.../curl-tests/sitetositehttp/HTTPHandlers.h | 322 +++++
libminifi/test/integration/IntegrationBase.h | 13 +-
.../integration/ProvenanceReportingTest.cpp | 2 +-
.../test/resources/C2VerifyHeartbeatAndStop.yml | 73 +
.../test/resources/C2VerifyServeResults.yml | 73 +
libminifi/test/resources/TestHTTPGetSecure.yml | 2 +-
libminifi/test/resources/TestHTTPPost.yml | 4 +-
.../resources/TestHTTPPostChunkedEncoding.yml | 4 +-
libminifi/test/resources/TestHTTPSiteToSite.yml | 91 ++
libminifi/test/resources/TestSite2SiteRest.yml | 4 +-
.../test/resources/TestSite2SiteRestSecure.yml | 2 +-
libminifi/test/resources/ThreadPoolAdjust.yml | 97 ++
libminifi/test/unit/GetTCPTests.cpp | 12 +-
libminifi/test/unit/LoggerTests.cpp | 1 -
libminifi/test/unit/ProcessorTests.cpp | 15 +-
libminifi/test/unit/SerializationTests.cpp | 2 -
libminifi/test/unit/Site2SiteTests.cpp | 35 +-
93 files changed, 6682 insertions(+), 3008 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 413333b..cc833e0 100644
--- a/README.md
+++ b/README.md
@@ -307,7 +307,7 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
max concurrent tasks: 1
Properties:
-### Site2Site Security Configuration
+### SiteToSite Security Configuration
in minifi.properties
@@ -327,6 +327,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
if you do not want to enable client certificate base authorization
nifi.security.need.ClientAuth=false
+### HTTP SiteToSite Configuration
+To enable HTTPSiteToSite you must set the following flag to true
+
+ nifi.remote.input.http.enabled=true
+
### Command and Control Configuration
For more more insight into the API used within the C2 agent, please visit:
https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 30542ca..99e980b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -86,7 +86,6 @@ FOREACH(testfile ${INTEGRATION_TESTS})
ENDFOREACH()
message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...")
-
get_property(extensions GLOBAL PROPERTY EXTENSION-TESTS)
foreach(EXTENSION ${extensions})
message("Adding ${EXTENSION} ? ")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index d851aff..cd9f3ba 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -21,11 +21,12 @@ find_package(CURL REQUIRED)
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
+set(CMAKE_CXX_VISIBILITY_PRESET default)
include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
-include_directories(protocols client processors)
+include_directories(protocols client processors sitetosite)
-file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp")
+file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp" "sitetosite/*.cpp")
add_library(minifi-http-curl STATIC ${SOURCES})
set_property(TARGET minifi-http-curl PROPERTY POSITION_INDEPENDENT_CODE ON)
@@ -43,8 +44,8 @@ endif(CURL_FOUND)
# Include UUID
find_package(UUID REQUIRED)
-target_link_libraries(minifi-http-curl ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB})
-add_dependencies(minifi-http-curl jsoncpp_project)
+#set(LINK_FLAGS ${LINK_FLAGS} "-Wl,-whole-archive")
+# Include OpenSSL
find_package(OpenSSL REQUIRED)
include_directories(${OPENSSL_INCLUDE_DIR})
target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} )
@@ -66,5 +67,4 @@ else ()
endif ()
SET (HTTP-CURL minifi-http-curl PARENT_SCOPE)
-
-register_extension(minifi-http-curl)
+register_extension(minifi-http-curl)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HTTPCurlLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.cpp b/extensions/http-curl/HTTPCurlLoader.cpp
new file mode 100644
index 0000000..010888a
--- /dev/null
+++ b/extensions/http-curl/HTTPCurlLoader.cpp
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPCurlLoader.h"
+
+#include "core/FlowConfiguration.h"
+
+bool HttpCurlObjectFactory::added = core::FlowConfiguration::add_static_func("createHttpCurlFactory");
+extern "C" {
+
+void *createHttpCurlFactory(void) {
+ return new HttpCurlObjectFactory();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HTTPCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h
new file mode 100644
index 0000000..797aebf
--- /dev/null
+++ b/extensions/http-curl/HTTPCurlLoader.h
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTPCURLLOADER_H_
+#define EXTENSIONS_HTTPCURLLOADER_H_
+
+#include "protocols/RESTProtocol.h"
+#include "protocols/RESTSender.h"
+#include "protocols/RESTReceiver.h"
+#include "processors/InvokeHTTP.h"
+#include "client/HTTPClient.h"
+#include "core/ClassLoader.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "utils/StringUtils.h"
+
+class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory {
+ public:
+ HttpCurlObjectFactory() {
+
+ }
+
+ /**
+ * Gets the name of the object.
+ * @return class name of processor
+ */
+ virtual std::string getName() override{
+ return "HttpCurlObjectFactory";
+ }
+
+ virtual std::string getClassName() override{
+ return "HttpCurlObjectFactory";
+ }
+ /**
+ * Gets the class name for the object
+ * @return class name for the processor.
+ */
+ virtual std::vector<std::string> getClassNames() override{
+ std::vector<std::string> class_names;
+ class_names.push_back("RESTProtocol");
+ class_names.push_back("HttpProtocol");
+ class_names.push_back("RESTReceiver");
+ class_names.push_back("RESTSender");
+ class_names.push_back("InvokeHTTP");
+ class_names.push_back("HTTPClient");
+ class_names.push_back("HttpSiteToSiteClient");
+ return class_names;
+ }
+
+ virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTSender")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTSender>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "InvokeHTTP")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::InvokeHTTP>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "HTTPClient")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<utils::HTTPClient>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "HttpProtocol") || utils::StringUtils::equalsIgnoreCase(class_name, "HttpSiteToSiteClient")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::sitetosite::HttpSiteToSiteClient>());
+ } else {
+ return nullptr;
+ }
+ }
+
+ static bool added;
+
+};
+
+extern "C" {
+void *createHttpCurlFactory(void);
+}
+#endif /* EXTENSIONS_HTTPCURLLOADER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HttpCurlLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HttpCurlLoader.cpp b/extensions/http-curl/HttpCurlLoader.cpp
deleted file mode 100644
index b1f3eb5..0000000
--- a/extensions/http-curl/HttpCurlLoader.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "HttpCurlLoader.h"
-
-#include "core/FlowConfiguration.h"
-
-bool HttpCurlObjectFactory::added = core::FlowConfiguration::add_static_func("createHttpCurlFactory");
-
-extern "C" {
-
-void *createHttpCurlFactory(void) {
- return new HttpCurlObjectFactory();
-}
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/HttpCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HttpCurlLoader.h b/extensions/http-curl/HttpCurlLoader.h
deleted file mode 100644
index cb9a30d..0000000
--- a/extensions/http-curl/HttpCurlLoader.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EXTENSIONS_HTTPCURLLOADER_H
-#define EXTENSIONS_HTTPCURLLOADER_H
-
-#include "protocols/RESTProtocol.h"
-#include "protocols/RESTSender.h"
-#include "protocols/RESTReceiver.h"
-#include "processors/InvokeHTTP.h"
-#include "client/HTTPClient.h"
-#include "core/ClassLoader.h"
-
-class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core::ObjectFactory {
- public:
- HttpCurlObjectFactory() {
-
- }
-
- /**
- * Gets the name of the object.
- * @return class name of processor
- */
- virtual std::string getName() {
- return "HttpCurlObjectFactory";
- }
-
- virtual std::string getClassName() {
- return "HttpCurlObjectFactory";
- }
- /**
- * Gets the class name for the object
- * @return class name for the processor.
- */
- virtual std::vector<std::string> getClassNames() {
- std::vector<std::string> class_names;
- class_names.push_back("RESTProtocol");
- class_names.push_back("RESTReceiver");
- class_names.push_back("RESTSender");
- class_names.push_back("InvokeHTTP");
- class_names.push_back("HTTPClient");
- return class_names;
- }
-
- virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
- if (class_name == "RESTReceiver") {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>());
- } else if (class_name == "RESTSender") {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTSender>());
- } else if (class_name == "InvokeHTTP") {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::InvokeHTTP>());
- } else if (class_name == "HTTPClient") {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<utils::HTTPClient>());
- } else {
- return nullptr;
- }
- }
-
- static bool added;
-
-
-
-};
-
-extern "C" {
-void *createHttpCurlFactory(void);
-}
-#endif /* EXTENSIONS_HTTPCURLLOADER_H */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPCallback.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
new file mode 100644
index 0000000..aeca2a9
--- /dev/null
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -0,0 +1,188 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include <thread>
+#include <mutex>
+#include <vector>
+#include <condition_variable>
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+ HttpStreamingCallback()
+ : ptr(nullptr),
+ is_alive_(true) {
+ previous_pos_ = 0;
+ rolling_count_ = 0;
+ }
+
+ virtual ~HttpStreamingCallback() {
+
+ }
+
+ void close() {
+ is_alive_ = false;
+ cv.notify_all();
+ }
+
+ virtual void seek(size_t pos) {
+ if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
+ load_buffer();
+ }
+
+ virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+
+ std::vector<char> vec;
+
+ if (stream->getSize() > 0) {
+ vec.resize(stream->getSize());
+
+ stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
+ }
+
+ size_t added_size = vec.size();
+
+ byte_arrays_.enqueue(std::move(vec));
+
+ cv.notify_all();
+
+ return added_size;
+
+ }
+
+ virtual int64_t process(uint8_t *vector, size_t size) {
+
+ std::vector<char> vec;
+
+ if (size > 0) {
+ vec.resize(size);
+
+ memcpy(vec.data(), vector, size);
+
+ size_t added_size = vec.size();
+
+ byte_arrays_.enqueue(std::move(vec));
+
+ cv.notify_all();
+
+ return added_size;
+ } else {
+ return 0;
+ }
+
+ }
+
+ virtual void write(std::string content) {
+ std::vector<char> vec;
+ vec.assign(content.begin(), content.end());
+ byte_arrays_.enqueue(vec);
+ }
+
+ virtual char *getBuffer(size_t pos) {
+
+ // if there is no space remaining in our current buffer,
+ // we should load the next. If none exists after that we have no more buffer
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+ if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
+ load_buffer();
+
+ if (ptr == nullptr)
+ return nullptr;
+
+ size_t absolute_position = pos - previous_pos_;
+
+ current_pos_ = pos;
+ for (int i = 0; i < current_vec_.size(); i++) {
+ }
+
+ return ptr + absolute_position;
+ }
+
+ virtual const size_t getRemaining(size_t pos) {
+ return current_vec_.size();
+ }
+
+ virtual const size_t getBufferSize() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+ if (ptr == nullptr || current_pos_ >= rolling_count_) {
+ load_buffer();
+ }
+ return rolling_count_;
+ }
+
+ private:
+
+ inline void load_buffer() {
+ std::unique_lock<std::recursive_mutex> lock(mutex_);
+ cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
+ if (!is_alive_ && byte_arrays_.size_approx() == 0) {
+ lock.unlock();
+ return;
+ }
+ try {
+ if (byte_arrays_.try_dequeue(current_vec_)) {
+ ptr = ¤t_vec_[0];
+ previous_pos_.store(rolling_count_.load());
+ current_pos_ = 0;
+ rolling_count_ += current_vec_.size();
+ } else {
+ ptr = nullptr;
+ }
+ lock.unlock();
+ } catch (...) {
+ lock.unlock();
+ }
+ }
+
+ std::atomic<bool> is_alive_;
+ std::atomic<size_t> rolling_count_;
+ std::condition_variable_any cv;
+ std::atomic<size_t> previous_pos_;
+ std::atomic<size_t> current_pos_;
+
+ std::recursive_mutex mutex_;
+
+ moodycamel::ConcurrentQueue<std::vector<char>> byte_arrays_;
+
+ char *ptr;
+
+ std::vector<char> current_vec_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
new file mode 100644
index 0000000..2d5d1e3
--- /dev/null
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -0,0 +1,319 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPClient.h"
+#include <memory>
+#include <climits>
+#include <map>
+#include <vector>
+#include <string>
+#include <algorithm>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
+ : core::Connectable("HTTPClient", 0),
+ ssl_context_service_(ssl_context_service),
+ url_(url),
+ logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+ connect_timeout_(0),
+ read_timeout_(0),
+ content_type(nullptr),
+ headers_(nullptr),
+ callback(nullptr),
+ http_code(0),
+ read_callback_(INT_MAX),
+ header_response_(-1),
+ res(CURLE_OK) {
+ HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
+ http_session_ = curl_easy_init();
+}
+
+HTTPClient::HTTPClient(std::string name, uuid_t uuid)
+ : core::Connectable(name, uuid),
+ ssl_context_service_(nullptr),
+ url_(),
+ logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+ connect_timeout_(0),
+ read_timeout_(0),
+ callback(nullptr),
+ content_type(nullptr),
+ read_callback_(INT_MAX),
+ headers_(nullptr),
+ http_code(0),
+ header_response_(-1),
+ res(CURLE_OK) {
+ HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
+ http_session_ = curl_easy_init();
+}
+
+HTTPClient::HTTPClient()
+ : core::Connectable("HTTPClient", 0),
+ ssl_context_service_(nullptr),
+ callback(nullptr),
+ url_(),
+ logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
+ connect_timeout_(0),
+ read_timeout_(0),
+ content_type(nullptr),
+ headers_(nullptr),
+ http_code(0),
+ read_callback_(INT_MAX),
+ header_response_(-1),
+ res(CURLE_OK) {
+ HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
+ http_session_ = curl_easy_init();
+}
+
+HTTPClient::~HTTPClient() {
+ forceClose();
+}
+
+void HTTPClient::forceClose(){
+ if (nullptr != headers_) {
+ curl_slist_free_all(headers_);
+ headers_ = nullptr;
+ }
+ if (http_session_ != nullptr){
+ curl_easy_cleanup(http_session_);
+ http_session_ = nullptr;
+ }
+}
+
+void HTTPClient::setVerbose() {
+ curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L);
+}
+
+void HTTPClient::initialize(const std::string &method, const std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
+ method_ = method;
+ set_request_method(method_);
+ if (ssl_context_service != nullptr) {
+ ssl_context_service_ = ssl_context_service;
+ }
+ if (!url.empty()) {
+ url_ = url;
+ }
+ if (isSecure(url_) && ssl_context_service_ != nullptr) {
+ configure_secure_connection(http_session_);
+ }
+}
+
+void HTTPClient::setDisablePeerVerification() {
+ logger_->log_info("Disabling peer verification");
+ curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
+}
+
+void HTTPClient::setConnectionTimeout(int64_t timeout) {
+ connect_timeout_ = timeout;
+ curl_easy_setopt(http_session_,CURLOPT_NOSIGNAL,1);
+}
+
+void HTTPClient::setReadTimeout(int64_t timeout) {
+ read_timeout_ = timeout;
+}
+
+void HTTPClient::setReadCallback(HTTPReadCallback *callbackObj) {
+ callback = callbackObj;
+ curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
+ curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(callbackObj));
+}
+
+void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
+ logger_->log_info("Setting callback");
+ if (method_ == "put" || method_ == "PUT") {
+ curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize());
+ }
+ curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
+ curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
+}
+
+struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
+ if (http_session_) {
+ for (auto attribute : attributes) {
+ if (matches(attribute.first, regex)) {
+ std::string attr = attribute.first + ":" + attribute.second;
+ headers_ = curl_slist_append(headers_, attr.c_str());
+ }
+ }
+ }
+ return headers_;
+}
+
+void HTTPClient::setContentType(std::string content_type) {
+ content_type_ = "Content-Type: " + content_type;
+ headers_ = curl_slist_append(headers_, content_type_.c_str());
+}
+
+std::string HTTPClient::escape(std::string string_to_escape) {
+ return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length());
+}
+
+void HTTPClient::setPostFields(std::string input) {
+ curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length());
+ curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str());
+}
+
+void HTTPClient::setHeaders(struct curl_slist *list) {
+ headers_ = list;
+}
+
+void HTTPClient::appendHeader(const std::string &new_header) {
+ headers_ = curl_slist_append(headers_, new_header.c_str());
+}
+
+void HTTPClient::appendHeader(const std::string &key, const std::string &value) {
+ std::stringstream new_header;
+ new_header << key << ": " << value;
+ headers_ = curl_slist_append(headers_, new_header.str().c_str());
+}
+
+void HTTPClient::setUseChunkedEncoding() {
+ headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked");
+}
+
+bool HTTPClient::submit() {
+ if (connect_timeout_ > 0) {
+ curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_);
+ }
+
+ if (headers_ != nullptr) {
+ headers_ = curl_slist_append(headers_, "Expect:");
+ curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
+ }
+
+ curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
+ logger_->log_info("Submitting to %s", url_);
+ if (callback == nullptr) {
+ content_.ptr = &read_callback_;
+ curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
+ curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(&content_));
+ }
+ curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
+ curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast<void*>(&header_response_));
+ curl_easy_setopt(http_session_, CURLOPT_TCP_KEEPALIVE, 0L);
+ res = curl_easy_perform(http_session_);
+ curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
+ curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type);
+ if (res != CURLE_OK) {
+ logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
+ return false;
+ }
+
+ logger_->log_info("Finished with %s", url_);
+ std::string key = "";
+ for (auto header_line : header_response_.header_tokens_) {
+ int i = 0;
+ for (i = 0; i < header_line.length(); i++) {
+ if (header_line.at(i) == ':') {
+ break;
+ }
+ }
+ if (i >= header_line.length()) {
+ if (key.empty())
+ header_response_.append("HEADER", header_line);
+ else
+ header_response_.append(key, header_line);
+ } else {
+ key = header_line.substr(0, i);
+ int length = header_line.length() - (i + 2);
+ if (length <= 0) {
+ continue;
+ }
+ std::string value = header_line.substr(i + 2, length);
+ int end_find = value.size() - 1;
+ for (; end_find > 0; end_find--) {
+ if (value.at(end_find) > 32) {
+ break;
+ }
+ }
+ value = value.substr(0, end_find + 1);
+ header_response_.append(key, value);
+ }
+ }
+ return true;
+}
+
+CURLcode HTTPClient::getResponseResult() {
+ return res;
+}
+
+int64_t &HTTPClient::getResponseCode() {
+ return http_code;
+}
+
+const char *HTTPClient::getContentType() {
+ return content_type;
+}
+
+const std::vector<char> &HTTPClient::getResponseBody() {
+ if (response_body_.size() == 0)
+ response_body_ = std::move(read_callback_.to_string());
+ return response_body_;
+}
+
+void HTTPClient::set_request_method(const std::string method) {
+ std::string my_method = method;
+ std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
+ if (my_method == "POST") {
+ curl_easy_setopt(http_session_, CURLOPT_POST, 1L);
+ } else if (my_method == "PUT") {
+ curl_easy_setopt(http_session_, CURLOPT_PUT, 0L);
+ } else if (my_method == "GET") {
+ } else {
+ curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str());
+ }
+}
+
+bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
+ if (sregex == ".*")
+ return true;
+
+ regex_t regex;
+ int ret = regcomp(®ex, sregex.c_str(), 0);
+ if (ret)
+ return false;
+ ret = regexec(®ex, value.c_str(), (size_t) 0, NULL, 0);
+ regfree(®ex);
+ if (ret)
+ return false;
+
+ return true;
+}
+
+void HTTPClient::configure_secure_connection(CURL *http_session) {
+ logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile());
+ curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
+ curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
+}
+
+bool HTTPClient::isSecure(const std::string &url) {
+ if (url.find("https") != std::string::npos) {
+ logger_->log_debug("%s is a secure url", url);
+ return true;
+ }
+ return false;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPClient.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 609a052..99200d3 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -1,5 +1,4 @@
/**
- * HTTPUtils class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -28,8 +27,9 @@
#include <uuid/uuid.h>
#include <regex.h>
#include <vector>
+
+#include "utils/ByteArrayCallback.h"
#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "properties/Configure.h"
@@ -78,53 +78,63 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
~HTTPClient();
- void setVerbose();
+ virtual void setVerbose() override ;
+
+ void forceClose();
- void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
+ virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) override;
- void setConnectionTimeout(int64_t timeout);
+ virtual void setConnectionTimeout(int64_t timeout) override;
- void setReadTimeout(int64_t timeout);
+ virtual void setReadTimeout(int64_t timeout) override;
- void setUploadCallback(HTTPUploadCallback *callbackObj);
+ virtual void setUploadCallback(HTTPUploadCallback *callbackObj) override;
+
+ virtual void setReadCallback(HTTPReadCallback *callbackObj) ;
struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
- void setContentType(std::string content_type);
+ virtual void setContentType(std::string content_type) override;
- std::string escape(std::string string_to_escape);
+ virtual std::string escape(std::string string_to_escape) override;
- void setPostFields(std::string input);
+ virtual void setPostFields(std::string input) override;
void setHeaders(struct curl_slist *list);
- void appendHeader(const std::string &new_header);
+ virtual void appendHeader(const std::string &new_header) override;
+
+ void appendHeader(const std::string &key, const std::string &value);
- bool submit();
+ bool submit() override;
CURLcode getResponseResult();
- int64_t &getResponseCode();
+ int64_t &getResponseCode() override;
- const char *getContentType();
+ const char *getContentType() override;
- const std::vector<char> &getResponseBody();
+ const std::vector<char> &getResponseBody() override;
- void set_request_method(const std::string method);
+ void set_request_method(const std::string method) override;
- void setUseChunkedEncoding();
+ void setUseChunkedEncoding() override;
- void setDisablePeerVerification();
+ void setDisablePeerVerification() override;
- const std::vector<std::string> &getHeaders() {
+ const std::vector<std::string> &getHeaders() override{
return header_response_.header_tokens_;
}
+ virtual const std::map<std::string, std::string> &getParsedHeaders() override{
+ return header_response_.header_mapping_;
+ }
+
/**
* Determines if we are connected and operating
*/
- virtual bool isRunning() {
+ virtual bool isRunning() override{
return true;
}
@@ -135,7 +145,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
void waitForWork(uint64_t timeoutMs) {
}
- virtual void yield() {
+ virtual void yield() override{
}
@@ -143,13 +153,13 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
- virtual bool isWorkAvailable() {
+ virtual bool isWorkAvailable() override{
return true;
}
protected:
- inline bool matches(const std::string &value, const std::string &sregex);
+ inline bool matches(const std::string &value, const std::string &sregex) override;
static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param) {
minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
@@ -161,9 +171,12 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
void configure_secure_connection(CURL *http_session);
+ HTTPReadCallback *callback;
+
bool isSecure(const std::string &url);
struct curl_slist *headers_;
- utils::HTTPRequestResponse content_;
+ HTTPReadCallback content_;
+ ByteOutputCallback read_callback_;
utils::HTTPHeaderResponse header_response_;
CURLcode res;
int64_t http_code;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPStream.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
new file mode 100644
index 0000000..8a5a4b9
--- /dev/null
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HTTPStream.h"
+
+#include <fstream>
+#include <vector>
+#include <memory>
+#include <string>
+
+#include "HTTPCallback.h"
+#include "io/validation.h"
+#include "io/NonConvertingStream.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client)
+ : logger_(logging::LoggerFactory<HttpStream>::getLogger()),
+ http_client_(client),
+ written(0),
+ http_read_callback_(16384),
+ started_(false) {
+ // submit early on
+}
+
+void HttpStream::closeStream() {
+ http_callback_.close();
+ http_read_callback_.close();
+}
+
+void HttpStream::seek(uint64_t offset) {
+ // seek is an unnecessary part of this implementatino
+ throw std::exception();
+}
+
+int HttpStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+ if (buf.capacity() < buflen) {
+ return -1;
+ }
+ return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+
+int HttpStream::writeData(uint8_t *value, int size) {
+ if (!IsNullOrEmpty(value)) {
+ if (!started_) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!started_) {
+ callback_.ptr = &http_callback_;
+ callback_.pos = 0;
+ http_client_->setUploadCallback(&callback_);
+ http_client_future_ = std::async(submit_client, http_client_);
+ started_ = true;
+ }
+ }
+ http_callback_.process(value,size);
+ return size;
+ } else {
+ return -1;
+ }
+}
+
+template<typename T>
+inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) {
+ std::vector<uint8_t> buf;
+ buf.resize(sizeof t);
+ readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ return buf;
+}
+
+int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) {
+ if (buf.capacity() < buflen) {
+ buf.resize(buflen);
+ }
+ int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+ if (ret < buflen) {
+ buf.resize(ret);
+ }
+ return ret;
+}
+
+int HttpStream::readData(uint8_t *buf, int buflen) {
+ if (!IsNullOrEmpty(buf)) {
+ if (!started_) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!started_) {
+ read_callback_.ptr = &http_read_callback_;
+ read_callback_.pos = 0;
+ http_client_->setReadCallback(&read_callback_);
+ http_client_future_ = std::async(submit_read_client, http_client_, &http_read_callback_);
+ started_ = true;
+ }
+ }
+
+ return http_read_callback_.readFully((char*) buf, buflen);
+
+ } else {
+ return -1;
+ }
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HTTPStream.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
new file mode 100644
index 0000000..de7c7d8
--- /dev/null
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <future>
+#include <vector>
+
+#include "HTTPCallback.h"
+#include "io/BaseStream.h"
+#include "HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+class HttpStream : public io::BaseStream {
+ public:
+ /**
+ * File Stream constructor that accepts an fstream shared pointer.
+ * It must already be initialized for read and write.
+ */
+ explicit HttpStream(std::shared_ptr<utils::HTTPClient> http_client_);
+
+ virtual ~HttpStream() {
+ closeStream();
+ http_client_future_.get();
+ }
+
+ virtual void closeStream() override;
+
+ const std::shared_ptr<utils::HTTPClient> &getClient() {
+ http_client_future_.get();
+ return http_client_;
+ }
+
+ void forceClose(){
+ closeStream();
+ http_client_->forceClose();
+ }
+ /**
+ * Skip to the specified offset.
+ * @param offset offset to which we will skip
+ */
+ virtual void seek(uint64_t offset) override;
+
+ virtual const uint64_t getSize() const override{
+ return written;
+ }
+
+ // data stream extensions
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen) override;
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(uint8_t *buf, int buflen) override;
+
+ /**
+ * Write value to the stream using std::vector
+ * @param buf incoming buffer
+ * @param buflen buffer to write
+ *
+ */
+ virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+ /**
+ * writes value to stream
+ * @param value value to write
+ * @param size size of value
+ */
+ virtual int writeData(uint8_t *value, int size) override;
+
+ /**
+ * Returns the underlying buffer
+ * @return vector's array
+ **/
+ const uint8_t *getBuffer() const {
+ throw std::runtime_error("Stream does not support this operation");
+ }
+
+ static bool submit_client(std::shared_ptr<utils::HTTPClient> client) {
+ bool submit_status = client->submit();
+
+ return submit_status;
+ }
+
+ static bool submit_read_client( std::shared_ptr<utils::HTTPClient> client, utils::ByteOutputCallback *callback){
+ bool submit_status = client->submit();
+ callback->close();
+
+ return submit_status;
+ }
+
+ inline bool isFinished() {
+ if (http_client_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready && (http_read_callback_.getSize() == 0 && http_read_callback_.waitingOps())) {
+ // http_read_callback_.close();
+ return true;
+ }
+ else{
+ return false;
+ }
+ }
+
+ protected:
+
+ /**
+ * Creates a vector and returns the vector using the provided
+ * type name.
+ * @param t incoming object
+ * @returns vector.
+ */
+ template<typename T>
+ std::vector<uint8_t> readBuffer(const T&);
+
+ void reset();
+
+ std::vector<uint8_t> array;
+
+ std::shared_ptr<utils::HTTPClient> http_client_;
+ std::future<bool> http_client_future_;
+
+ size_t written;
+
+ std::mutex mutex_;
+ std::atomic<bool> started_;
+
+ utils::HttpStreamingCallback http_callback_;
+
+ utils::HTTPUploadCallback callback_;
+
+ utils::ByteOutputCallback http_read_callback_;
+
+ utils::HTTPReadCallback read_callback_;
+
+ private:
+
+ std::shared_ptr<logging::Logger> logger_;
+};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_HTTPSTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/client/HttpClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HttpClient.cpp b/extensions/http-curl/client/HttpClient.cpp
deleted file mode 100644
index f2bc0e1..0000000
--- a/extensions/http-curl/client/HttpClient.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "HTTPClient.h"
-#include <memory>
-#include <map>
-#include <vector>
-#include <string>
-#include <algorithm>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-
-HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
- : core::Connectable("HTTPClient", 0),
- ssl_context_service_(ssl_context_service),
- url_(url),
- logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
- connect_timeout_(0),
- read_timeout_(0),
- content_type(nullptr),
- headers_(nullptr),
- http_code(0),
- header_response_(1),
- res(CURLE_OK) {
- HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
- http_session_ = curl_easy_init();
-}
-
-HTTPClient::HTTPClient(std::string name, uuid_t uuid)
- : core::Connectable(name, uuid),
- ssl_context_service_(nullptr),
- url_(),
- logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
- connect_timeout_(0),
- read_timeout_(0),
- content_type(nullptr),
- headers_(nullptr),
- http_code(0),
- header_response_(1),
- res(CURLE_OK) {
- HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
- http_session_ = curl_easy_init();
-}
-
-HTTPClient::HTTPClient()
- : core::Connectable("HTTPClient", 0),
- ssl_context_service_(nullptr),
- url_(),
- logger_(logging::LoggerFactory<HTTPClient>::getLogger()),
- connect_timeout_(0),
- read_timeout_(0),
- content_type(nullptr),
- headers_(nullptr),
- http_code(0),
- header_response_(1),
- res(CURLE_OK) {
- HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance();
- http_session_ = curl_easy_init();
-}
-
-HTTPClient::~HTTPClient() {
- if (nullptr != headers_) {
- curl_slist_free_all(headers_);
- }
- curl_easy_cleanup(http_session_);
-}
-
-void HTTPClient::setVerbose() {
- curl_easy_setopt(http_session_, CURLOPT_VERBOSE, 1L);
-}
-
-void HTTPClient::initialize(const std::string &method, const std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
- method_ = method;
- set_request_method(method_);
- if (ssl_context_service != nullptr) {
- ssl_context_service_ = ssl_context_service;
- }
- if (!url.empty()) {
- url_ = url;
- }
- if (isSecure(url_) && ssl_context_service_ != nullptr) {
- configure_secure_connection(http_session_);
- }
-}
-
-void HTTPClient::setDisablePeerVerification() {
- logger_->log_info("Disabling peer verification");
- curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
-}
-
-void HTTPClient::setConnectionTimeout(int64_t timeout) {
- connect_timeout_ = timeout;
-}
-
-void HTTPClient::setReadTimeout(int64_t timeout) {
- read_timeout_ = timeout;
-}
-
-void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
- logger_->log_info("Setting callback");
- if (method_ == "put" || method_ == "PUT") {
- curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize());
- } else {
- curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, (curl_off_t ) callbackObj->ptr->getBufferSize());
- }
- curl_easy_setopt(http_session_, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
- curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
-}
-
-struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
- if (http_session_) {
- for (auto attribute : attributes) {
- if (matches(attribute.first, regex)) {
- std::string attr = attribute.first + ":" + attribute.second;
- headers_ = curl_slist_append(headers_, attr.c_str());
- }
- }
- }
- return headers_;
-}
-
-void HTTPClient::setContentType(std::string content_type) {
- content_type_ = "Content-Type: " + content_type;
- headers_ = curl_slist_append(headers_, content_type_.c_str());
-}
-
-std::string HTTPClient::escape(std::string string_to_escape) {
- return curl_easy_escape(http_session_, string_to_escape.c_str(), string_to_escape.length());
-}
-
-void HTTPClient::setPostFields(std::string input) {
- curl_easy_setopt(http_session_, CURLOPT_POSTFIELDSIZE, input.length());
- curl_easy_setopt(http_session_, CURLOPT_POSTFIELDS, input.c_str());
-}
-
-void HTTPClient::setHeaders(struct curl_slist *list) {
- headers_ = list;
-}
-
-void HTTPClient::appendHeader(const std::string &new_header) {
- headers_ = curl_slist_append(headers_, new_header.c_str());
-}
-
-void HTTPClient::setUseChunkedEncoding() {
- headers_ = curl_slist_append(headers_, "Transfer-Encoding: chunked");
-}
-
-bool HTTPClient::submit() {
- if (connect_timeout_ > 0) {
- curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT, connect_timeout_);
- }
-
- if (headers_ != nullptr) {
- headers_ = curl_slist_append(headers_, "Expect:");
- curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
- }
-
- curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
- logger_->log_info("Submitting to %s", url_);
- curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
- curl_easy_setopt(http_session_, CURLOPT_WRITEDATA, static_cast<void*>(&content_));
-
- curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
- curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast<void*>(&header_response_));
-
- res = curl_easy_perform(http_session_);
- curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
- curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type);
- if (res != CURLE_OK) {
- logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
- return false;
- }
- return true;
-}
-
-CURLcode HTTPClient::getResponseResult() {
- return res;
-}
-
-int64_t &HTTPClient::getResponseCode() {
- return http_code;
-}
-
-const char *HTTPClient::getContentType() {
- return content_type;
-}
-
-const std::vector<char> &HTTPClient::getResponseBody() {
- return content_.data;
-}
-
-void HTTPClient::set_request_method(const std::string method) {
- std::string my_method = method;
- std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
- if (my_method == "POST") {
- curl_easy_setopt(http_session_, CURLOPT_POST, 1L);
- } else if (my_method == "PUT") {
- curl_easy_setopt(http_session_, CURLOPT_PUT, 0L);
- } else if (my_method == "GET") {
- } else {
- curl_easy_setopt(http_session_, CURLOPT_CUSTOMREQUEST, my_method.c_str());
- }
-}
-
-bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
- if (sregex == ".*")
- return true;
-
- regex_t regex;
- int ret = regcomp(®ex, sregex.c_str(), 0);
- if (ret)
- return false;
- ret = regexec(®ex, value.c_str(), (size_t) 0, NULL, 0);
- regfree(®ex);
- if (ret)
- return false;
-
- return true;
-}
-
-void HTTPClient::configure_secure_connection(CURL *http_session) {
- logger_->log_debug("Using certificate file %s", ssl_context_service_->getCertificateFile());
- curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
- curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
-}
-
-bool HTTPClient::isSecure(const std::string &url) {
- if (url.find("https") != std::string::npos) {
- logger_->log_debug("%s is a secure url", url);
- return true;
- }
- return false;
-}
-
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 8e16122..71e8cda 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include <vector>
+#include "utils/ByteArrayCallback.h"
#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
@@ -41,7 +42,6 @@
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
-#include "utils/ByteInputCallBack.h"
namespace org {
namespace apache {
@@ -140,7 +140,7 @@ void InvokeHTTP::initialize() {
setSupportedRelationships(relationships);
}
-void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (!context->getProperty(Method.getName(), method_)) {
logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str());
return;
@@ -240,7 +240,7 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) {
return ("POST" == method || "PUT" == method || "PATCH" == method);
}
-void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_info("onTrigger InvokeHTTP with %s to %s", method_, url_);
std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
@@ -352,7 +352,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
}
}
-void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess,
+void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::ProcessContext> &context, bool isSuccess,
int statusCode) {
// check if we should yield the processor
if (!isSuccess && request == nullptr) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 2a00cef..1aa3faa 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -23,6 +23,7 @@
#include <regex>
#include <curl/curl.h>
+#include "utils/ByteArrayCallback.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
@@ -30,7 +31,6 @@
#include "core/Property.h"
#include "core/Resource.h"
#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "../client/HTTPClient.h"
@@ -104,9 +104,9 @@ class InvokeHTTP : public core::Processor {
static core::Relationship RelNoRetry;
static core::Relationship RelFailure;
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
- virtual void initialize();
- virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+ virtual void initialize() override;
+ virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
/**
* Provides a reference to the URL.
*/
@@ -131,7 +131,7 @@ class InvokeHTTP : public core::Processor {
* @param isSuccess success code or not
* @param statuscode http response code.
*/
- void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode);
+ void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int statusCode);
/**
* Determine if we should emit a new flowfile based on our activity
* @param method method type
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h
index ed62d9c..4767e77 100644
--- a/extensions/http-curl/protocols/RESTProtocol.h
+++ b/extensions/http-curl/protocols/RESTProtocol.h
@@ -22,11 +22,12 @@
#include "json/writer.h"
#include <string>
#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
#include "CivetServer.h"
#include "c2/C2Protocol.h"
#include "c2/HeartBeatReporter.h"
#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
#include "utils/HTTPClient.h"
namespace org {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
index f4e4a3c..77cfc90 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -51,8 +51,8 @@ class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
public:
RESTReceiver(std::string name, uuid_t uuid = nullptr);
- void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
- virtual int16_t heartbeat(const C2Payload &heartbeat);
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+ virtual int16_t heartbeat(const C2Payload &heartbeat) override;
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index e67661c..9959814 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -98,42 +98,38 @@ void RESTSender::update(const std::shared_ptr<Configure> &configure) {
}
const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
- if (!url.empty()) {
- utils::HTTPClient client(url, ssl_context_service_);
- client.setConnectionTimeout(2);
-
- std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
- std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
- if (direction == Direction::TRANSMIT) {
- input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
- callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
- input->write(outputConfig);
- callback->ptr = input.get();
- callback->pos = 0;
- client.set_request_method("POST");
- client.setUploadCallback(callback.get());
- } else {
- // we do not need to set the uplaod callback
- // since we are not uploading anything on a get
- client.set_request_method("GET");
- }
- client.setContentType("application/json");
- bool isOkay = client.submit();
- int64_t respCode = client.getResponseCode();
+ utils::HTTPClient client(url, ssl_context_service_);
+ client.setConnectionTimeout(2);
+
+ std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+ std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+ if (direction == Direction::TRANSMIT) {
+ input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+ callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+ input->write(outputConfig);
+ callback->ptr = input.get();
+ callback->pos = 0;
+ client.set_request_method("POST");
+ client.setUploadCallback(callback.get());
+ } else {
+ // we do not need to set the uplaod callback
+ // since we are not uploading anything on a get
+ client.set_request_method("GET");
+ }
+ client.setContentType("application/json");
+ bool isOkay = client.submit();
+ int64_t respCode = client.getResponseCode();
- if (isOkay && respCode) {
- if (payload.isRaw()) {
- C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+ if (isOkay && respCode) {
+ if (payload.isRaw()) {
+ C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
- response_payload.setRawData(client.getResponseBody());
- return std::move(response_payload);
- }
- return parseJsonResponse(payload, client.getResponseBody());
- } else {
- return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+ response_payload.setRawData(client.getResponseBody());
+ return std::move(response_payload);
}
+ return parseJsonResponse(payload, client.getResponseBody());
} else {
- return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 09a1a4d..e4c1e5e 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -22,12 +22,13 @@
#include "json/writer.h"
#include <string>
#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
#include "CivetServer.h"
#include "c2/C2Protocol.h"
#include "RESTProtocol.h"
#include "c2/HeartBeatReporter.h"
#include "controllers/SSLContextService.h"
-#include "utils/ByteInputCallBack.h"
#include "../client/HTTPClient.h"
namespace org {
@@ -49,13 +50,13 @@ class RESTSender : public RESTProtocol, public C2Protocol {
explicit RESTSender(std::string name, uuid_t uuid = nullptr);
- virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async);
+ virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
- virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async);
+ virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
- virtual void update(const std::shared_ptr<Configure> &configure);
+ virtual void update(const std::shared_ptr<Configure> &configure) override;
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure);
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/extensions/http-curl/sitetosite/HTTPProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
new file mode 100644
index 0000000..940d3e3
--- /dev/null
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -0,0 +1,310 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPProtocol.h"
+
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <random>
+#include <iostream>
+#include <vector>
+
+#include "PeersEntity.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+std::shared_ptr<utils::IdGenerator> HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+const std::string HttpSiteToSiteClient::parseTransactionId(const std::string &uri) {
+ int i = 0;
+ for (i = uri.length() - 1; i >= 0; i--) {
+ if (uri.at(i) == '/')
+ break;
+ }
+ return uri.substr(i + 1, uri.length() - (i + 1));
+}
+
+std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string &transactionID, TransferDirection direction) {
+ std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
+ std::stringstream uri;
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions";
+ auto client = create_http_client(uri.str(), "POST");
+
+ client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+ client->setConnectionTimeout(5);
+
+ client->setContentType("application/json");
+ client->appendHeader("Accept: application/json");
+ client->setUseChunkedEncoding();
+ client->setPostFields("");
+ client->submit();
+ peer_->setStream(nullptr);
+ if (client->getResponseCode() == 201) {
+ // parse the headers
+ auto headers = client->getParsedHeaders();
+ auto intent_name = headers["x-location-uri-intent"];
+ if (intent_name == "transaction-url") {
+ auto url = headers["Location"];
+
+ if (IsNullOrEmpty(&url)) {
+ logger_->log_debug("Location is empty");
+ } else {
+
+ org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(peer_.get());
+ auto transaction = std::make_shared<HttpTransaction>(direction, crcstream);
+ transaction->initialize(this, url);
+ auto transactionId = parseTransactionId(url);
+ if (IsNullOrEmpty(transactionId))
+ return nullptr;
+ transaction->setTransactionId(transactionId);
+ std::shared_ptr<minifi::utils::HTTPClient> client;
+ if (transaction->getDirection() == SEND) {
+ client = openConnectionForSending(transaction);
+ } else {
+ client = openConnectionForReceive(transaction);
+ transaction->setDataAvailable(true);
+ // a 201 tells us that data is available. A 200 would mean that nothing is available.
+ }
+
+ client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+ peer_->setStream(std::unique_ptr<io::DataStream>(new io::HttpStream(client)));
+ transactionID = transaction->getUUIDStr();
+ logger_->log_debug("Created transaction id -%s-", transactionID);
+ known_transactions_[transaction->getUUIDStr()] = transaction;
+ return transaction;
+ }
+ } else {
+ logger_->log_debug("Could not create transaction, intent is %s", intent_name);
+ }
+ } else {
+ logger_->log_debug("Could not create transaction, received %d", client->getResponseCode());
+ }
+ return nullptr;
+}
+
+int HttpSiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
+ if (current_code == FINISH_TRANSACTION) {
+
+ if (transaction->getDirection() == SEND) {
+ auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+ stream->closeStream();
+ auto client = stream->getClient();
+ if (client->getResponseCode() == 202) {
+ code = CONFIRM_TRANSACTION;
+ message = std::string(client->getResponseBody().data(), client->getResponseBody().size());
+ } else {
+ code = UNRECOGNIZED_RESPONSE_CODE;
+ }
+ return 1;
+ } else {
+ return 1;
+ }
+ } else if (transaction->getDirection() == RECEIVE) {
+ if (transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED) {
+
+ if (current_code == CONFIRM_TRANSACTION && transaction->getState() == DATA_EXCHANGED) {
+ auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+ if (!stream->isFinished()) {
+ logger_->log_info("confirm read for %s, but not finished ", transaction->getUUIDStr());
+ }
+
+ closeTransaction(transaction->getUUIDStr());
+ code = CONFIRM_TRANSACTION;
+ } else {
+ auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
+ if (stream->isFinished()) {
+ code = FINISH_TRANSACTION;
+ current_code = FINISH_TRANSACTION;
+ } else {
+ code = CONTINUE_TRANSACTION;
+ }
+ }
+ } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
+ closeTransaction(transaction->getUUIDStr());
+ code = CONFIRM_TRANSACTION;
+ } else {
+
+ }
+ return 1;
+ } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
+ closeTransaction(transaction->getUUIDStr());
+ code = TRANSACTION_FINISHED;
+
+ return 1;
+ }
+ return SiteToSiteClient::readResponse(transaction, code, message);
+
+}
+// write respond
+int HttpSiteToSiteClient::writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
+ current_code = code;
+ if (code == CONFIRM_TRANSACTION || code == FINISH_TRANSACTION) {
+ return 1;
+
+ } else if (code == CONTINUE_TRANSACTION) {
+ logger_->log_info("Continuing HTTP transaction");
+ return 1;
+ }
+ return SiteToSiteClient::writeResponse(transaction, code, message);
+}
+
+bool HttpSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
+ std::stringstream uri;
+ uri << getBaseURI() << "site-to-site/peers";
+
+ auto client = create_http_client(uri.str(), "GET");
+
+ client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+ client->submit();
+
+ if (client->getResponseCode() == 200) {
+ if (sitetosite::PeersEntity::parse(logger_, std::string(client->getResponseBody().data(), client->getResponseBody().size()), port_id_, peers)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForSending(const std::shared_ptr<HttpTransaction> &transaction) {
+ std::stringstream uri;
+ uri << transaction->getTransactionUrl() << "/flow-files";
+ std::shared_ptr<minifi::utils::HTTPClient> client = std::move(create_http_client(uri.str(), "POST"));
+ client->setContentType("application/octet-stream");
+ client->appendHeader("Accept", "text/plain");
+ client->setUseChunkedEncoding();
+ return client;
+}
+
+std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForReceive(const std::shared_ptr<HttpTransaction> &transaction) {
+ std::stringstream uri;
+ uri << transaction->getTransactionUrl() << "/flow-files";
+ std::shared_ptr<minifi::utils::HTTPClient> client = std::move(create_http_client(uri.str(), "GET"));
+ return client;
+}
+
+//! Transfer string for the process session
+bool HttpSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
+ std::map<std::string, std::string> attributes) {
+ return false;
+}
+
+void HttpSiteToSiteClient::tearDown() {
+
+ if (peer_state_ >= ESTABLISHED) {
+ logger_->log_info("Site2Site Protocol tearDown");
+ }
+ known_transactions_.clear();
+ peer_->Close();
+ peer_state_ = IDLE;
+
+}
+
+void HttpSiteToSiteClient::closeTransaction(const std::string &transactionID) {
+ std::shared_ptr<Transaction> transaction = NULL;
+
+ std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+
+ if (it == known_transactions_.end()) {
+ return;
+ } else {
+ transaction = it->second;
+ if (transaction->closed_) {
+ return;
+ }
+ }
+
+ std::string append_str;
+ logger_->log_info("Site2Site close transaction %s", transaction->getUUIDStr().c_str());
+
+ int code = UNRECOGNIZED_RESPONSE_CODE;
+ if (transaction->getState() == TRANSACTION_CONFIRMED) {
+ code = CONFIRM_TRANSACTION;
+ } else if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION) {
+ if (transaction->_bytes > 0)
+ code = CONFIRM_TRANSACTION;
+ else
+ code = CANCEL_TRANSACTION;
+
+ } else if (transaction->current_transfers_ == 0 && !transaction->isDataAvailable()) {
+ code = CANCEL_TRANSACTION;
+ }
+
+ std::stringstream uri;
+ std::string dir_str = transaction->getDirection() == SEND ? "input-ports" : "output-ports";
+
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions/" << transactionID << "?responseCode=" << code;
+
+ if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION && transaction->_bytes > 0) {
+ uri << "&checksum=" << transaction->getCRC();
+ }
+ auto client = create_http_client(uri.str(), "DELETE");
+
+ client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+ client->setConnectionTimeout(5);
+
+ client->appendHeader("Accept", "application/json");
+
+ client->submit();
+
+ logger_->log_debug("Received %d response code from delete", client->getResponseCode());
+
+ transaction->closed_ = true;
+
+ transaction->current_transfers_--;
+}
+
+void HttpSiteToSiteClient::deleteTransaction(std::string transactionID) {
+ std::shared_ptr<Transaction> transaction = NULL;
+
+ std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+
+ if (it == known_transactions_.end()) {
+ return;
+ } else {
+ transaction = it->second;
+ }
+
+ std::string append_str;
+ logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str());
+
+ closeTransaction(transactionID);
+
+ known_transactions_.erase(transactionID);
+
+}
+
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */