Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/18 13:50:40 UTC

[GitHub] [nifi-minifi-cpp] bakaid opened a new pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check Travis CI for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380726258
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +209,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
+    try {
 
 Review comment:
   I think this is one of the rare use cases for a function-try-block. It's minor and a stylistic question, so feel free to close, but IMO it would communicate the intent ("wrap this whole thing in `try`") better.
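
For illustration, here is a minimal sketch of what a function-try-block version of a libcurl-style read callback could look like. It is not the PR's code. `UploadState` is a hypothetical stand-in for `HTTPUploadCallback`, and `kReadFuncAbort` mirrors the value of libcurl's `CURL_READFUNC_ABORT` (0x10000000, the same value `send_write` returns in the diff) so that the snippet builds without curl headers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>

// Hypothetical constant: mirrors libcurl's CURL_READFUNC_ABORT (0x10000000)
// so this sketch compiles without curl headers.
constexpr std::size_t kReadFuncAbort = 0x10000000;

// Hypothetical stand-in for HTTPUploadCallback.
struct UploadState {
  const char* data;
  std::size_t size;
  std::size_t pos;
  bool fail;  // simulates an exception thrown while producing data
};

// The function-try-block covers the whole body, which states the intent
// "wrap this entire callback in try" directly in the signature.
static std::size_t send_write(char* buffer, std::size_t size, std::size_t nmemb, void* p)
try {
  if (p == nullptr) {
    return kReadFuncAbort;
  }
  auto* state = static_cast<UploadState*>(p);
  if (state->fail) {
    throw std::runtime_error("producer failed");
  }
  const std::size_t capacity = size * nmemb;
  const std::size_t remaining = state->size - state->pos;
  const std::size_t n = remaining < capacity ? remaining : capacity;
  std::memcpy(buffer, state->data + state->pos, n);
  state->pos += n;
  return n;  // returning 0 tells libcurl the body is complete
} catch (...) {
  // Exceptions must not escape into C code; abort the transfer instead.
  return kReadFuncAbort;
}
```

The handler has to `return` explicitly: flowing off the end of a function-try-block handler in a non-void function is undefined behavior.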

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382672182
 
 

 ##########
 File path: extensions/http-curl/tests/unit/HTTPStreamingCallbackTests.cpp
 ##########
 @@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+#include <mutex>
+#include <vector>
+#include <string>
+#include <chrono>
+#include <cstring>
+#include <cstdint>
+
+#include "client/HTTPCallback.h"
+#include "TestBase.h"
+
+class HttpStreamingCallbackTestsFixture {
+ public:
+  HttpStreamingCallbackTestsFixture() {
+    LogTestController::getInstance().setTrace<utils::HttpStreamingCallback>();
+  }
+
+  virtual ~HttpStreamingCallbackTestsFixture() {
+    if (consumer_thread_.joinable()) {
+      consumer_thread_.join();
+    }
+    LogTestController::getInstance().reset();
+  }
+
+  void startConsumerThread() {
+    if (consumer_thread_.joinable()) {
+      throw std::logic_error("Consumer thread already started");
+    }
+    consumer_thread_ = std::thread([this](){
+      std::cerr << "Consumer thread started" << std::endl;
+
+      size_t current_pos = 0U;
 
 Review comment:
   If I understand correctly, `current_pos == content_.size()` holds everywhere except between lines 62 and 65. Could we reduce the scope of this variable to the block where the `memcpy` happens?
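
A minimal sketch of the suggested shape follows. It assumes a hypothetical `FakeStream` in place of the real callback and a simplified single-threaded loop; the fixture's actual consumer runs on `consumer_thread_`. Because the offset always equals `content.size()`, it can be derived right where the copy happens.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical stand-in for the callback under test: serves bytes at an
// absolute offset, the way getBuffer(pos)/getRemaining(pos) do.
class FakeStream {
 public:
  explicit FakeStream(std::string data) : data_(std::move(data)) {}
  const char* getBuffer(std::size_t pos) const {
    return pos < data_.size() ? data_.data() + pos : nullptr;  // nullptr = end of stream
  }
  std::size_t getRemaining(std::size_t pos) const { return data_.size() - pos; }

 private:
  std::string data_;
};

// Consumer loop: the read offset always equals content.size(), so it is
// computed where the memcpy happens instead of being tracked across iterations.
// Precondition: max_chunk > 0.
std::string consume(const FakeStream& stream, std::size_t max_chunk) {
  std::string content;
  while (const char* buf = stream.getBuffer(content.size())) {
    const std::size_t pos = content.size();  // lives only for this copy
    const std::size_t n = std::min(stream.getRemaining(pos), max_chunk);
    content.resize(pos + n);
    std::memcpy(&content[pos], buf, n);
  }
  return content;
}
```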

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382142566
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
 
 Review comment:
   I prefer it this way (the alternative would be to actually enforce the assertion, e.g. by throwing an exception, but I didn't want to do that in this case).
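
If the precondition documented on `loadNextBuffer()` ("unique_lock which *must* own the lock") were enforced rather than only documented, one option would be a runtime check using `std::unique_lock::owns_lock()` and `std::unique_lock::mutex()`. The sketch below uses a hypothetical `Guarded` class and is not the PR's code.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>

// Hypothetical class illustrating an enforced "lock must own the mutex"
// precondition on a private helper that takes a unique_lock by reference.
class Guarded {
 public:
  void push(int v) {
    std::unique_lock<std::mutex> lock(mutex_);
    pushInner(lock, v);
  }

  std::size_t size() {
    std::unique_lock<std::mutex> lock(mutex_);
    return items_.size();
  }

  // Exists only to demonstrate the failure mode: the lock is not acquired.
  void pushWithoutLock(int v) {
    std::unique_lock<std::mutex> lock(mutex_, std::defer_lock);
    pushInner(lock, v);  // throws std::logic_error
  }

 private:
  void pushInner(std::unique_lock<std::mutex>& lock, int v) {
    // Verify the lock is held and that it guards *this* object's mutex.
    if (!lock.owns_lock() || lock.mutex() != &mutex_) {
      throw std::logic_error("pushInner() requires a unique_lock owning mutex_");
    }
    items_.push_back(v);
  }

  std::mutex mutex_;
  std::deque<int> items_;
};
```

A plain `assert(lock.owns_lock())` would be the debug-only alternative.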

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380878311
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +209,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
+    try {
+      if (p == nullptr) {
+        return 0x10000000;
+      }
+      HTTPUploadCallback *callback = (HTTPUploadCallback *) p;
 
 Review comment:
   I agree, but this is not new code.

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380879464
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
+    cv.wait(lock, [&] {
+      return !byte_arrays_.empty() || !is_alive_;
+    });
+
+    if (byte_arrays_.empty()) {
+      logger_->log_trace("loadNextBuffer() ran out of buffers");
+      ptr = nullptr;
+    } else {
+      current_vec_ = std::move(byte_arrays_.front());
+      byte_arrays_.pop_front();
+
+      ptr = current_vec_.data();
+      current_buffer_start_ = total_bytes_loaded_;
+      current_pos_ = current_buffer_start_;
+      total_bytes_loaded_ += current_vec_.size();
+      logger_->log_trace("loadNextBuffer() loaded new buffer, ptr: %p, size: %zu, current_buffer_start_: %zu, current_pos_: %zu, total_bytes_loaded_: %zu",
+          ptr,
+          current_vec_.size(),
+          current_buffer_start_,
+          current_pos_,
+          total_bytes_loaded_);
+    }
   }
 
-  virtual const size_t getRemaining(size_t pos) {
-    return current_vec_.size();
-  }
+  /**
+   * Common implementation for placing a buffer into the queue
+   * @param vec the buffer to be inserted
+   * @return the number of bytes processed (the size of vec)
+   */
+  int64_t processInner(std::vector<char>&& vec) {
+    size_t size = vec.size();
 
-  virtual const size_t getBufferSize() {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    logger_->log_trace("processInner() called, vec.data(): %p, vec.size(): %zu", vec.data(), size);
 
-    if (ptr == nullptr || current_pos_ >= rolling_count_) {
-      load_buffer();
+    if (size == 0U) {
+      return 0U;
     }
-    return rolling_count_;
-  }
 
- private:
+    std::unique_lock<std::mutex> lock(mutex_);
+    byte_arrays_.emplace_back(std::move(vec));
+    cv.notify_all();
 
-  inline void load_buffer() {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
-    if (!is_alive_ && byte_arrays_.size_approx() == 0) {
-      lock.unlock();
-      return;
+    return size;
+  }
+
+  /**
+   * Seeks to the specified position
+   * @param lock unique_lock which *must* own the lock
+   * @param pos position to seek to
+   */
+  void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) {
+    logger_->log_trace("seekInner() called, current_pos_: %zu, pos: %zu", current_pos_, pos);
+    if (pos < current_pos_) {
+      const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos);
+      logger_->log_error("%s", errstr);
+      throw std::logic_error(errstr);
     }
-    try {
-      if (byte_arrays_.try_dequeue(current_vec_)) {
-        ptr = &current_vec_[0];
-        previous_pos_.store(rolling_count_.load());
-        current_pos_ = 0;
-        rolling_count_ += current_vec_.size();
-      } else {
-        ptr = nullptr;
+    while ((pos - current_buffer_start_) >= current_vec_.size()) {
+      loadNextBuffer(lock);
+      if (ptr == nullptr) {
+        break;
       }
-      lock.unlock();
-    } catch (...) {
-      lock.unlock();
     }
 
 Review comment:
   In our initial state, `ptr` is `nullptr` without that signalling the end of the stream, so we must call `loadNextBuffer()` first to give it a chance to fill `ptr`, and only then check it for null. If we made `ptr` part of the `while` condition, this would not work.
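
To make the argument concrete, here is a simplified, single-threaded model of `seekInner()`/`loadNextBuffer()`. It is a sketch: the condition-variable wait is dropped on the assumption that all chunks are already queued and the stream is closed, and backward seeks are assumed not to happen. It shows that testing `ptr` in the loop condition skips the very first load.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Single-threaded model of the buffer-switching logic in the diff.
struct Model {
  std::deque<std::vector<char>> queue;
  std::vector<char> current_vec;
  std::size_t current_buffer_start = 0;
  std::size_t total_bytes_loaded = 0;
  char* ptr = nullptr;  // nullptr initially, although the stream is NOT finished

  void loadNextBuffer() {
    if (queue.empty()) {
      ptr = nullptr;  // here nullptr really does mean "end of stream"
      return;
    }
    current_vec = std::move(queue.front());
    queue.pop_front();
    ptr = current_vec.data();
    current_buffer_start = total_bytes_loaded;
    total_bytes_loaded += current_vec.size();
  }

  // Loop as written in the PR: load first, then test ptr.
  void seek(std::size_t pos) {
    while (pos - current_buffer_start >= current_vec.size()) {
      loadNextBuffer();
      if (ptr == nullptr) {
        break;
      }
    }
  }

  // Variant with ptr in the condition: on the first call ptr is still
  // nullptr, so the body never runs and nothing is ever loaded.
  void seekPtrInCondition(std::size_t pos) {
    while (ptr != nullptr && pos - current_buffer_start >= current_vec.size()) {
      loadNextBuffer();
    }
  }
};
```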

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380852335
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
+    cv.wait(lock, [&] {
+      return !byte_arrays_.empty() || !is_alive_;
+    });
+
+    if (byte_arrays_.empty()) {
+      logger_->log_trace("loadNextBuffer() ran out of buffers");
+      ptr = nullptr;
+    } else {
+      current_vec_ = std::move(byte_arrays_.front());
+      byte_arrays_.pop_front();
+
+      ptr = current_vec_.data();
+      current_buffer_start_ = total_bytes_loaded_;
+      current_pos_ = current_buffer_start_;
+      total_bytes_loaded_ += current_vec_.size();
+      logger_->log_trace("loadNextBuffer() loaded new buffer, ptr: %p, size: %zu, current_buffer_start_: %zu, current_pos_: %zu, total_bytes_loaded_: %zu",
+          ptr,
+          current_vec_.size(),
+          current_buffer_start_,
+          current_pos_,
+          total_bytes_loaded_);
+    }
   }
 
-  virtual const size_t getRemaining(size_t pos) {
-    return current_vec_.size();
-  }
+  /**
+   * Common implementation for placing a buffer into the queue
+   * @param vec the buffer to be inserted
+   * @return the number of bytes processed (the size of vec)
+   */
+  int64_t processInner(std::vector<char>&& vec) {
+    size_t size = vec.size();
 
-  virtual const size_t getBufferSize() {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    logger_->log_trace("processInner() called, vec.data(): %p, vec.size(): %zu", vec.data(), size);
 
-    if (ptr == nullptr || current_pos_ >= rolling_count_) {
-      load_buffer();
+    if (size == 0U) {
+      return 0U;
     }
-    return rolling_count_;
-  }
 
- private:
+    std::unique_lock<std::mutex> lock(mutex_);
+    byte_arrays_.emplace_back(std::move(vec));
+    cv.notify_all();
 
-  inline void load_buffer() {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
-    if (!is_alive_ && byte_arrays_.size_approx() == 0) {
-      lock.unlock();
-      return;
+    return size;
+  }
+
+  /**
+   * Seeks to the specified position
+   * @param lock unique_lock which *must* own the lock
+   * @param pos position to seek to
+   */
+  void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) {
+    logger_->log_trace("seekInner() called, current_pos_: %zu, pos: %zu", current_pos_, pos);
+    if (pos < current_pos_) {
+      const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos);
+      logger_->log_error("%s", errstr);
+      throw std::logic_error(errstr);
     }
-    try {
-      if (byte_arrays_.try_dequeue(current_vec_)) {
-        ptr = &current_vec_[0];
-        previous_pos_.store(rolling_count_.load());
-        current_pos_ = 0;
-        rolling_count_ += current_vec_.size();
-      } else {
-        ptr = nullptr;
+    while ((pos - current_buffer_start_) >= current_vec_.size()) {
+      loadNextBuffer(lock);
+      if (ptr == nullptr) {
+        break;
       }
-      lock.unlock();
-    } catch (...) {
-      lock.unlock();
     }
 
 Review comment:
   `ptr` could be part of the while condition for simpler code
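A sketch of how `ptr` could move into the loop condition. It rests on two assumptions. First, `loadNextBuffer` is changed (hypothetically) to return the newly loaded pointer. Second, the class is reduced to a stand-in named `SeekSketch`. A plain `while (ptr != nullptr && ...)` would not work as-is: `ptr` starts out as `nullptr`, so the first buffer would never be loaded. Letting the loader's return value end the loop avoids that problem.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>

// Simplified stand-in for HttpStreamingCallback (hypothetical names).
class SeekSketch {
 public:
  void push(std::vector<char> vec) {
    if (vec.empty()) {
      return;  // like processInner, ignore empty chunks
    }
    std::lock_guard<std::mutex> guard(mutex_);
    queue_.push_back(std::move(vec));
    cv_.notify_all();
  }

  void close() {
    std::lock_guard<std::mutex> guard(mutex_);
    is_alive_ = false;
    cv_.notify_all();
  }

  // Returns a pointer to the byte at absolute offset pos, or nullptr at end of data.
  const char* getBuffer(size_t pos) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (pos < current_pos_) {
      throw std::logic_error("Seeking backwards is not supported");
    }
    // Stop when pos fits in the current buffer, or when the loader
    // reports end of data by returning nullptr.
    while ((pos - current_buffer_start_) >= current_vec_.size() &&
           loadNextBuffer(lock) != nullptr) {
      // all work happens in the condition
    }
    if (ptr_ == nullptr) {
      return nullptr;
    }
    current_pos_ = pos;
    return ptr_ + (pos - current_buffer_start_);
  }

 private:
  // Blocks until a chunk is queued or close() was called; returns the new ptr_.
  const char* loadNextBuffer(std::unique_lock<std::mutex>& lock) {
    cv_.wait(lock, [this] { return !queue_.empty() || !is_alive_; });
    if (queue_.empty()) {
      ptr_ = nullptr;
      return ptr_;
    }
    current_vec_ = std::move(queue_.front());
    queue_.pop_front();
    current_buffer_start_ = total_bytes_loaded_;
    current_pos_ = current_buffer_start_;
    total_bytes_loaded_ += current_vec_.size();
    ptr_ = current_vec_.data();
    return ptr_;
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  bool is_alive_ = true;
  size_t total_bytes_loaded_ = 0;
  size_t current_buffer_start_ = 0;
  size_t current_pos_ = 0;
  std::deque<std::vector<char>> queue_;
  std::vector<char> current_vec_;
  const char* ptr_ = nullptr;
};
```

With the early `break` gone, the loop body is empty and every exit path is visible in the condition. Whether that reads better than the explicit `break` is a matter of taste.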

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382656213
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
+    cv.wait(lock, [&] {
+      return !byte_arrays_.empty() || !is_alive_;
+    });
+
+    if (byte_arrays_.empty()) {
+      logger_->log_trace("loadNextBuffer() ran out of buffers");
+      ptr = nullptr;
+    } else {
+      current_vec_ = std::move(byte_arrays_.front());
+      byte_arrays_.pop_front();
+
+      ptr = current_vec_.data();
+      current_buffer_start_ = total_bytes_loaded_;
+      current_pos_ = current_buffer_start_;
+      total_bytes_loaded_ += current_vec_.size();
+      logger_->log_trace("loadNextBuffer() loaded new buffer, ptr: %p, size: %zu, current_buffer_start_: %zu, current_pos_: %zu, total_bytes_loaded_: %zu",
+          ptr,
+          current_vec_.size(),
+          current_buffer_start_,
+          current_pos_,
+          total_bytes_loaded_);
+    }
   }
 
-  virtual const size_t getRemaining(size_t pos) {
-    return current_vec_.size();
-  }
+  /**
+   * Common implementation for placing a buffer into the queue
+   * @param vec the buffer to be inserted
+   * @return the number of bytes processed (the size of vec)
+   */
+  int64_t processInner(std::vector<char>&& vec) {
+    size_t size = vec.size();
 
-  virtual const size_t getBufferSize() {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    logger_->log_trace("processInner() called, vec.data(): %p, vec.size(): %zu", vec.data(), size);
 
-    if (ptr == nullptr || current_pos_ >= rolling_count_) {
-      load_buffer();
+    if (size == 0U) {
+      return 0U;
     }
-    return rolling_count_;
-  }
 
- private:
+    std::unique_lock<std::mutex> lock(mutex_);
+    byte_arrays_.emplace_back(std::move(vec));
+    cv.notify_all();
 
-  inline void load_buffer() {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
-    if (!is_alive_ && byte_arrays_.size_approx() == 0) {
-      lock.unlock();
-      return;
+    return size;
+  }
+
+  /**
+   * Seeks to the specified position
+   * @param lock unique_lock which *must* own the lock
+   * @param pos position to seek to
+   */
+  void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) {
+    logger_->log_trace("seekInner() called, current_pos_: %zu, pos: %zu", current_pos_, pos);
+    if (pos < current_pos_) {
+      const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos);
+      logger_->log_error("%s", errstr);
+      throw std::logic_error(errstr);
     }
-    try {
-      if (byte_arrays_.try_dequeue(current_vec_)) {
-        ptr = &current_vec_[0];
-        previous_pos_.store(rolling_count_.load());
-        current_pos_ = 0;
-        rolling_count_ += current_vec_.size();
-      } else {
-        ptr = nullptr;
+    while ((pos - current_buffer_start_) >= current_vec_.size()) {
+      loadNextBuffer(lock);
+      if (ptr == nullptr) {
+        break;
       }
-      lock.unlock();
-    } catch (...) {
-      lock.unlock();
     }
   }
 
-  std::atomic<bool> is_alive_;
-  std::atomic<size_t> rolling_count_;
-  std::condition_variable_any cv;
-  std::atomic<size_t> previous_pos_;
-  std::atomic<size_t> current_pos_;
+  std::shared_ptr<logging::Logger> logger_;
 
-  std::recursive_mutex mutex_;
+  std::mutex mutex_;
+  std::condition_variable cv;
 
-  moodycamel::ConcurrentQueue<std::vector<char>> byte_arrays_;
+  bool is_alive_;
+  size_t total_bytes_loaded_;
+  size_t current_buffer_start_;
+  size_t current_pos_;
 
-  char *ptr;
+  std::deque<std::vector<char>> byte_arrays_;
 
   std::vector<char> current_vec_;
+  char *ptr;
 
 Review comment:
   Yeah, this was originally this way, I just reordered stuff, but I will change it.

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382657022
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -219,16 +232,15 @@ class HTTPRequestResponse {
         }
         if (len > size * nmemb)
           len = size * nmemb;
-        auto strr = std::string(ptr,len);
         memcpy(data, ptr, len);
         callback->pos += len;
         callback->ptr->seek(callback->getPos());
         return len;
       }
-    } else {
-      return 0x10000000;
+      return 0;
 
 Review comment:
   Umm... it's not?

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380881814
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
 
 Review comment:
   Normally I agree, but this class is pretty poorly designed even in this refactored state, because it had to conform to the existing interface and its current usage by HTTPStream.
   Understanding these original limitations is a vital part of understanding why this class is the way it is.
   This whole flow should be rethought and rewritten entirely - I planned to create a follow-up issue for it, and I will soon - in the interim, this spares the next person coming along from having to trace back why things are designed this way.
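The contract listed in that class comment can be illustrated with a minimal sketch: a producer appends chunks, the consumer blocks until data arrives or the stream is closed, and an empty result marks the end. `ChunkQueue`, `push`, `next` and `roundTrip` are hypothetical names. They only mirror the roles of `processInner`, `loadNextBuffer` and `close` in the diff. This is not the MiNiFi class, and it returns `false` where the real class returns `nullptr` from `getBuffer`.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal queue illustrating the documented producer/consumer contract.
class ChunkQueue {
 public:
  // Producer side: append a chunk and wake a waiting consumer.
  void push(std::vector<char> chunk) {
    if (chunk.empty()) {
      return;
    }
    std::lock_guard<std::mutex> guard(mutex_);
    chunks_.push_back(std::move(chunk));
    cv_.notify_all();
  }

  // Producer side: signal that no more chunks will follow.
  void close() {
    std::lock_guard<std::mutex> guard(mutex_);
    is_alive_ = false;
    cv_.notify_all();
  }

  // Consumer side: block until a chunk is available or the queue is closed.
  // Returns false once the queue is closed and drained.
  bool next(std::vector<char>& out) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !chunks_.empty() || !is_alive_; });
    if (chunks_.empty()) {
      return false;
    }
    out = std::move(chunks_.front());
    chunks_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool is_alive_ = true;
  std::deque<std::vector<char>> chunks_;
};

// Usage: one producer thread, one consumer reassembling the stream.
std::string roundTrip(const std::vector<std::string>& parts) {
  ChunkQueue queue;
  std::thread producer([&] {
    for (const auto& part : parts) {
      queue.push(std::vector<char>(part.begin(), part.end()));
    }
    queue.close();
  });
  std::string received;
  std::vector<char> chunk;
  while (queue.next(chunk)) {
    received.append(chunk.begin(), chunk.end());
  }
  producer.join();
  return received;
}
```

`close()` takes the mutex before clearing the flag, as the diff's `close()` does. Without it, the consumer could check the predicate, miss the notification, and then block forever.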

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380689645
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
 
 Review comment:
   Code comments are meant to document the current state of the code, not its history. Use commit messages, the PR description and/or release notes for documenting the history of the class.
   
   The points (below) documenting the behavior are nice and appreciated.

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382647244
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +211,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
-        return 0x10000000;
+    try {
+      if (p == nullptr) {
+        return CALLBACK_ABORT;
+      }
+      HTTPUploadCallback *callback = (HTTPUploadCallback *) p;
+      if (callback->stop) {
+        return CALLBACK_ABORT;
+      }
       size_t buffer_size = callback->ptr->getBufferSize();
       if (callback->getPos() <= buffer_size) {
         size_t len = buffer_size - callback->pos;
 
 Review comment:
   What's the difference between callback->getPos() and callback->pos? 

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382143123
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +209,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
+    try {
 
 Review comment:
   I find the function-try-block harder to read.
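For comparison, the two forms under discussion are sketched below with hypothetical names (`readChunk`, `kAbort`, `nestedTry`, `functionTry`). The first is a `try` block inside the function body, as the PR's `send_write` uses. The second is a function-try-block, where `try` comes before the body. Both map any exception to an abort value. In a libcurl callback, the point of either form is that no C++ exception propagates into libcurl's C code.

```cpp
#include <cstddef>
#include <stdexcept>

constexpr size_t kAbort = 0x10000000;  // hypothetical abort code for this sketch

// Hypothetical helper that may throw.
size_t readChunk(int n) {
  if (n < 0) {
    throw std::runtime_error("negative size");
  }
  return static_cast<size_t>(n);
}

// try block inside the body: the happy path is indented one extra level.
size_t nestedTry(int n) {
  try {
    return readChunk(n);
  } catch (...) {
    return kAbort;
  }
}

// Function-try-block: the try covers the whole body, including its braces.
size_t functionTry(int n) try {
  return readChunk(n);
} catch (...) {
  return kAbort;
}
```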

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380734221
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +209,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
+    try {
+      if (p == nullptr) {
+        return 0x10000000;
+      }
+      HTTPUploadCallback *callback = (HTTPUploadCallback *) p;
 
 Review comment:
   Use `reinterpret_cast`
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#es49-if-you-must-use-a-cast-use-a-named-cast
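Here is a minimal sketch of the suggested change, with a hypothetical `UploadState` standing in for `HTTPUploadCallback`: the C-style cast becomes a named cast. Converting the `void*` user pointer back to the type it originally pointed to is well-defined with `reinterpret_cast`, and `static_cast` would also work here. A named cast is also easier to search for than `(T *) p`.

```cpp
// Hypothetical stand-in for HTTPUploadCallback.
struct UploadState {
  bool stop = false;
};

// Same shape as the start of send_write: null check, cast, stop flag.
bool shouldAbort(void* p) {
  if (p == nullptr) {
    return true;
  }
  // Named cast instead of the C-style (UploadState *) p.
  auto* state = reinterpret_cast<UploadState*>(p);
  return state->stop;
}
```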

[GitHub] [nifi-minifi-cpp] bakaid closed pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740
 
 
   

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380694800
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -185,11 +185,19 @@ class HTTPRequestResponse {
    * Receive HTTP Response.
    */
   static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) {
-    HTTPReadCallback *callback = static_cast<HTTPReadCallback*>(p);
-    if (callback->stop)
+    try {
+      if (p == nullptr) {
+        return 0x10000000;
 
 Review comment:
   Could you introduce a symbolic constant for this value?
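This sketch of the requested constant rests on a few assumptions. Later revisions of the diff in this thread return `CALLBACK_ABORT`, but the definition below is a guess. `UploadSketch` and `readCallback` are hypothetical stand-ins for `HTTPUploadCallback` and `send_write`. The value `0x10000000` is what libcurl defines as `CURL_READFUNC_ABORT` for read callbacks. For a write callback such as `recieve_write`, any return value other than the number of bytes passed in makes libcurl abort the transfer. The sketch also shows the other read-callback convention: returning 0 tells libcurl the upload body is complete.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical named constant; the value equals libcurl's CURL_READFUNC_ABORT.
static constexpr size_t CALLBACK_ABORT = 0x10000000;

// Hypothetical upload state standing in for HTTPUploadCallback.
struct UploadSketch {
  std::string body;
  size_t pos = 0;
  bool stop = false;
};

// CURLOPT_READFUNCTION-shaped callback: copy up to size * nmemb bytes into buffer.
size_t readCallback(char* buffer, size_t size, size_t nmemb, void* userdata) {
  if (userdata == nullptr) {
    return CALLBACK_ABORT;
  }
  auto* upload = reinterpret_cast<UploadSketch*>(userdata);
  if (upload->stop) {
    return CALLBACK_ABORT;
  }
  size_t len = upload->body.size() - upload->pos;
  if (len > size * nmemb) {
    len = size * nmemb;
  }
  if (len > 0) {
    std::memcpy(buffer, upload->body.data() + upload->pos, len);
    upload->pos += len;
  }
  return len;  // 0 once the body is exhausted: end of upload
}
```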

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382693761
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -219,16 +232,15 @@ class HTTPRequestResponse {
         }
         if (len > size * nmemb)
           len = size * nmemb;
-        auto strr = std::string(ptr,len);
         memcpy(data, ptr, len);
         callback->pos += len;
         callback->ptr->seek(callback->getPos());
         return len;
       }
-    } else {
-      return 0x10000000;
+      return 0;
 
 Review comment:
   My bad, never mind :)

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382647644
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -219,16 +232,15 @@ class HTTPRequestResponse {
         }
         if (len > size * nmemb)
           len = size * nmemb;
-        auto strr = std::string(ptr,len);
         memcpy(data, ptr, len);
         callback->pos += len;
         callback->ptr->seek(callback->getPos());
         return len;
       }
-    } else {
-      return 0x10000000;
+      return 0;
 
 Review comment:
   nitpicking: this is overindented.

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382149656
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -185,11 +185,19 @@ class HTTPRequestResponse {
    * Receive HTTP Response.
    */
   static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) {
-    HTTPReadCallback *callback = static_cast<HTTPReadCallback*>(p);
-    if (callback->stop)
+    try {
+      if (p == nullptr) {
+        return 0x10000000;
 
 Review comment:
   Done.

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r380849907
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
 
 Review comment:
   I usually use `assert` to document assumptions. Feel free to ignore this if you prefer it that way.
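A minimal sketch of the reviewer's suggestion. It assumes a private helper that, like `loadNextBuffer`, receives a `std::unique_lock<std::mutex>&` from its caller. The "*must* own the lock" precondition from the doc comment becomes a runtime check via `std::unique_lock::owns_lock()` and `std::unique_lock::mutex()`. `ChunkQueue`, `push`, `pop` and `popLocked` are hypothetical names, not MiNiFi code. Keep in mind that `assert` is compiled out when `NDEBUG` is defined, so the check only fires in debug builds.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical cut-down queue, only here to show the assert; not MiNiFi code.
class ChunkQueue {
 public:
  void push(std::vector<char> chunk) {
    std::lock_guard<std::mutex> lock(mutex_);
    chunks_.push_back(std::move(chunk));
    cv_.notify_all();
  }

  std::vector<char> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    return popLocked(lock);
  }

 private:
  // Instead of only writing "lock *must* own the lock" in the doc comment,
  // check it: the caller must pass a lock that currently holds mutex_.
  std::vector<char> popLocked(std::unique_lock<std::mutex>& lock) {
    assert(lock.owns_lock() && lock.mutex() == &mutex_);
    cv_.wait(lock, [this] { return !chunks_.empty(); });  // blocks until a chunk is queued
    std::vector<char> front = std::move(chunks_.front());
    chunks_.pop_front();
    return front;
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::vector<char>> chunks_;
};
```

Unlike a comment, the assertion fails loudly in tests as soon as a caller passes an unlocked or foreign `unique_lock`.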

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class
URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382654840
 
 

 ##########
 File path: libminifi/include/utils/HTTPClient.h
 ##########
 @@ -201,15 +211,18 @@ class HTTPRequestResponse {
    */
 
   static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      HTTPUploadCallback *callback = (HTTPUploadCallback*) p;
-      if (callback->stop)
-        return 0x10000000;
+    try {
+      if (p == nullptr) {
+        return CALLBACK_ABORT;
+      }
+      HTTPUploadCallback *callback = (HTTPUploadCallback *) p;
+      if (callback->stop) {
+        return CALLBACK_ABORT;
+      }
       size_t buffer_size = callback->ptr->getBufferSize();
       if (callback->getPos() <= buffer_size) {
         size_t len = buffer_size - callback->pos;
 
 Review comment:
   `getPos()` locks a mutex and returns `pos`, but the very next line reads `callback->pos` again without holding the lock. I didn't want to touch HTTPClient in this refactor, but yes, it is a mess.


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class
URL: https://github.com/apache/nifi-minifi-cpp/pull/740#issuecomment-589728277
 
 
   @arpadboda @szaszm Fixed review comments, added tests.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class
URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r382644595
 
 

 ##########
 File path: extensions/http-curl/client/HTTPCallback.h
 ##########
 @@ -33,148 +34,195 @@ namespace minifi {
 namespace utils {
 
 /**
- * will stream as items are processed.
+ * The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
+ * This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
+ * through HTTPStream and the non-buggy part of the behaviour of the original class.
+ * Based on these:
+ *  - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
+ *    consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
+ *    body with data utilizing HTTP chunked transfer encoding
+ *  - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
+ *  - if we expect that more data will be available, but there is none available at the current time, we should block
+ *    the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
+ *    new data
+ *  - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
+ *    for data should be made on us
+ *  - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
+ *    by the current buffer
+ *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
+ *    the current buffer
  */
 class HttpStreamingCallback : public ByteInputCallBack {
  public:
   HttpStreamingCallback()
-      : is_alive_(true),
+      : logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
+        is_alive_(true),
+        total_bytes_loaded_(0U),
+        current_buffer_start_(0U),
+        current_pos_(0U),
         ptr(nullptr) {
-    previous_pos_ = 0;
-    rolling_count_ = 0;
   }
 
-  virtual ~HttpStreamingCallback() {
-
-  }
+  virtual ~HttpStreamingCallback() = default;
 
   void close() {
+    logger_->log_trace("close() called");
+    std::unique_lock<std::mutex> lock(mutex_);
     is_alive_ = false;
     cv.notify_all();
   }
 
-  virtual void seek(size_t pos) {
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+  void seek(size_t pos) override {
+    logger_->log_trace("seek(pos: %zu) called", pos);
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
-
+  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
     std::vector<char> vec;
 
     if (stream->getSize() > 0) {
       vec.resize(stream->getSize());
-
       stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize());
     }
 
-    size_t added_size = vec.size();
-
-    byte_arrays_.enqueue(std::move(vec));
-
-    cv.notify_all();
-
-    return added_size;
-
+    return processInner(std::move(vec));
   }
 
-  virtual int64_t process(uint8_t *vector, size_t size) {
-
+  virtual int64_t process(const uint8_t* data, size_t size) {
     std::vector<char> vec;
+    vec.resize(size);
+    memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
 
-    if (size > 0) {
-      vec.resize(size);
+    return processInner(std::move(vec));
+  }
 
-      memcpy(vec.data(), vector, size);
+  void write(std::string content) override {
+    std::vector<char> vec;
+    vec.assign(content.begin(), content.end());
 
-      size_t added_size = vec.size();
+    (void) processInner(std::move(vec));
+  }
 
-      byte_arrays_.enqueue(std::move(vec));
+  char* getBuffer(size_t pos) override {
+    logger_->log_trace("getBuffer(pos: %zu) called", pos);
 
-      cv.notify_all();
+    std::unique_lock<std::mutex> lock(mutex_);
 
-      return added_size;
-    } else {
-      return 0;
+    seekInner(lock, pos);
+    if (ptr == nullptr) {
+      return nullptr;
     }
 
-  }
+    size_t relative_pos = pos - current_buffer_start_;
+    current_pos_ = pos;
 
-  virtual void write(std::string content) {
-    std::vector<char> vec;
-    vec.assign(content.begin(), content.end());
-    byte_arrays_.enqueue(vec);
+    return ptr + relative_pos;
   }
 
-  virtual char *getBuffer(size_t pos) {
-
-    // if there is no space remaining in our current buffer,
-    // we should load the next. If none exists after that we have no more buffer
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const size_t getRemaining(size_t pos) override {
+    logger_->log_trace("getRemaining(pos: %zu) called", pos);
 
-    if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0)
-      load_buffer();
+    std::unique_lock<std::mutex> lock(mutex_);
+    seekInner(lock, pos);
+    return total_bytes_loaded_ - pos;
+  }
 
-    if (ptr == nullptr)
-      return nullptr;
+  const size_t getBufferSize() override {
+    logger_->log_trace("getBufferSize() called");
 
-    size_t absolute_position = pos - previous_pos_;
+    std::unique_lock<std::mutex> lock(mutex_);
+    // This is needed to make sure that the first buffer is loaded
+    seekInner(lock, current_pos_);
+    return total_bytes_loaded_;
+  }
 
-    current_pos_ = pos;
+ private:
 
-    return ptr + absolute_position;
+  /**
+   * Loads the next available buffer
+   * @param lock unique_lock which *must* own the lock
+   */
+  inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
+    cv.wait(lock, [&] {
+      return !byte_arrays_.empty() || !is_alive_;
+    });
+
+    if (byte_arrays_.empty()) {
+      logger_->log_trace("loadNextBuffer() ran out of buffers");
+      ptr = nullptr;
+    } else {
+      current_vec_ = std::move(byte_arrays_.front());
+      byte_arrays_.pop_front();
+
+      ptr = current_vec_.data();
+      current_buffer_start_ = total_bytes_loaded_;
+      current_pos_ = current_buffer_start_;
+      total_bytes_loaded_ += current_vec_.size();
+      logger_->log_trace("loadNextBuffer() loaded new buffer, ptr: %p, size: %zu, current_buffer_start_: %zu, current_pos_: %zu, total_bytes_loaded_: %zu",
+          ptr,
+          current_vec_.size(),
+          current_buffer_start_,
+          current_pos_,
+          total_bytes_loaded_);
+    }
   }
 
-  virtual const size_t getRemaining(size_t pos) {
-    return current_vec_.size();
-  }
+  /**
+   * Common implementation for placing a buffer into the queue
+   * @param vec the buffer to be inserted
+   * @return the number of bytes processed (the size of vec)
+   */
+  int64_t processInner(std::vector<char>&& vec) {
+    size_t size = vec.size();
 
-  virtual const size_t getBufferSize() {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    logger_->log_trace("processInner() called, vec.data(): %p, vec.size(): %zu", vec.data(), size);
 
-    if (ptr == nullptr || current_pos_ >= rolling_count_) {
-      load_buffer();
+    if (size == 0U) {
+      return 0U;
     }
-    return rolling_count_;
-  }
 
- private:
+    std::unique_lock<std::mutex> lock(mutex_);
+    byte_arrays_.emplace_back(std::move(vec));
+    cv.notify_all();
 
-  inline void load_buffer() {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;});
-    if (!is_alive_ && byte_arrays_.size_approx() == 0) {
-      lock.unlock();
-      return;
+    return size;
+  }
+
+  /**
+   * Seeks to the specified position
+   * @param lock unique_lock which *must* own the lock
+   * @param pos position to seek to
+   */
+  void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) {
+    logger_->log_trace("seekInner() called, current_pos_: %zu, pos: %zu", current_pos_, pos);
+    if (pos < current_pos_) {
+      const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos);
+      logger_->log_error("%s", errstr);
+      throw std::logic_error(errstr);
     }
-    try {
-      if (byte_arrays_.try_dequeue(current_vec_)) {
-        ptr = &current_vec_[0];
-        previous_pos_.store(rolling_count_.load());
-        current_pos_ = 0;
-        rolling_count_ += current_vec_.size();
-      } else {
-        ptr = nullptr;
+    while ((pos - current_buffer_start_) >= current_vec_.size()) {
+      loadNextBuffer(lock);
+      if (ptr == nullptr) {
+        break;
       }
-      lock.unlock();
-    } catch (...) {
-      lock.unlock();
     }
   }
 
-  std::atomic<bool> is_alive_;
-  std::atomic<size_t> rolling_count_;
-  std::condition_variable_any cv;
-  std::atomic<size_t> previous_pos_;
-  std::atomic<size_t> current_pos_;
+  std::shared_ptr<logging::Logger> logger_;
 
-  std::recursive_mutex mutex_;
+  std::mutex mutex_;
+  std::condition_variable cv;
 
-  moodycamel::ConcurrentQueue<std::vector<char>> byte_arrays_;
+  bool is_alive_;
+  size_t total_bytes_loaded_;
+  size_t current_buffer_start_;
+  size_t current_pos_;
 
-  char *ptr;
+  std::deque<std::vector<char>> byte_arrays_;
 
   std::vector<char> current_vec_;
+  char *ptr;
 
 Review comment:
   For consistency, can you rename this to end with an underscore?
   I was searching for the declaration of this in a function for 2 minutes :)


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #740: MINIFICPP-1144 - Fix HTTPCallback freeze and refactor class
URL: https://github.com/apache/nifi-minifi-cpp/pull/740#discussion_r383145068
 
 

 ##########
 File path: extensions/http-curl/tests/unit/HTTPStreamingCallbackTests.cpp
 ##########
 @@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+#include <mutex>
+#include <vector>
+#include <string>
+#include <chrono>
+#include <cstring>
+#include <cstdint>
+
+#include "client/HTTPCallback.h"
+#include "TestBase.h"
+
+class HttpStreamingCallbackTestsFixture {
+ public:
+  HttpStreamingCallbackTestsFixture() {
+    LogTestController::getInstance().setTrace<utils::HttpStreamingCallback>();
+  }
+
+  virtual ~HttpStreamingCallbackTestsFixture() {
+    if (consumer_thread_.joinable()) {
+      consumer_thread_.join();
+    }
+    LogTestController::getInstance().reset();
+  }
+
+  void startConsumerThread() {
+    if (consumer_thread_.joinable()) {
+      throw std::logic_error("Consumer thread already started");
+    }
+    consumer_thread_ = std::thread([this](){
+      std::cerr << "Consumer thread started" << std::endl;
+
+      size_t current_pos = 0U;
 
 Review comment:
    - This function was made to closely emulate `utils::HTTPClient::send_write`, where the position is a member variable; that's why `current_pos` became a variable in this scope
    - I couldn't possibly care less about the scope of a variable inside a lambda in a test function
