You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/11 08:14:38 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1383: MINIFICPP-1875 HTTPClient should be reusable

lordgamez commented on code in PR #1383:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1383#discussion_r942650264


##########
libminifi/include/utils/BaseHTTPClient.h:
##########
@@ -51,38 +39,43 @@ struct HTTPProxy {
   int port = 0;
 };
 
-struct HTTPUploadCallback {

Review Comment:
   Would it be possible to inherit from `ByteInputCallback` instead of using composition? It could shorten some [train wrecks](https://devcards.io/train-wreck) like `http_client_->getReadCallback()->getPtr()->getSize()`. The same applies to `HTTPReadCallback`.



##########
extensions/http-curl/tests/unit/ConnectionCountingServer.h:
##########
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <array>
+#include <vector>
+#include <string>
+#include <set>
+#include "CivetServer.h"
+
+namespace org::apache::nifi::minifi::extensions::curl::testing {
+
+namespace details {
+
+// Test handler: answers GET/POST/PUT/HEAD with a body of "<METHOD><n>" (n counts the
+// responses this handler has sent, starting at 0, also exposed in a "Response-number"
+// header) and records the id of the connection each request arrived on.
+// NOTE(review): response_id_ and connections_ are not synchronized; if the server runs
+// several worker threads concurrently this is a data race — confirm the test setup.
+class NumberedMethodResponder : public CivetHandler {
+ public:
+  explicit NumberedMethodResponder(std::set<utils::SmallString<36>>& connections) : connections_(connections) {}
+
+  bool handleGet(CivetServer*, struct mg_connection* conn) override {
+    sendNumberedMessage("GET", conn);
+    return true;
+  }
+
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+    sendNumberedMessage("POST", conn);
+    return true;
+  }
+
+  bool handlePut(CivetServer*, struct mg_connection* conn) override {
+    sendNumberedMessage("PUT", conn);
+    return true;
+  }
+
+  bool handleHead(CivetServer*, struct mg_connection* conn) override {
+    sendNumberedMessage("HEAD", conn);
+    return true;
+  }
+
+ private:
+  // Writes a complete 200 response whose body is `body` followed by the current response number.
+  void sendNumberedMessage(std::string body, struct mg_connection* conn) {
+    saveConnectionId(conn);
+    body.append(std::to_string(response_id_));
+    mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+    // %zu matches size_t on every platform (%lu is only 32 bits on LLP64 targets such as Windows).
+    mg_printf(conn, "Content-length: %zu\r\n", body.length());
+    mg_printf(conn, "Response-number: %" PRIu64 "\r\n", response_id_);
+    mg_printf(conn, "\r\n");
+    // Send the body as raw bytes: handing it to mg_printf would interpret it as a format string.
+    mg_write(conn, body.data(), body.length());
+    ++response_id_;
+  }
+
+  // Records the id stored as user connection data (see AddIdToUserConnectionData::init_connection).
+  // Connections carrying no such data are ignored instead of dereferencing a null pointer.
+  void saveConnectionId(struct mg_connection* conn) {
+    if (auto* connection_id = static_cast<utils::SmallString<36>*>(mg_get_user_connection_data(conn))) {
+      connections_.emplace(*connection_id);
+    }
+  }
+
+  uint64_t response_id_ = 0;
+  std::set<utils::SmallString<36>>& connections_;
+};
+
+// Test handler: answers POST requests with the request body reversed (at most the first
+// MAX_BODY_SIZE bytes are read) and records the id of the connection each request arrived on.
+// NOTE(review): connections_ is not synchronized; if the server runs several worker threads
+// concurrently this is a data race — confirm the test setup.
+class ReverseBodyPostHandler : public CivetHandler {
+ public:
+  explicit ReverseBodyPostHandler(std::set<utils::SmallString<36>>& connections) : connections_(connections) {}
+
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
+    saveConnectionId(conn);
+    // The buffer must be sized (not merely reserved) because mg_read writes through data().
+    std::vector<char> request_body(MAX_BODY_SIZE);
+    size_t read_size = 0;
+    // mg_read may deliver fewer bytes than requested; it returns 0 when no more data can be
+    // read and a negative value on error, so keep reading until the buffer is full or it stops.
+    while (read_size < request_body.size()) {
+      const int result = mg_read(conn, request_body.data() + read_size, request_body.size() - read_size);
+      if (result <= 0) {
+        break;
+      }
+      read_size += static_cast<size_t>(result);
+    }
+    request_body.resize(read_size);
+    const std::string response_body(request_body.rbegin(), request_body.rend());
+    mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+    mg_printf(conn, "Content-length: %zu\r\n", response_body.size());
+    mg_printf(conn, "\r\n");
+    // Send the body as raw bytes: handing it to mg_printf would interpret it as a format string.
+    mg_write(conn, response_body.data(), response_body.size());
+
+    return true;
+  }
+
+ private:
+  static constexpr size_t MAX_BODY_SIZE = 2048;
+
+  // Records the id stored as user connection data (see AddIdToUserConnectionData::init_connection).
+  // Connections carrying no such data are ignored instead of dereferencing a null pointer.
+  void saveConnectionId(struct mg_connection* conn) {
+    if (auto* connection_id = static_cast<utils::SmallString<36>*>(mg_get_user_connection_data(conn))) {
+      connections_.emplace(*connection_id);
+    }
+  }
+
+  std::set<utils::SmallString<36>>& connections_;
+};
+
+struct AddIdToUserConnectionData : public CivetCallbacks {
+  AddIdToUserConnectionData() {
+    init_connection = [](const struct mg_connection*, void** user_connection_data) -> int {
+      utils::SmallString<36>* id = new utils::SmallString<36>(utils::IdGenerator::getIdGenerator()->generate().to_string());
+      *user_connection_data = reinterpret_cast<void*>(id);
+      return 0;
+    };
+
+    connection_close = [](const struct mg_connection* conn) -> void {
+      auto user_connection_data = reinterpret_cast<utils::SmallString<36>*>(mg_get_user_connection_data(conn));
+      delete user_connection_data;
+    };

Review Comment:
   These lambdas seem to be unused; am I missing something here?



##########
extensions/http-curl/client/HTTPStream.cpp:
##########
@@ -26,55 +26,56 @@
 #include "io/validation.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
+namespace org::apache::nifi::minifi::extensions::curl {
 
-HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client)
+HttpStream::HttpStream(std::shared_ptr<HTTPClient> client)
     : http_client_(std::move(client)),
       written(0),
-      // given the nature of the stream we don't want to slow libCURL, we will produce
-      // a warning instead allowing us to adjust it server side or through the local configuration.
-      http_read_callback_(66560, true),
+    // given the nature of the stream we don't want to slow libCURL, we will produce
+    // a warning instead allowing us to adjust it server side or through the local configuration.
       started_(false) {
   // submit early on
 }
 
 void HttpStream::close() {
-  http_callback_.close();
-  http_read_callback_.close();
+  if (auto read_callback = http_client_->getReadCallback())
+    read_callback->getPtr()->close();
+  if (auto upload_callback = http_client_->getUploadCallback())
+    upload_callback->getPtr()->close();
 }
 
 void HttpStream::seek(size_t /*offset*/) {
   // seek is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
 
-size_t HttpStream::tell() const  {
+size_t HttpStream::tell() const {
   // tell is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::tell is unimplemented"};
 }
 
 // data stream overrides
 
-size_t HttpStream::write(const uint8_t *value, size_t size) {
+size_t HttpStream::write(const uint8_t* value, size_t size) {
   if (size == 0) return 0;
   if (IsNullOrEmpty(value)) {
-    return STREAM_ERROR;
+    return io::STREAM_ERROR;
   }
   if (!started_) {
     std::lock_guard<std::mutex> lock(mutex_);
     if (!started_) {
-      callback_.ptr = &http_callback_;
-      callback_.pos = 0;
-      http_client_->setUploadCallback(&callback_);
+      auto callback = std::make_unique<utils::HTTPUploadCallback>(new HttpStreamingCallback());
+      callback->pos = 0;
+      http_client_->setUploadCallback(std::move(callback));
       http_client_future_ = std::async(std::launch::async, submit_client, http_client_);
       started_ = true;
     }
   }
-  http_callback_.process(value, size);
+  auto http_callback = dynamic_cast<HttpStreamingCallback*>(gsl::as_nullable(http_client_->getUploadCallback()->getPtr()));
+  if (http_callback)

Review Comment:
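   These two lines could be merged into a single `if` statement, e.g. `if (auto http_callback = dynamic_cast<HttpStreamingCallback*>(...))`.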



##########
libminifi/include/utils/BaseHTTPClient.h:
##########
@@ -51,38 +39,43 @@ struct HTTPProxy {
   int port = 0;
 };
 
-struct HTTPUploadCallback {
-  HTTPUploadCallback() {
-    stop = false;
-    ptr = nullptr;
-    pos = 0;
-  }
-  std::mutex mutex;
-  std::atomic<bool> stop;
-  ByteInputCallback *ptr;
-  size_t pos;
+class HTTPUploadCallback {
+ public:
+  explicit HTTPUploadCallback(ByteInputCallback* byte_input_callback) : ptr(std::move(byte_input_callback)) {}
 
   size_t getPos() {
     std::lock_guard<std::mutex> lock(mutex);
     return pos;

Review Comment:
   Could the mutex be removed and an `atomic<size_t> pos;` be used instead? The same applies to `HTTPReadCallback`.



##########
extensions/http-curl/client/HTTPStream.cpp:
##########
@@ -26,55 +26,56 @@
 #include "io/validation.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
+namespace org::apache::nifi::minifi::extensions::curl {
 
-HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client)
+HttpStream::HttpStream(std::shared_ptr<HTTPClient> client)
     : http_client_(std::move(client)),
       written(0),
-      // given the nature of the stream we don't want to slow libCURL, we will produce
-      // a warning instead allowing us to adjust it server side or through the local configuration.
-      http_read_callback_(66560, true),
+    // given the nature of the stream we don't want to slow libCURL, we will produce
+    // a warning instead allowing us to adjust it server side or through the local configuration.
       started_(false) {
   // submit early on
 }
 
 void HttpStream::close() {
-  http_callback_.close();
-  http_read_callback_.close();
+  if (auto read_callback = http_client_->getReadCallback())
+    read_callback->getPtr()->close();
+  if (auto upload_callback = http_client_->getUploadCallback())
+    upload_callback->getPtr()->close();
 }
 
 void HttpStream::seek(size_t /*offset*/) {
   // seek is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
 
-size_t HttpStream::tell() const  {
+size_t HttpStream::tell() const {
   // tell is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::tell is unimplemented"};
 }
 
 // data stream overrides
 
-size_t HttpStream::write(const uint8_t *value, size_t size) {
+size_t HttpStream::write(const uint8_t* value, size_t size) {
   if (size == 0) return 0;
   if (IsNullOrEmpty(value)) {
-    return STREAM_ERROR;
+    return io::STREAM_ERROR;
   }
   if (!started_) {
     std::lock_guard<std::mutex> lock(mutex_);
     if (!started_) {
-      callback_.ptr = &http_callback_;
-      callback_.pos = 0;
-      http_client_->setUploadCallback(&callback_);
+      auto callback = std::make_unique<utils::HTTPUploadCallback>(new HttpStreamingCallback());

Review Comment:
   This could be simplified to `auto callback = std::make_unique<utils::HttpStreamingCallback>();`



##########
docker/test/integration/MiNiFi_integration_test_driver.py:
##########
@@ -132,7 +132,9 @@ def generate_input_port_for_remote_process_group(remote_process_group, name):
         input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), "input_port"))
         return input_port_node
 
-    def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
+    def add_test_data(self, path, test_data, file_name=None):
+        if file_name is None:
+            file_name = str(uuid.uuid4())

Review Comment:
   What difference does this change make?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org