You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/19 16:18:46 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b7fdeb4  MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown
b7fdeb4 is described below

commit b7fdeb450d42aa85c69529c52fc453feba5a30bf
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Fri May 15 16:11:52 2020 +0200

    MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown
    
    MINIFICPP-1225 - Possible fix for ListenHTTP processor, formatting fixes
    
    MINIFICPP-1225 - Add missing override specifiers.
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #786
---
 extensions/civetweb/processors/ListenHTTP.cpp      |  5 +++
 extensions/civetweb/processors/ListenHTTP.h        |  8 ++--
 extensions/http-curl/client/HTTPStream.cpp         |  6 +--
 extensions/http-curl/client/HTTPStream.h           |  8 ++--
 extensions/http-curl/tests/CivetStream.h           | 12 +++---
 extensions/http-curl/tests/HTTPHandlers.h          | 46 ++++++++++++++--------
 extensions/http-curl/tests/HTTPIntegrationBase.h   | 10 +++--
 extensions/http-curl/tests/ServerAwareHandler.h    | 36 +++++++++++++++++
 extensions/http-curl/tests/SiteToSiteRestTest.cpp  |  2 +-
 .../http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp |  4 +-
 .../http-curl/tests/VerifyInvokeHTTPTest.cpp       |  2 +-
 extensions/rocksdb-repos/RocksDbStream.cpp         |  6 +--
 extensions/rocksdb-repos/RocksDbStream.h           |  8 ++--
 libminifi/include/io/CRCStream.h                   | 27 ++++++++++---
 libminifi/include/io/ClientSocket.h                | 11 +++++-
 libminifi/include/io/DescriptorStream.h            | 11 +++++-
 libminifi/include/io/FileStream.h                  | 11 +++++-
 libminifi/include/io/tls/SecureDescriptorStream.h  | 11 +++++-
 libminifi/src/io/ClientSocket.cpp                  | 21 +++++++---
 libminifi/src/io/DescriptorStream.cpp              | 21 +++++++---
 libminifi/src/io/FileStream.cpp                    |  9 ++++-
 libminifi/src/io/tls/SecureDescriptorStream.cpp    | 21 +++++++---
 22 files changed, 220 insertions(+), 76 deletions(-)

diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 193b7d4..b9ab508 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -487,6 +487,11 @@ std::string ListenHTTP::getPort() const {
   return listeningPort;
 }
 
+void ListenHTTP::notifyStop() {
+  server_.reset();
+  handler_.reset();
+}
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index e949fad..847a536 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -69,9 +69,9 @@ class ListenHTTP : public core::Processor {
   // Supported Relationships
   static core::Relationship Success;
 
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  void initialize();
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
   std::string getPort() const;
   bool isSecure() const;
 
@@ -201,6 +201,8 @@ class ListenHTTP : public core::Processor {
     }
     return 0;
   }
+ protected:
+  void notifyStop() override;
 
  private:
   // Logger
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index cbcd3d8..e3cc31f 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -87,11 +87,9 @@ int HttpStream::writeData(uint8_t *value, int size) {
 }
 
 template<typename T>
-inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) {
-  std::vector<uint8_t> buf;
+inline int HttpStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
   buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
-  return buf;
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
 }
 
 int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) {
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index 7e5a6c2..ce6796b 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -155,13 +155,13 @@ class HttpStream : public io::BaseStream {
  protected:
 
   /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
    * @param t incoming object
-   * @returns vector.
+   * @returns number of bytes read.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  int readBuffer(std::vector<uint8_t>&, const T&);
 
   void reset();
 
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 2e58fd9..4503ec3 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -108,17 +108,15 @@ class CivetStream : public io::BaseStream {
  protected:
 
   /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
    * @param t incoming object
-   * @returns vector.
+   * @returns number of bytes read
    */
   template<typename T>
-  inline std::vector<uint8_t> readBuffer(const T& t) {
-    std::vector<uint8_t> buf;
+  inline int readBuffer(std::vector<uint8_t>& buf, const T& t) {
     buf.resize(sizeof t);
-    readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
-    return buf;
+    return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
   }
 
   //size_t pos;
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 0a36387..eda7a0e 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -27,6 +27,7 @@
 #include <cinttypes>
 #include <utility>
 #include "HTTPUtils.h"
+#include "ServerAwareHandler.h"
 
 static std::atomic<int> transaction_id;
 static std::atomic<int> transaction_id_output;
@@ -45,7 +46,7 @@ struct FlowObj {
   std::vector<uint8_t> data;
 };
 
-class SiteToSiteLocationResponder : public CivetHandler {
+class SiteToSiteLocationResponder : public ServerAwareHandler {
  public:
   explicit SiteToSiteLocationResponder(bool isSecure)
       : isSecure(isSecure) {
@@ -72,7 +73,7 @@ class SiteToSiteLocationResponder : public CivetHandler {
   bool isSecure;
 };
 
-class PeerResponder : public CivetHandler {
+class PeerResponder : public ServerAwareHandler {
  public:
 
   explicit PeerResponder(std::string base_url) {
@@ -101,7 +102,7 @@ class PeerResponder : public CivetHandler {
   std::string path;
 };
 
-class SiteToSiteBaseResponder : public CivetHandler {
+class SiteToSiteBaseResponder : public ServerAwareHandler {
  public:
 
   explicit SiteToSiteBaseResponder(std::string base_url)
@@ -122,7 +123,7 @@ class SiteToSiteBaseResponder : public CivetHandler {
   std::string base_url;
 };
 
-class TransactionResponder : public CivetHandler {
+class TransactionResponder : public ServerAwareHandler {
  public:
 
   explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri = false, bool empty_transaction_uri = false)
@@ -184,7 +185,7 @@ class TransactionResponder : public CivetHandler {
   moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
 };
 
-class FlowFileResponder : public CivetHandler {
+class FlowFileResponder : public ServerAwareHandler {
  public:
 
   explicit FlowFileResponder(bool input_port, bool wrong_uri = false, bool invalid_checksum = false)
@@ -210,25 +211,36 @@ class FlowFileResponder : public CivetHandler {
       minifi::io::CivetStream civet_stream(conn);
       minifi::io::CRCStream < minifi::io::CivetStream > stream(&civet_stream);
       uint32_t num_attributes;
+      int read;
       uint64_t total_size = 0;
-      total_size += stream.read(num_attributes);
+      read = stream.read(num_attributes);
+      if(!isServerRunning())return false;
+      assert(read > 0); total_size += read;
 
       auto flow = std::make_shared<FlowObj>();
 
       for (int i = 0; i < num_attributes; i++) {
         std::string name, value;
-        total_size += stream.readUTF(name, true);
-        total_size += stream.readUTF(value, true);
+        read = stream.readUTF(name, true);
+        if(!isServerRunning())return false;
+        assert(read > 0); total_size += read;
+        read = stream.readUTF(value, true);
+        if(!isServerRunning())return false;
+        assert(read > 0); total_size += read;
         flow->attributes[name] = value;
       }
       uint64_t length;
-      total_size += stream.read(length);
+      read = stream.read(length);
+      if(!isServerRunning())return false;
+      assert(read > 0); total_size += read;
 
       total_size += length;
       flow->data.resize(length);
       flow->total_size = total_size;
 
-      assert(stream.readData(flow->data.data(), length) == length);
+      read = stream.readData(flow->data.data(), length);
+      if(!isServerRunning())return false;
+      assert(read == length);
 
       assert(flow->attributes["path"] == ".");
       assert(!flow->attributes["uuid"].empty());
@@ -309,7 +321,7 @@ class FlowFileResponder : public CivetHandler {
   moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
 };
 
-class DeleteTransactionResponder : public CivetHandler {
+class DeleteTransactionResponder : public ServerAwareHandler {
  public:
 
   explicit DeleteTransactionResponder(std::string base_url, std::string response_code, int expected_resp_code)
@@ -348,7 +360,7 @@ class DeleteTransactionResponder : public CivetHandler {
   std::string response_code;
 };
 
-class HeartbeatHandler : public CivetHandler {
+class HeartbeatHandler : public ServerAwareHandler {
  public:
   explicit HeartbeatHandler(bool isSecure)
       : isSecure(isSecure) {
@@ -448,10 +460,10 @@ class HeartbeatHandler : public CivetHandler {
   bool isSecure;
 };
 
-class InvokeHTTPCouldNotConnectHandler : public CivetHandler {
+class InvokeHTTPCouldNotConnectHandler : public ServerAwareHandler {
 };
 
-class InvokeHTTPResponseOKHandler : public CivetHandler {
+class InvokeHTTPResponseOKHandler : public ServerAwareHandler {
 public:
   bool handlePost(CivetServer *, struct mg_connection *conn) {
     mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -459,7 +471,7 @@ public:
   }
 };
 
-class InvokeHTTPResponse404Handler : public CivetHandler {
+class InvokeHTTPResponse404Handler : public ServerAwareHandler {
 public:
   bool handlePost(CivetServer *, struct mg_connection *conn) {
     mg_printf(conn, "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -467,7 +479,7 @@ public:
   }
 };
 
-class InvokeHTTPResponse501Handler : public CivetHandler {
+class InvokeHTTPResponse501Handler : public ServerAwareHandler {
 public:
   bool handlePost(CivetServer *, struct mg_connection *conn) {
     mg_printf(conn, "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -475,7 +487,7 @@ public:
   }
 };
 
-class TimeoutingHTTPHandler : public CivetHandler {
+class TimeoutingHTTPHandler : public ServerAwareHandler {
 public:
   TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
       : wait_(wait_ms) {
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 7c9960e..776c647 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -23,6 +23,7 @@
 #include "integration/IntegrationBase.h"
 #include "c2/C2Agent.h"
 #include "protocols/RESTSender.h"
+#include "ServerAwareHandler.h"
 
 int log_message(const struct mg_connection *conn, const char *message) {
   puts(message);
@@ -40,9 +41,10 @@ class CoapIntegrationBase : public IntegrationBase {
         server(nullptr) {
   }
 
-  void setUrl(const std::string& url, CivetHandler *handler);
+  void setUrl(const std::string& url, ServerAwareHandler *handler);
 
   void shutdownBeforeFlowController() override {
+    is_server_running = false;
     stop_webserver(server);
   }
 
@@ -55,17 +57,19 @@ class CoapIntegrationBase : public IntegrationBase {
   }
 
  protected:
+  std::atomic_bool is_server_running;
   CivetServer *server;
 };
 
-void CoapIntegrationBase::setUrl(const std::string& url, CivetHandler *handler) {
-
+void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *handler) {
+  handler->initServerFlag(is_server_running);
   parse_http_components(url, port, scheme, path);
   struct mg_callbacks callback{};
   if (server != nullptr) {
     server->addHandler(path, handler);
     return;
   }
+  is_server_running = true;
   if (scheme == "https" && !key_dir.empty()) {
     std::string cert = "";
     cert = key_dir + "nifi-cert.pem";
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h b/extensions/http-curl/tests/ServerAwareHandler.h
new file mode 100644
index 0000000..63ee0ee
--- /dev/null
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
+#define NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
+
+class ServerAwareHandler: public CivetHandler{
+protected:
+  const std::atomic_bool *is_server_running{nullptr};
+  bool isServerRunning(){
+    assert(is_server_running);
+    return *is_server_running;
+  }
+public:
+  void initServerFlag(std::atomic_bool& is_running){
+    assert(is_server_running == nullptr);
+    is_server_running = &is_running;
+  }
+};
+
+#endif //NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 036b949..1d6deaf 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -49,7 +49,7 @@
 #include "../tests/TestServer.h"
 #include "HTTPIntegrationBase.h"
 
-class Responder : public CivetHandler {
+class Responder : public ServerAwareHandler {
  public:
   explicit Responder(bool isSecure)
       : isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index 41619a8..91a9a77 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -91,8 +91,8 @@ protected:
 };
 
 struct defaulted_handler{
-  CivetHandler* handler = nullptr;
-  CivetHandler* get(CivetHandler *def) const {
+  ServerAwareHandler* handler = nullptr;
+  ServerAwareHandler* get(ServerAwareHandler *def) const {
     if(handler)return handler;
     return def;
   }
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index bbc1ce2..7d31165 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -141,7 +141,7 @@ void run(VerifyInvokeHTTP& harness,
     const std::string& url,
     const std::string& test_file_location,
     const std::string& key_dir,
-    CivetHandler * handler) {
+    ServerAwareHandler * handler) {
 
   harness.setKeyDir(key_dir);
   harness.setUrl(url, handler);
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 2a2cd95..7bcc2e1 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -80,11 +80,9 @@ int RocksDbStream::writeData(uint8_t *value, int size) {
 }
 
 template<typename T>
-inline std::vector<uint8_t> RocksDbStream::readBuffer(const T& t) {
-  std::vector<uint8_t> buf;
+inline int RocksDbStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
   buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
-  return buf;
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
 }
 
 int RocksDbStream::readData(std::vector<uint8_t> &buf, int buflen) {
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 924ee0c..6bec50f 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -142,13 +142,13 @@ class RocksDbStream : public io::BaseStream {
  protected:
 
   /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
    * @param t incoming object
-   * @returns vector.
+   * @returns number of bytes read.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  int readBuffer(std::vector<uint8_t>&, const T&);
 
   std::string path_;
 
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 6b63f95..6ab131f 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -153,11 +153,22 @@ class CRCStream : public BaseStream {
   template<typename K>
   std::vector<uint8_t> readBuffer(const K& t) {
     std::vector<uint8_t> buf;
-    buf.resize(sizeof t);
-    readData((uint8_t*) &buf[0], sizeof(t));
+    readBuffer(buf, t);
     return buf;
   }
 
+  /**
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
+   * @param t incoming object
+   * @returns number of bytes read.
+   */
+  template<typename K>
+  int readBuffer(std::vector<uint8_t>& buf, const K& t) {
+    buf.resize(sizeof t);
+    return readData((uint8_t*) &buf[0], sizeof(t));
+  }
+
   uint64_t crc_;
   T *child_stream_;
   bool disable_encoding_;
@@ -259,7 +270,9 @@ template<typename T>
 int CRCStream<T>::read(uint64_t &value, bool is_little_endian) {
   if (disable_encoding_)
     is_little_endian = false;
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
@@ -275,7 +288,9 @@ template<typename T>
 int CRCStream<T>::read(uint32_t &value, bool is_little_endian) {
   if (disable_encoding_)
     is_little_endian = false;
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
@@ -290,7 +305,9 @@ template<typename T>
 int CRCStream<T>::read(uint16_t &value, bool is_little_endian) {
   if (disable_encoding_)
     is_little_endian = false;
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = (buf[0] << 8) | buf[1];
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index b1ca262..c209ee3 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -249,7 +249,16 @@ class Socket : public BaseStream {
    * @returns vector.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  std::vector<uint8_t> readBuffer(const T& t);
+
+  /**
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
+   * @param t incoming object
+   * @returns number of bytes read.
+   */
+  template<typename T>
+  int readBuffer(std::vector<uint8_t>& buf, const T& t);
 
   /**
    * Creates a connection using the addr object
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index 8389c08..2b131a5 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -163,7 +163,16 @@ class DescriptorStream : public io::BaseStream {
    * @returns vector.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  std::vector<uint8_t> readBuffer(const T& t);
+
+  /**
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
+   * @param t incoming object
+   * @returns number of bytes read.
+   */
+  template<typename T>
+  int readBuffer(std::vector<uint8_t>& buf, const T& t);
   std::recursive_mutex file_lock_;
 
   int fd_;
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 52675d6..8b99564 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -113,7 +113,16 @@ class FileStream : public io::BaseStream {
    * @returns vector.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  std::vector<uint8_t> readBuffer(const T& t);
+
+  /**
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
+   * @param t incoming object
+   * @returns number of bytes read.
+   */
+  template<typename T>
+  int readBuffer(std::vector<uint8_t>& buf, const T& t);
   std::recursive_mutex file_lock_;
   std::unique_ptr<std::fstream> file_stream_;
   size_t offset_;
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 2528323..cabf0eb 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -165,7 +165,16 @@ class SecureDescriptorStream : public io::BaseStream {
    * @returns vector.
    */
   template<typename T>
-  std::vector<uint8_t> readBuffer(const T&);
+  std::vector<uint8_t> readBuffer(const T& t);
+
+  /**
+   * Populates the vector using the provided type name.
+   * @param buf output buffer
+   * @param t incoming object
+   * @returns number of bytes read.
+   */
+  template<typename T>
+  int readBuffer(std::vector<uint8_t>& buf, const T& t);
   std::recursive_mutex file_lock_;
 
   int fd_;
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index cf64928..6c70037 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -528,11 +528,16 @@ int Socket::writeData(uint8_t *value, int size) {
 template<typename T>
 inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
   std::vector<uint8_t> buf;
-  buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+  readBuffer(buf, t);
   return buf;
 }
 
+template<typename T>
+inline int Socket::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+  buf.resize(sizeof t);
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
 int Socket::write(uint64_t base_value, bool is_little_endian) {
   return Serializable::write(base_value, this, is_little_endian);
 }
@@ -546,7 +551,9 @@ int Socket::write(uint16_t base_value, bool is_little_endian) {
 }
 
 int Socket::read(uint64_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
@@ -559,7 +566,9 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
 }
 
 int Socket::read(uint32_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
@@ -570,7 +579,9 @@ int Socket::read(uint32_t &value, bool is_little_endian) {
 }
 
 int Socket::read(uint16_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if(ret <= 0)return ret;
 
   if (is_little_endian) {
     value = (buf[0] << 8) | buf[1];
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 707d282..f77f5a1 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -68,11 +68,16 @@ int DescriptorStream::writeData(uint8_t *value, int size) {
 template<typename T>
 inline std::vector<uint8_t> DescriptorStream::readBuffer(const T& t) {
   std::vector<uint8_t> buf;
-  buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+  readBuffer(buf, t);
   return buf;
 }
 
+template<typename T>
+inline int DescriptorStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+  buf.resize(sizeof t);
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
 int DescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
   if (buflen < 0) {
     throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
@@ -120,7 +125,9 @@ int DescriptorStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int DescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
-  auto buf = readBuffer(base_value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, base_value);
+  if (ret <= 0) return ret;
   if (is_little_endian) {
     base_value = (buf[0] << 8) | buf[1];
   } else {
@@ -157,7 +164,9 @@ int DescriptorStream::read(uint8_t *value, int len) {
  * @return resulting read size
  **/
 int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if (ret <= 0) return ret;
   if (is_little_endian) {
     value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
   } else {
@@ -173,7 +182,9 @@ int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int DescriptorStream::read(uint64_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if (ret <= 0) return ret;
 
   if (is_little_endian) {
     value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index b0463e7..0fe80aa 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -121,11 +121,16 @@ int FileStream::writeData(uint8_t *value, int size) {
 template<typename T>
 inline std::vector<uint8_t> FileStream::readBuffer(const T& t) {
   std::vector<uint8_t> buf;
-  buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+  readBuffer(buf, t);
   return buf;
 }
 
+template<typename T>
+inline int FileStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+  buf.resize(sizeof t);
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
 int FileStream::readData(std::vector<uint8_t> &buf, int buflen) {
   if (buflen < 0) {
     throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index de3bf41..3965d0a 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -77,11 +77,16 @@ int SecureDescriptorStream::writeData(uint8_t *value, int size) {
 template<typename T>
 inline std::vector<uint8_t> SecureDescriptorStream::readBuffer(const T& t) {
   std::vector<uint8_t> buf;
-  buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+  readBuffer(buf, t);
   return buf;
 }
 
+template<typename T>
+inline int SecureDescriptorStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+  buf.resize(sizeof t);
+  return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
 int SecureDescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
   if (buflen < 0) {
     throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
@@ -141,7 +146,9 @@ int SecureDescriptorStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int SecureDescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
-  auto buf = readBuffer(base_value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, base_value);
+  if (ret <= 0) return ret;
   if (is_little_endian) {
     base_value = (buf[0] << 8) | buf[1];
   } else {
@@ -178,7 +185,9 @@ int SecureDescriptorStream::read(uint8_t *value, int len) {
  * @return resulting read size
  **/
 int SecureDescriptorStream::read(uint32_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if (ret <= 0) return ret;
   if (is_little_endian) {
     value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
   } else {
@@ -194,7 +203,9 @@ int SecureDescriptorStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int SecureDescriptorStream::read(uint64_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
+  std::vector<uint8_t> buf;
+  auto ret = readBuffer(buf, value);
+  if (ret <= 0) return ret;
 
   if (is_little_endian) {
     value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)