You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/09/09 13:47:30 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #864: MINIFICPP-1319 - Stream refactor

szaszm commented on a change in pull request #864:
URL: https://github.com/apache/nifi-minifi-cpp/pull/864#discussion_r470076811



##########
File path: libminifi/include/io/OutputStream.h
##########
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdexcept>
+#include <vector>
+#include <string>
+#include "Stream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Serializable instances provide base functionality to
+ * write certain objects/primitives to a data stream.
+ *
+ */
+class OutputStream : public virtual Stream {
+ public:
+  /**
+   * write value to stream
+   * @param value non encoded value
+   * @param len length of value
+   * @return resulting write size
+   **/
+  virtual int write(const uint8_t *value, int len) = 0;
+
+  int write(const std::vector<uint8_t>& buffer, int len);
+
+  /**
+   * write bool to stream
+   * @param value non encoded value
+   * @return resulting write size
+   **/
+  int write(bool value);
+
+  /**
+   * write string to stream
+   * @param str string to write
+   * @return resulting write size
+   **/
+  int write(const std::string& str, bool widen = false);
+
+  /**
+   * write string to stream
+   * @param str string to write
+   * @return resulting write size
+   **/
+  int write(const char* str, bool widen = false);
+
+  /**
+  * writes sizeof(Integral) bytes to the stream
+  * @param value to write
+  * @return resulting write size
+  **/
+  template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
+  int write(Integral value) {
+    uint8_t buffer[sizeof(Integral)]{};
+
+    for (std::size_t byteIdx = 0; byteIdx < sizeof(Integral); ++byteIdx) {
+      buffer[byteIdx] = static_cast<uint8_t>(value >> (8*(sizeof(Integral) - 1) - 8*byteIdx));

Review comment:
       This should be a `gsl::narrow_cast` instead of a `static_cast`. It is just an alias for `static_cast` that annotates an intentional narrowing conversion; it performs no runtime check.

##########
File path: libminifi/test/unit/FileStreamTests.cpp
##########
@@ -115,21 +115,21 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
   REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
 
   stream.seek(4);
 
-  stream.write(nullptr, 0);
+  stream.write((const uint8_t*)nullptr, 0);

Review comment:
       Please use a named cast (e.g. `static_cast<const uint8_t*>(nullptr)`) or a constructor call instead of the C-style cast to avoid linter issues.

##########
File path: libminifi/test/unit/FileStreamTests.cpp
##########
@@ -153,21 +153,21 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
   REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");
 
   stream.seek(4);
 
-  stream.write(nullptr, 0);
+  stream.write((const uint8_t*)nullptr, 0);

Review comment:
       Please use a named cast (e.g. `static_cast<const uint8_t*>(nullptr)`) or a constructor call instead of the C-style cast to avoid linter issues.

##########
File path: libminifi/include/io/CRCStream.h
##########
@@ -34,307 +35,111 @@
 #endif
 #include "BaseStream.h"
 #include "Exception.h"
-#include "Serializable.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
+namespace internal {
 
-template<typename T>
-class CRCStream : public BaseStream {
+template<typename StreamType>
+class CRCStreamBase : public virtual Stream {
  public:
-  /**
-   * Raw pointer because the caller guarantees that
-   * it will exceed our lifetime.
-   */
-  explicit CRCStream(T *child_stream);
-  CRCStream(T *child_stream, uint64_t initial_crc);
-
-  CRCStream(CRCStream<T>&&) noexcept;
-
-  ~CRCStream() override = default;
-
-  T *getstream() const {
+  StreamType *getstream() const {
     return child_stream_;
   }
 
-  void disableEncoding() {
-    disable_encoding_ = true;
-  }
-
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(std::vector<uint8_t> &buf, int buflen) override;
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(uint8_t *buf, int buflen) override;
-
-  /**
-   * Write value to the stream using std::vector
-   * @param buf incoming buffer
-   * @param buflen buffer to write
-   */
-  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
-
-  /**
-   * writes value to stream
-   * @param value value to write
-   * @param size size of value
-   */
-  int writeData(uint8_t *value, int size) override;
-
-  using BaseStream::write;
-
-  /**
-   * write 4 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint32_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-  /**
-   * write 2 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint16_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * write 8 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint64_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
+  void close() override { child_stream_->close(); }
 
-  /**
-   * Reads a system word
-   * @param value value to write
-   */
-  int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a uint32_t
-   * @param value value to write
-   */
-  int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a system short
-   * @param value value to write
-   */
-  int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  const size_t getSize() const override { return child_stream_->getSize(); }
-
-  void closeStream() override { child_stream_->closeStream(); }
-
-  short initialize() override { // NOLINT
+  int initialize() override {
     child_stream_->initialize();
     reset();
     return 0;
   }
 
-  void updateCRC(uint8_t *buffer, uint32_t length);
+  void updateCRC(uint8_t *buffer, uint32_t length) {
+    crc_ = crc32(crc_, buffer, length);
+  }
 
   uint64_t getCRC() {
-    return crc_;
+    return gsl::narrow<uint64_t>(crc_);
   }
 
-  void reset();
-
- protected:
-  /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
-   * @param t incoming object
-   * @returns vector.
-   */
-  template<typename K>
-  std::vector<uint8_t> readBuffer(const K& t) {
-    std::vector<uint8_t> buf;
-    readBuffer(buf, t);
-    return buf;
+  void reset() {
+    crc_ = crc32(0L, Z_NULL, 0);
   }
 
-  /**
-   * Populates the vector using the provided type name.
-   * @param buf output buffer
-   * @param t incoming object
-   * @returns number of bytes read.
-   */
-  template<typename K>
-  int readBuffer(std::vector<uint8_t>& buf, const K& t) {
-    buf.resize(sizeof t);
-    return readData(reinterpret_cast<uint8_t*>(buf.data()), sizeof(t));
+ protected:
+  explicit CRCStreamBase(gsl::not_null<StreamType*> child_stream) : child_stream_(child_stream) {}
+  //  this implementation is here to make MSVC happy, declaration would be enough
+  //  as it will never get called (and should not be called)
+  CRCStreamBase() : child_stream_(gsl::make_not_null<StreamType*>(nullptr)) {
+    assert(false);
   }
 
-  uLong crc_;
-  T *child_stream_;
-  bool disable_encoding_;
+  uLong crc_ = 0;
+  gsl::not_null<StreamType*> child_stream_;
 };
 
-template<typename T>
-CRCStream<T>::CRCStream(T *child_stream)
-    : child_stream_(child_stream),
-      disable_encoding_(false) {
-  crc_ = crc32(0L, Z_NULL, 0);
-}
-
-template<typename T>
-CRCStream<T>::CRCStream(T *child_stream, uint64_t initial_crc)
-    : crc_(gsl::narrow<uLong>(initial_crc)),
-      child_stream_(child_stream),
-      disable_encoding_(false) {
-}
-
-template<typename T>
-CRCStream<T>::CRCStream(CRCStream<T> &&move) noexcept
-    : crc_(std::move(move.crc_)),
-      child_stream_(std::move(move.child_stream_)),
-      disable_encoding_(false) {
-}
-
-template<typename T>
-int CRCStream<T>::readData(std::vector<uint8_t> &buf, int buflen) {
-  if (buflen < 0) {
-    throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
-  }
-
-  if (buf.size() < static_cast<size_t>(buflen))
-    buf.resize(buflen);
-  return readData(buf.data(), buflen);
-}
-
-template<typename T>
-int CRCStream<T>::readData(uint8_t *buf, int buflen) {
-  int ret = child_stream_->read(buf, buflen);
-  if (ret > 0) {
-    crc_ = crc32(crc_, buf, ret);
-  }
-  return ret;
-}
-
-template<typename T>
-int CRCStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) {
-  if (buflen < 0) {
-    throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
+template<typename StreamType>
+class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStream {
+ public:
+  using InputStream::read;
+  using CRCStreamBase<StreamType>::child_stream_;
+  using CRCStreamBase<StreamType>::crc_;

Review comment:
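       Why are these `using` declarations `public`? They re-export the protected `child_stream_` and `crc_` members of `CRCStreamBase` to every user of `InputCRCStream`.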

##########
File path: libminifi/test/unit/SiteToSiteHelper.h
##########
@@ -19,132 +19,46 @@
 #define LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_
 
 #include <queue>
-#include "io/BaseStream.h"
+#include "io/BufferStream.h"
 #include "io/EndianCheck.h"
 #include "core/Core.h"
 /**
  * Test repository
  */
 class SiteToSiteResponder : public minifi::io::BaseStream {
  private:
-  std::queue<std::string> server_responses_;
+  minifi::io::BufferStream server_responses_;
   std::queue<std::string> client_responses_;
  public:
   SiteToSiteResponder() = default;
   // initialize
-  virtual short initialize() {
+  int initialize() override {
     return 1;
   }
 
-  void push_response(std::string resp) {
-    server_responses_.push(resp);
+  void push_response(const std::string& resp) {
+    server_responses_.write((const uint8_t*)resp.data(), resp.length());

Review comment:
       Avoid C-style casts; use a named cast such as `reinterpret_cast<const uint8_t*>(resp.data())` instead. See https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Res-casts-named and https://google.github.io/styleguide/cppguide.html#Casting

##########
File path: libminifi/src/io/InputStream.cpp
##########
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdio>
+#include <iostream>
+#include <vector>
+#include <string>
+#include <algorithm>
+#include "io/InputStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+int InputStream::read(std::vector<uint8_t>& buffer, int len) {
+  if (buffer.size() < len) {
+    buffer.resize(len);
+  }
+  int ret = read(buffer.data(), len);
+  buffer.resize((std::max)(ret, 0));
+  return ret;
+}
+
+int InputStream::read(bool &value) {
+  uint8_t buf = 0;
+
+  if (read(&buf, 1) != 1) {
+    return -1;
+  }
+  value = buf;
+  return 1;
+}
+
+int InputStream::read(std::string &str, bool widen) {

Review comment:
       I know it's not new but I don't understand the purpose of `!widen`. If there is none, can we get rid of it?

##########
File path: libminifi/include/io/CRCStream.h
##########
@@ -34,307 +35,111 @@
 #endif
 #include "BaseStream.h"
 #include "Exception.h"
-#include "Serializable.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
+namespace internal {
 
-template<typename T>
-class CRCStream : public BaseStream {
+template<typename StreamType>
+class CRCStreamBase : public virtual Stream {
  public:
-  /**
-   * Raw pointer because the caller guarantees that
-   * it will exceed our lifetime.
-   */
-  explicit CRCStream(T *child_stream);
-  CRCStream(T *child_stream, uint64_t initial_crc);
-
-  CRCStream(CRCStream<T>&&) noexcept;
-
-  ~CRCStream() override = default;
-
-  T *getstream() const {
+  StreamType *getstream() const {
     return child_stream_;
   }
 
-  void disableEncoding() {
-    disable_encoding_ = true;
-  }
-
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(std::vector<uint8_t> &buf, int buflen) override;
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(uint8_t *buf, int buflen) override;
-
-  /**
-   * Write value to the stream using std::vector
-   * @param buf incoming buffer
-   * @param buflen buffer to write
-   */
-  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
-
-  /**
-   * writes value to stream
-   * @param value value to write
-   * @param size size of value
-   */
-  int writeData(uint8_t *value, int size) override;
-
-  using BaseStream::write;
-
-  /**
-   * write 4 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint32_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-  /**
-   * write 2 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint16_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * write 8 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint64_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
+  void close() override { child_stream_->close(); }
 
-  /**
-   * Reads a system word
-   * @param value value to write
-   */
-  int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a uint32_t
-   * @param value value to write
-   */
-  int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a system short
-   * @param value value to write
-   */
-  int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  const size_t getSize() const override { return child_stream_->getSize(); }
-
-  void closeStream() override { child_stream_->closeStream(); }
-
-  short initialize() override { // NOLINT
+  int initialize() override {
     child_stream_->initialize();
     reset();
     return 0;
   }
 
-  void updateCRC(uint8_t *buffer, uint32_t length);
+  void updateCRC(uint8_t *buffer, uint32_t length) {
+    crc_ = crc32(crc_, buffer, length);
+  }
 
   uint64_t getCRC() {
-    return crc_;
+    return gsl::narrow<uint64_t>(crc_);
   }
 
-  void reset();
-
- protected:
-  /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
-   * @param t incoming object
-   * @returns vector.
-   */
-  template<typename K>
-  std::vector<uint8_t> readBuffer(const K& t) {
-    std::vector<uint8_t> buf;
-    readBuffer(buf, t);
-    return buf;
+  void reset() {
+    crc_ = crc32(0L, Z_NULL, 0);
   }
 
-  /**
-   * Populates the vector using the provided type name.
-   * @param buf output buffer
-   * @param t incoming object
-   * @returns number of bytes read.
-   */
-  template<typename K>
-  int readBuffer(std::vector<uint8_t>& buf, const K& t) {
-    buf.resize(sizeof t);
-    return readData(reinterpret_cast<uint8_t*>(buf.data()), sizeof(t));
+ protected:
+  explicit CRCStreamBase(gsl::not_null<StreamType*> child_stream) : child_stream_(child_stream) {}
+  //  this implementation is here to make MSVC happy, declaration would be enough
+  //  as it will never get called (and should not be called)
+  CRCStreamBase() : child_stream_(gsl::make_not_null<StreamType*>(nullptr)) {
+    assert(false);

Review comment:
       Why was MSVC unhappy?

##########
File path: libminifi/src/io/BufferStream.cpp
##########
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <algorithm>
+#include "io/BufferStream.h"
+#include "utils/StreamUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+int BufferStream::write(const uint8_t *value, int size) {
+  utils::internal::ensureNonNegativeWrite(size);

Review comment:
       Just an idea:
   ```suggestion
     gsl_Expects(size >= 0);
   ```
   or create a `NonNegativeInt` type with an invariant that its value is never negative (zero-length writes stay valid), checked in the constructor, without implicitly underflowing like unsigned int or `size_t`.

##########
File path: libminifi/include/sitetosite/Peer.h
##########
@@ -345,7 +310,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
    * Move assignment operator.
    */
   SiteToSitePeer& operator=(SiteToSitePeer&& other) {
-    stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(other.stream_.release());
+    stream_ = std::unique_ptr<org::apache::nifi::minifi::io::BaseStream>(other.stream_.release());

Review comment:
       ```suggestion
       stream_ = utils::exchange(other.stream_, nullptr);
   ```

##########
File path: libminifi/src/sitetosite/SiteToSiteClient.cpp
##########
@@ -499,8 +499,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co
       return -1;
     }
 
-    ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())),
-                                             gsl::narrow<int>(len));
+    ret = transaction->getStream().write(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);

Review comment:
       This conversion is still narrowing, so I don't see a reason for removing `gsl::narrow`.
   ```suggestion
       ret = transaction->getStream().write(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), gsl::narrow<int>(len));
   ```

##########
File path: libminifi/test/unit/Site2SiteTests.cpp
##########
@@ -94,7 +94,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
   sunny_path_bootstrap(collector);
 
   std::unique_ptr<minifi::sitetosite::SiteToSitePeer> peer = std::unique_ptr<minifi::sitetosite::SiteToSitePeer>(
-      new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<minifi::io::DataStream>(new org::apache::nifi::minifi::io::BaseStream(collector)), "fake_host", 65433, ""));
+      new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<minifi::io::BaseStream>(collector), "fake_host", 65433, ""));

Review comment:
       This looks like a fixed memory leak in the test. :+1: 

##########
File path: libminifi/include/io/CRCStream.h
##########
@@ -34,307 +35,111 @@
 #endif
 #include "BaseStream.h"
 #include "Exception.h"
-#include "Serializable.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
+namespace internal {
 
-template<typename T>
-class CRCStream : public BaseStream {
+template<typename StreamType>
+class CRCStreamBase : public virtual Stream {
  public:
-  /**
-   * Raw pointer because the caller guarantees that
-   * it will exceed our lifetime.
-   */
-  explicit CRCStream(T *child_stream);
-  CRCStream(T *child_stream, uint64_t initial_crc);
-
-  CRCStream(CRCStream<T>&&) noexcept;
-
-  ~CRCStream() override = default;
-
-  T *getstream() const {
+  StreamType *getstream() const {
     return child_stream_;
   }
 
-  void disableEncoding() {
-    disable_encoding_ = true;
-  }
-
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(std::vector<uint8_t> &buf, int buflen) override;
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
-  int readData(uint8_t *buf, int buflen) override;
-
-  /**
-   * Write value to the stream using std::vector
-   * @param buf incoming buffer
-   * @param buflen buffer to write
-   */
-  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
-
-  /**
-   * writes value to stream
-   * @param value value to write
-   * @param size size of value
-   */
-  int writeData(uint8_t *value, int size) override;
-
-  using BaseStream::write;
-
-  /**
-   * write 4 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint32_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-  /**
-   * write 2 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint16_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * write 8 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint64_t base_value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
+  void close() override { child_stream_->close(); }
 
-  /**
-   * Reads a system word
-   * @param value value to write
-   */
-  int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a uint32_t
-   * @param value value to write
-   */
-  int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  /**
-   * Reads a system short
-   * @param value value to write
-   */
-  int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
-
-  const size_t getSize() const override { return child_stream_->getSize(); }
-
-  void closeStream() override { child_stream_->closeStream(); }
-
-  short initialize() override { // NOLINT
+  int initialize() override {
     child_stream_->initialize();
     reset();
     return 0;
   }
 
-  void updateCRC(uint8_t *buffer, uint32_t length);
+  void updateCRC(uint8_t *buffer, uint32_t length) {
+    crc_ = crc32(crc_, buffer, length);
+  }
 
   uint64_t getCRC() {
-    return crc_;
+    return gsl::narrow<uint64_t>(crc_);
   }
 
-  void reset();
-
- protected:
-  /**
-   * Creates a vector and returns the vector using the provided
-   * type name.
-   * @param t incoming object
-   * @returns vector.
-   */
-  template<typename K>
-  std::vector<uint8_t> readBuffer(const K& t) {
-    std::vector<uint8_t> buf;
-    readBuffer(buf, t);
-    return buf;
+  void reset() {
+    crc_ = crc32(0L, Z_NULL, 0);
   }
 
-  /**
-   * Populates the vector using the provided type name.
-   * @param buf output buffer
-   * @param t incoming object
-   * @returns number of bytes read.
-   */
-  template<typename K>
-  int readBuffer(std::vector<uint8_t>& buf, const K& t) {
-    buf.resize(sizeof t);
-    return readData(reinterpret_cast<uint8_t*>(buf.data()), sizeof(t));
+ protected:
+  explicit CRCStreamBase(gsl::not_null<StreamType*> child_stream) : child_stream_(child_stream) {}
+  //  this implementation is here to make MSVC happy, declaration would be enough
+  //  as it will never get called (and should not be called)
+  CRCStreamBase() : child_stream_(gsl::make_not_null<StreamType*>(nullptr)) {
+    assert(false);
   }
 
-  uLong crc_;
-  T *child_stream_;
-  bool disable_encoding_;
+  uLong crc_ = 0;
+  gsl::not_null<StreamType*> child_stream_;
 };
 
-template<typename T>
-CRCStream<T>::CRCStream(T *child_stream)
-    : child_stream_(child_stream),
-      disable_encoding_(false) {
-  crc_ = crc32(0L, Z_NULL, 0);
-}
-
-template<typename T>
-CRCStream<T>::CRCStream(T *child_stream, uint64_t initial_crc)
-    : crc_(gsl::narrow<uLong>(initial_crc)),
-      child_stream_(child_stream),
-      disable_encoding_(false) {
-}
-
-template<typename T>
-CRCStream<T>::CRCStream(CRCStream<T> &&move) noexcept
-    : crc_(std::move(move.crc_)),
-      child_stream_(std::move(move.child_stream_)),
-      disable_encoding_(false) {
-}
-
-template<typename T>
-int CRCStream<T>::readData(std::vector<uint8_t> &buf, int buflen) {
-  if (buflen < 0) {
-    throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
-  }
-
-  if (buf.size() < static_cast<size_t>(buflen))
-    buf.resize(buflen);
-  return readData(buf.data(), buflen);
-}
-
-template<typename T>
-int CRCStream<T>::readData(uint8_t *buf, int buflen) {
-  int ret = child_stream_->read(buf, buflen);
-  if (ret > 0) {
-    crc_ = crc32(crc_, buf, ret);
-  }
-  return ret;
-}
-
-template<typename T>
-int CRCStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) {
-  if (buflen < 0) {
-    throw minifi::Exception{ExceptionType::GENERAL_EXCEPTION, "negative buflen"};
+template<typename StreamType>
+class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStream {
+ public:
+  using InputStream::read;
+  using CRCStreamBase<StreamType>::child_stream_;
+  using CRCStreamBase<StreamType>::crc_;
+
+  int read(uint8_t *buf, int buflen) override {
+    int ret = child_stream_->read(buf, buflen);
+    if (ret > 0) {
+      crc_ = crc32(crc_, buf, ret);
+    }
+    return ret;
   }
 
-  if (buf.size() < static_cast<size_t>(buflen))
-    buf.resize(buflen);
-  return writeData(buf.data(), buflen);
-}
+  size_t size() const override { return child_stream_->size(); }
+};
 
-template<typename T>
-int CRCStream<T>::writeData(uint8_t *value, int size) {
-  int ret = child_stream_->write(value, size);
-  if (ret > 0) {
-    crc_ = crc32(crc_, value, ret);
+template<typename StreamType>
+class OutputCRCStream : public virtual CRCStreamBase<StreamType>, public OutputStream {
+ public:
+  using OutputStream::write;
+  using CRCStreamBase<StreamType>::child_stream_;
+  using CRCStreamBase<StreamType>::crc_;
+
+  int write(const uint8_t *value, int size) override {
+    int ret = child_stream_->write(value, size);
+    if (ret > 0) {
+      crc_ = crc32(crc_, value, ret);
+    }
+    return ret;
   }
-  return ret;
-}
-template<typename T>
-void CRCStream<T>::reset() {
-  crc_ = crc32(0L, Z_NULL, 0);
-}
-template<typename T>
-void CRCStream<T>::updateCRC(uint8_t *buffer, uint32_t length) {
-  crc_ = crc32(crc_, buffer, length);
-}
-
-template<typename T>
-int CRCStream<T>::write(uint64_t base_value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  const uint64_t value = is_little_endian == 1 ? byteSwap(base_value) : base_value;
-  uint8_t bytes[sizeof value];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes);
-  return writeData(bytes, sizeof value);
-}
+};
 
-template<typename T>
-int CRCStream<T>::write(uint32_t base_value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  const uint32_t value = is_little_endian ? byteSwap(base_value) : base_value;
-  uint8_t bytes[sizeof value];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes);
-  return writeData(bytes, sizeof value);
-}
+struct empty_class {};
 
-template<typename T>
-int CRCStream<T>::write(uint16_t base_value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  const uint16_t value = is_little_endian == 1 ? byteSwap(base_value) : base_value;
-  uint8_t bytes[sizeof value];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes);
-  return writeData(bytes, sizeof value);
-}
+}  // namespace internal
 
-template<typename T>
-int CRCStream<T>::read(uint64_t &value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  std::vector<uint8_t> buf;
-  auto ret = readBuffer(buf, value);
-  if (ret <= 0) return ret;
+template<typename StreamType>
+class CRCStream : public std::conditional<std::is_base_of<InputStream, StreamType>::value, internal::InputCRCStream<StreamType>, internal::empty_class>::type
+                  , public std::conditional<std::is_base_of<OutputStream, StreamType>::value, internal::OutputCRCStream<StreamType>, internal::empty_class>::type {
+  using internal::CRCStreamBase<StreamType>::child_stream_;
+  using internal::CRCStreamBase<StreamType>::crc_;
 
-  if (is_little_endian) {
-    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
-        | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
-  } else {
-    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
-        | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+ public:
+  explicit CRCStream(gsl::not_null<StreamType*> child_stream) : internal::CRCStreamBase<StreamType>(child_stream) {
+    crc_ = crc32(0L, Z_NULL, 0);
   }
-  return sizeof(value);
-}
-
-template<typename T>
-int CRCStream<T>::read(uint32_t &value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  std::vector<uint8_t> buf;
-  auto ret = readBuffer(buf, value);
-  if (ret <= 0) return ret;
 
-  if (is_little_endian) {
-    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
-  } else {
-    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+  CRCStream(gsl::not_null<StreamType*> child_stream, uint64_t initial_crc) : internal::CRCStreamBase<StreamType>(child_stream) {
+    crc_ = gsl::narrow<uLong>(initial_crc);
   }
 
-  return sizeof(value);
-}
-
-template<typename T>
-int CRCStream<T>::read(uint16_t &value, bool is_little_endian) {
-  if (disable_encoding_)
-    is_little_endian = false;
-  std::vector<uint8_t> buf;
-  auto ret = readBuffer(buf, value);
-  if (ret <= 0) return ret;
-
-  if (is_little_endian) {
-    value = (buf[0] << 8) | buf[1];
-  } else {
-    value = buf[0] | buf[1] << 8;
+  CRCStream(CRCStream &&stream) noexcept : internal::CRCStreamBase<StreamType>(std::move(stream.child_stream_)){
+    crc_ = std::move(stream.crc_);

Review comment:
       Shouldn't we reset the moved-from `stream`? This would also mean that `child_stream_` must be nullable, i.e. it could no longer be a `gsl::not_null<StreamType*>`.
   ```suggestion
     CRCStream(CRCStream &&stream) noexcept : internal::CRCStreamBase<StreamType>(utils::exchange(stream.child_stream_, nullptr)){
       crc_ = utils::exchange(stream.crc_, 0);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org