You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/19 16:18:46 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b7fdeb4 MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown
b7fdeb4 is described below
commit b7fdeb450d42aa85c69529c52fc453feba5a30bf
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Fri May 15 16:11:52 2020 +0200
MINIFICPP-1225 - Fix readBuffer not reporting errors, handle asserts during CivetServer shutdown
MINIFICPP-1225 - Possible fix for ListenHTTP processor, formatting fixes
MINIFICPP-1225 - Add missing override specifiers.
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #786
---
extensions/civetweb/processors/ListenHTTP.cpp | 5 +++
extensions/civetweb/processors/ListenHTTP.h | 8 ++--
extensions/http-curl/client/HTTPStream.cpp | 6 +--
extensions/http-curl/client/HTTPStream.h | 8 ++--
extensions/http-curl/tests/CivetStream.h | 12 +++---
extensions/http-curl/tests/HTTPHandlers.h | 46 ++++++++++++++--------
extensions/http-curl/tests/HTTPIntegrationBase.h | 10 +++--
extensions/http-curl/tests/ServerAwareHandler.h | 36 +++++++++++++++++
extensions/http-curl/tests/SiteToSiteRestTest.cpp | 2 +-
.../http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp | 4 +-
.../http-curl/tests/VerifyInvokeHTTPTest.cpp | 2 +-
extensions/rocksdb-repos/RocksDbStream.cpp | 6 +--
extensions/rocksdb-repos/RocksDbStream.h | 8 ++--
libminifi/include/io/CRCStream.h | 27 ++++++++++---
libminifi/include/io/ClientSocket.h | 11 +++++-
libminifi/include/io/DescriptorStream.h | 11 +++++-
libminifi/include/io/FileStream.h | 11 +++++-
libminifi/include/io/tls/SecureDescriptorStream.h | 11 +++++-
libminifi/src/io/ClientSocket.cpp | 21 +++++++---
libminifi/src/io/DescriptorStream.cpp | 21 +++++++---
libminifi/src/io/FileStream.cpp | 9 ++++-
libminifi/src/io/tls/SecureDescriptorStream.cpp | 21 +++++++---
22 files changed, 220 insertions(+), 76 deletions(-)
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 193b7d4..b9ab508 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -487,6 +487,11 @@ std::string ListenHTTP::getPort() const {
return listeningPort;
}
+void ListenHTTP::notifyStop() {
+ server_.reset();
+ handler_.reset();
+}
+
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index e949fad..847a536 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -69,9 +69,9 @@ class ListenHTTP : public core::Processor {
// Supported Relationships
static core::Relationship Success;
- void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
- void initialize();
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+ void initialize() override;
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
std::string getPort() const;
bool isSecure() const;
@@ -201,6 +201,8 @@ class ListenHTTP : public core::Processor {
}
return 0;
}
+ protected:
+ void notifyStop() override;
private:
// Logger
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index cbcd3d8..e3cc31f 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -87,11 +87,9 @@ int HttpStream::writeData(uint8_t *value, int size) {
}
template<typename T>
-inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) {
- std::vector<uint8_t> buf;
+inline int HttpStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
- return buf;
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
}
int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) {
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index 7e5a6c2..ce6796b 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -155,13 +155,13 @@ class HttpStream : public io::BaseStream {
protected:
/**
- * Creates a vector and returns the vector using the provided
- * type name.
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
* @param t incoming object
- * @returns vector.
+ * @returns number of bytes read.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ int readBuffer(std::vector<uint8_t>&, const T&);
void reset();
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 2e58fd9..4503ec3 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -108,17 +108,15 @@ class CivetStream : public io::BaseStream {
protected:
/**
- * Creates a vector and returns the vector using the provided
- * type name.
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
* @param t incoming object
- * @returns vector.
+ * @returns number of bytes read
*/
template<typename T>
- inline std::vector<uint8_t> readBuffer(const T& t) {
- std::vector<uint8_t> buf;
+ inline int readBuffer(std::vector<uint8_t>& buf, const T& t) {
buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
- return buf;
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
}
//size_t pos;
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 0a36387..eda7a0e 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -27,6 +27,7 @@
#include <cinttypes>
#include <utility>
#include "HTTPUtils.h"
+#include "ServerAwareHandler.h"
static std::atomic<int> transaction_id;
static std::atomic<int> transaction_id_output;
@@ -45,7 +46,7 @@ struct FlowObj {
std::vector<uint8_t> data;
};
-class SiteToSiteLocationResponder : public CivetHandler {
+class SiteToSiteLocationResponder : public ServerAwareHandler {
public:
explicit SiteToSiteLocationResponder(bool isSecure)
: isSecure(isSecure) {
@@ -72,7 +73,7 @@ class SiteToSiteLocationResponder : public CivetHandler {
bool isSecure;
};
-class PeerResponder : public CivetHandler {
+class PeerResponder : public ServerAwareHandler {
public:
explicit PeerResponder(std::string base_url) {
@@ -101,7 +102,7 @@ class PeerResponder : public CivetHandler {
std::string path;
};
-class SiteToSiteBaseResponder : public CivetHandler {
+class SiteToSiteBaseResponder : public ServerAwareHandler {
public:
explicit SiteToSiteBaseResponder(std::string base_url)
@@ -122,7 +123,7 @@ class SiteToSiteBaseResponder : public CivetHandler {
std::string base_url;
};
-class TransactionResponder : public CivetHandler {
+class TransactionResponder : public ServerAwareHandler {
public:
explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri = false, bool empty_transaction_uri = false)
@@ -184,7 +185,7 @@ class TransactionResponder : public CivetHandler {
moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
};
-class FlowFileResponder : public CivetHandler {
+class FlowFileResponder : public ServerAwareHandler {
public:
explicit FlowFileResponder(bool input_port, bool wrong_uri = false, bool invalid_checksum = false)
@@ -210,25 +211,36 @@ class FlowFileResponder : public CivetHandler {
minifi::io::CivetStream civet_stream(conn);
minifi::io::CRCStream < minifi::io::CivetStream > stream(&civet_stream);
uint32_t num_attributes;
+ int read;
uint64_t total_size = 0;
- total_size += stream.read(num_attributes);
+ read = stream.read(num_attributes);
+ if(!isServerRunning())return false;
+ assert(read > 0); total_size += read;
auto flow = std::make_shared<FlowObj>();
for (int i = 0; i < num_attributes; i++) {
std::string name, value;
- total_size += stream.readUTF(name, true);
- total_size += stream.readUTF(value, true);
+ read = stream.readUTF(name, true);
+ if(!isServerRunning())return false;
+ assert(read > 0); total_size += read;
+ read = stream.readUTF(value, true);
+ if(!isServerRunning())return false;
+ assert(read > 0); total_size += read;
flow->attributes[name] = value;
}
uint64_t length;
- total_size += stream.read(length);
+ read = stream.read(length);
+ if(!isServerRunning())return false;
+ assert(read > 0); total_size += read;
total_size += length;
flow->data.resize(length);
flow->total_size = total_size;
- assert(stream.readData(flow->data.data(), length) == length);
+ read = stream.readData(flow->data.data(), length);
+ if(!isServerRunning())return false;
+ assert(read == length);
assert(flow->attributes["path"] == ".");
assert(!flow->attributes["uuid"].empty());
@@ -309,7 +321,7 @@ class FlowFileResponder : public CivetHandler {
moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
};
-class DeleteTransactionResponder : public CivetHandler {
+class DeleteTransactionResponder : public ServerAwareHandler {
public:
explicit DeleteTransactionResponder(std::string base_url, std::string response_code, int expected_resp_code)
@@ -348,7 +360,7 @@ class DeleteTransactionResponder : public CivetHandler {
std::string response_code;
};
-class HeartbeatHandler : public CivetHandler {
+class HeartbeatHandler : public ServerAwareHandler {
public:
explicit HeartbeatHandler(bool isSecure)
: isSecure(isSecure) {
@@ -448,10 +460,10 @@ class HeartbeatHandler : public CivetHandler {
bool isSecure;
};
-class InvokeHTTPCouldNotConnectHandler : public CivetHandler {
+class InvokeHTTPCouldNotConnectHandler : public ServerAwareHandler {
};
-class InvokeHTTPResponseOKHandler : public CivetHandler {
+class InvokeHTTPResponseOKHandler : public ServerAwareHandler {
public:
bool handlePost(CivetServer *, struct mg_connection *conn) {
mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -459,7 +471,7 @@ public:
}
};
-class InvokeHTTPResponse404Handler : public CivetHandler {
+class InvokeHTTPResponse404Handler : public ServerAwareHandler {
public:
bool handlePost(CivetServer *, struct mg_connection *conn) {
mg_printf(conn, "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -467,7 +479,7 @@ public:
}
};
-class InvokeHTTPResponse501Handler : public CivetHandler {
+class InvokeHTTPResponse501Handler : public ServerAwareHandler {
public:
bool handlePost(CivetServer *, struct mg_connection *conn) {
mg_printf(conn, "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -475,7 +487,7 @@ public:
}
};
-class TimeoutingHTTPHandler : public CivetHandler {
+class TimeoutingHTTPHandler : public ServerAwareHandler {
public:
TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
: wait_(wait_ms) {
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 7c9960e..776c647 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -23,6 +23,7 @@
#include "integration/IntegrationBase.h"
#include "c2/C2Agent.h"
#include "protocols/RESTSender.h"
+#include "ServerAwareHandler.h"
int log_message(const struct mg_connection *conn, const char *message) {
puts(message);
@@ -40,9 +41,10 @@ class CoapIntegrationBase : public IntegrationBase {
server(nullptr) {
}
- void setUrl(const std::string& url, CivetHandler *handler);
+ void setUrl(const std::string& url, ServerAwareHandler *handler);
void shutdownBeforeFlowController() override {
+ is_server_running = false;
stop_webserver(server);
}
@@ -55,17 +57,19 @@ class CoapIntegrationBase : public IntegrationBase {
}
protected:
+ std::atomic_bool is_server_running;
CivetServer *server;
};
-void CoapIntegrationBase::setUrl(const std::string& url, CivetHandler *handler) {
-
+void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *handler) {
+ handler->initServerFlag(is_server_running);
parse_http_components(url, port, scheme, path);
struct mg_callbacks callback{};
if (server != nullptr) {
server->addHandler(path, handler);
return;
}
+ is_server_running = true;
if (scheme == "https" && !key_dir.empty()) {
std::string cert = "";
cert = key_dir + "nifi-cert.pem";
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h b/extensions/http-curl/tests/ServerAwareHandler.h
new file mode 100644
index 0000000..63ee0ee
--- /dev/null
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
+#define NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
+
+class ServerAwareHandler: public CivetHandler{
+protected:
+ const std::atomic_bool *is_server_running{nullptr};
+ bool isServerRunning(){
+ assert(is_server_running);
+ return *is_server_running;
+ }
+public:
+ void initServerFlag(std::atomic_bool& is_running){
+ assert(is_server_running == nullptr);
+ is_server_running = &is_running;
+ }
+};
+
+#endif //NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 036b949..1d6deaf 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -49,7 +49,7 @@
#include "../tests/TestServer.h"
#include "HTTPIntegrationBase.h"
-class Responder : public CivetHandler {
+class Responder : public ServerAwareHandler {
public:
explicit Responder(bool isSecure)
: isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index 41619a8..91a9a77 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -91,8 +91,8 @@ protected:
};
struct defaulted_handler{
- CivetHandler* handler = nullptr;
- CivetHandler* get(CivetHandler *def) const {
+ ServerAwareHandler* handler = nullptr;
+ ServerAwareHandler* get(ServerAwareHandler *def) const {
if(handler)return handler;
return def;
}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index bbc1ce2..7d31165 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -141,7 +141,7 @@ void run(VerifyInvokeHTTP& harness,
const std::string& url,
const std::string& test_file_location,
const std::string& key_dir,
- CivetHandler * handler) {
+ ServerAwareHandler * handler) {
harness.setKeyDir(key_dir);
harness.setUrl(url, handler);
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 2a2cd95..7bcc2e1 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -80,11 +80,9 @@ int RocksDbStream::writeData(uint8_t *value, int size) {
}
template<typename T>
-inline std::vector<uint8_t> RocksDbStream::readBuffer(const T& t) {
- std::vector<uint8_t> buf;
+inline int RocksDbStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
- return buf;
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
}
int RocksDbStream::readData(std::vector<uint8_t> &buf, int buflen) {
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 924ee0c..6bec50f 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -142,13 +142,13 @@ class RocksDbStream : public io::BaseStream {
protected:
/**
- * Creates a vector and returns the vector using the provided
- * type name.
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
* @param t incoming object
- * @returns vector.
+ * @returns number of bytes read.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ int readBuffer(std::vector<uint8_t>&, const T&);
std::string path_;
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 6b63f95..6ab131f 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -153,11 +153,22 @@ class CRCStream : public BaseStream {
template<typename K>
std::vector<uint8_t> readBuffer(const K& t) {
std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData((uint8_t*) &buf[0], sizeof(t));
+ readBuffer(buf, t);
return buf;
}
+ /**
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
+ * @param t incoming object
+ * @returns number of bytes read.
+ */
+ template<typename K>
+ int readBuffer(std::vector<uint8_t>& buf, const K& t) {
+ buf.resize(sizeof t);
+ return readData((uint8_t*) &buf[0], sizeof(t));
+ }
+
uint64_t crc_;
T *child_stream_;
bool disable_encoding_;
@@ -259,7 +270,9 @@ template<typename T>
int CRCStream<T>::read(uint64_t &value, bool is_little_endian) {
if (disable_encoding_)
is_little_endian = false;
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
@@ -275,7 +288,9 @@ template<typename T>
int CRCStream<T>::read(uint32_t &value, bool is_little_endian) {
if (disable_encoding_)
is_little_endian = false;
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
@@ -290,7 +305,9 @@ template<typename T>
int CRCStream<T>::read(uint16_t &value, bool is_little_endian) {
if (disable_encoding_)
is_little_endian = false;
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = (buf[0] << 8) | buf[1];
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index b1ca262..c209ee3 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -249,7 +249,16 @@ class Socket : public BaseStream {
* @returns vector.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ std::vector<uint8_t> readBuffer(const T& t);
+
+ /**
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
+ * @param t incoming object
+ * @returns number of bytes read.
+ */
+ template<typename T>
+ int readBuffer(std::vector<uint8_t>& buf, const T& t);
/**
* Creates a connection using the addr object
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index 8389c08..2b131a5 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -163,7 +163,16 @@ class DescriptorStream : public io::BaseStream {
* @returns vector.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ std::vector<uint8_t> readBuffer(const T& t);
+
+ /**
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
+ * @param t incoming object
+ * @returns number of bytes read.
+ */
+ template<typename T>
+ int readBuffer(std::vector<uint8_t>& buf, const T& t);
std::recursive_mutex file_lock_;
int fd_;
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 52675d6..8b99564 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -113,7 +113,16 @@ class FileStream : public io::BaseStream {
* @returns vector.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ std::vector<uint8_t> readBuffer(const T& t);
+
+ /**
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
+ * @param t incoming object
+ * @returns number of bytes read.
+ */
+ template<typename T>
+ int readBuffer(std::vector<uint8_t>& buf, const T& t);
std::recursive_mutex file_lock_;
std::unique_ptr<std::fstream> file_stream_;
size_t offset_;
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 2528323..cabf0eb 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -165,7 +165,16 @@ class SecureDescriptorStream : public io::BaseStream {
* @returns vector.
*/
template<typename T>
- std::vector<uint8_t> readBuffer(const T&);
+ std::vector<uint8_t> readBuffer(const T& t);
+
+ /**
+ * Populates the vector using the provided type name.
+ * @param buf output buffer
+ * @param t incoming object
+ * @returns number of bytes read.
+ */
+ template<typename T>
+ int readBuffer(std::vector<uint8_t>& buf, const T& t);
std::recursive_mutex file_lock_;
int fd_;
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index cf64928..6c70037 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -528,11 +528,16 @@ int Socket::writeData(uint8_t *value, int size) {
template<typename T>
inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ readBuffer(buf, t);
return buf;
}
+template<typename T>
+inline int Socket::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+ buf.resize(sizeof t);
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
int Socket::write(uint64_t base_value, bool is_little_endian) {
return Serializable::write(base_value, this, is_little_endian);
}
@@ -546,7 +551,9 @@ int Socket::write(uint16_t base_value, bool is_little_endian) {
}
int Socket::read(uint64_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
@@ -559,7 +566,9 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
}
int Socket::read(uint32_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
@@ -570,7 +579,9 @@ int Socket::read(uint32_t &value, bool is_little_endian) {
}
int Socket::read(uint16_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if(ret <= 0)return ret;
if (is_little_endian) {
value = (buf[0] << 8) | buf[1];
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 707d282..f77f5a1 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -68,11 +68,16 @@ int DescriptorStream::writeData(uint8_t *value, int size) {
template<typename T>
inline std::vector<uint8_t> DescriptorStream::readBuffer(const T& t) {
std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ readBuffer(buf, t);
return buf;
}
+template<typename T>
+inline int DescriptorStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+ buf.resize(sizeof t);
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
int DescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
if (buflen < 0) {
throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
@@ -120,7 +125,9 @@ int DescriptorStream::read(uint8_t &value) {
* @return resulting read size
**/
int DescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
- auto buf = readBuffer(base_value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, base_value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
base_value = (buf[0] << 8) | buf[1];
} else {
@@ -157,7 +164,9 @@ int DescriptorStream::read(uint8_t *value, int len) {
* @return resulting read size
**/
int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
} else {
@@ -173,7 +182,9 @@ int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int DescriptorStream::read(uint64_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index b0463e7..0fe80aa 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -121,11 +121,16 @@ int FileStream::writeData(uint8_t *value, int size) {
template<typename T>
inline std::vector<uint8_t> FileStream::readBuffer(const T& t) {
std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ readBuffer(buf, t);
return buf;
}
+template<typename T>
+inline int FileStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+ buf.resize(sizeof t);
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
int FileStream::readData(std::vector<uint8_t> &buf, int buflen) {
if (buflen < 0) {
throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index de3bf41..3965d0a 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -77,11 +77,16 @@ int SecureDescriptorStream::writeData(uint8_t *value, int size) {
template<typename T>
inline std::vector<uint8_t> SecureDescriptorStream::readBuffer(const T& t) {
std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ readBuffer(buf, t);
return buf;
}
+template<typename T>
+inline int SecureDescriptorStream::readBuffer(std::vector<uint8_t>& buf, const T& t) {
+ buf.resize(sizeof t);
+ return readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+}
+
int SecureDescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
if (buflen < 0) {
throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
@@ -141,7 +146,9 @@ int SecureDescriptorStream::read(uint8_t &value) {
* @return resulting read size
**/
int SecureDescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
- auto buf = readBuffer(base_value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, base_value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
base_value = (buf[0] << 8) | buf[1];
} else {
@@ -178,7 +185,9 @@ int SecureDescriptorStream::read(uint8_t *value, int len) {
* @return resulting read size
**/
int SecureDescriptorStream::read(uint32_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
} else {
@@ -194,7 +203,9 @@ int SecureDescriptorStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int SecureDescriptorStream::read(uint64_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ std::vector<uint8_t> buf;
+ auto ret = readBuffer(buf, value);
+ if (ret <= 0) return ret;
if (is_little_endian) {
value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)