You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/12 01:36:19 UTC

[incubator-tubemq] branch tubemq-client-cpp updated: TUBEMQ-287 add io buffer (#253)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new b94cc36  TUBEMQ-287 add io buffer (#253)
b94cc36 is described below

commit b94cc364951ea74dc41b34adfe7d0289d36b4d14
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Sat Sep 12 09:36:11 2020 +0800

    TUBEMQ-287 add io buffer (#253)
    
    Co-authored-by: charleli <ch...@tencent.com>
---
 tubemq-client-twins/tubemq-client-cpp/src/buffer.h | 356 +++++++++++++++++++++
 1 file changed, 356 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-cpp/src/buffer.h b/tubemq-client-twins/tubemq-client-cpp/src/buffer.h
new file mode 100644
index 0000000..0851375
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/buffer.h
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Modified from evpp
+// @see https://github.com/Qihoo360/evpp/blob/master/evpp/buffer.h
+
+#ifndef _TUBEMQ_BUFFER_H_
+#define _TUBEMQ_BUFFER_H_
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+namespace tubemq {
+
+class Buffer;  // forward declaration so the BufferPtr alias can precede the class body
+using BufferPtr = std::shared_ptr<Buffer>;  // shared handle; return type of Buffer::Slice()
+
+// Buffer is a growable byte buffer with independent read and write cursors:
+//
+//   [0, read_index_)             prependable area (reserved space and consumed bytes)
+//   [read_index_, write_index_)  unread bytes, exposed through data() / length()
+//   [write_index_, capacity_)    writable area
+//
+// Multi-byte integers are converted with htonl/htons when written and with
+// ntohl/ntohs when read. A Buffer normally owns its storage; the object returned
+// by Slice() is a non-owning view that shares the storage of its source.
+class Buffer {
+ public:
+  static const size_t kCheapPrependSize = 0;
+  static const size_t kInitialSize = 8192;
+
+  // Allocates reserved_prepend_size + initial_size bytes and places both cursors
+  // right behind the reserved prepend area, so the new buffer is empty.
+  explicit Buffer(size_t initial_size = kInitialSize,
+                  size_t reserved_prepend_size = kCheapPrependSize)
+      : buffer_(new char[reserved_prepend_size + initial_size]),
+        capacity_(reserved_prepend_size + initial_size),
+        read_index_(reserved_prepend_size),
+        write_index_(reserved_prepend_size),
+        reserved_prepend_size_(reserved_prepend_size),
+        has_mem_(true) {
+    assert(length() == 0);
+    assert(WritableBytes() == initial_size);
+    assert(PrependableBytes() == reserved_prepend_size);
+  }
+
+  // Deep copy. The copy always owns freshly allocated storage, so destroying the
+  // source and the copy never releases the same memory twice. Use Slice() when a
+  // storage-sharing view is wanted instead.
+  Buffer(const Buffer& rhs)
+      : buffer_(new char[rhs.capacity_]),
+        capacity_(rhs.capacity_),
+        read_index_(rhs.read_index_),
+        write_index_(rhs.write_index_),
+        reserved_prepend_size_(rhs.reserved_prepend_size_),
+        has_mem_(true) {
+    // Everything in front of the write cursor is copied (not only the unread
+    // bytes) so that UnreadBytes() behaves the same on the copy.
+    if (rhs.write_index_ > 0) {
+      memcpy(buffer_, rhs.buffer_, rhs.write_index_);
+    }
+  }
+
+  // Move construction takes over the storage of rhs and leaves rhs as an empty
+  // buffer without storage.
+  Buffer(Buffer&& rhs) noexcept
+      : buffer_(nullptr),
+        capacity_(0),
+        read_index_(0),
+        write_index_(0),
+        reserved_prepend_size_(0),
+        has_mem_(false) {
+    Swap(rhs);
+  }
+
+  // Copy assignment: deep copy into a temporary, then swap it in.
+  Buffer& operator=(const Buffer& rhs) {
+    if (this != &rhs) {
+      Buffer copy(rhs);
+      Swap(copy);
+    }
+    return *this;
+  }
+
+  // Move assignment exchanges the state of both buffers; the previous storage of
+  // *this is released when rhs is destroyed.
+  Buffer& operator=(Buffer&& rhs) noexcept {
+    Swap(rhs);
+    return *this;
+  }
+
+  // Releases the storage only when this object owns it (views do not).
+  ~Buffer() {
+    if (has_mem_) {
+      delete[] buffer_;
+    }
+    buffer_ = nullptr;
+    capacity_ = 0;
+  }
+
+  // String returns a one-line description of the internal state for debugging.
+  std::string String() const {
+    char buf[1024];
+    // size_t is printed with %zu and %p receives a void pointer, as printf requires.
+    snprintf(buf, sizeof(buf),
+             "buffer:%p,capacity:%zu,readindex:%zu,writeindex:%zu,prependsize:%zu,hasmem:%d",
+             static_cast<const void*>(buffer_), capacity_, read_index_, write_index_,
+             reserved_prepend_size_, static_cast<int>(has_mem_));
+    return buf;
+  }
+
+  // Slice returns a non-owning view with the same cursors that shares this
+  // buffer's storage. The view must not be used after the storage it refers to
+  // has been released or reallocated by its owner.
+  BufferPtr Slice() { return BufferPtr(new Buffer(*this, ViewTag())); }
+
+  // Swap exchanges the complete state of two buffers, including the ownership
+  // flag, so that each storage block keeps exactly one owner.
+  void Swap(Buffer& rhs) noexcept {
+    std::swap(buffer_, rhs.buffer_);
+    std::swap(capacity_, rhs.capacity_);
+    std::swap(read_index_, rhs.read_index_);
+    std::swap(write_index_, rhs.write_index_);
+    std::swap(reserved_prepend_size_, rhs.reserved_prepend_size_);
+    std::swap(has_mem_, rhs.has_mem_);
+  }
+
+  // Skip advances the reading index of the buffer by len bytes.
+  // If len covers all unread bytes, the buffer is reset to empty instead.
+  void Skip(size_t len) {
+    if (len < length()) {
+      read_index_ += len;
+    } else {
+      Reset();
+    }
+  }
+
+  // Retrieve advances the reading index of the buffer.
+  // Retrieve is the same as Skip.
+  void Retrieve(size_t len) { Skip(len); }
+
+  // Truncate discards all but the first n unread bytes from the buffer
+  // but continues to use the same allocated storage.
+  // Truncate(0) empties the buffer and rewinds both cursors.
+  // It does nothing if n is not smaller than the length of the buffer.
+  void Truncate(size_t n) {
+    if (n == 0) {
+      read_index_ = reserved_prepend_size_;
+      write_index_ = reserved_prepend_size_;
+    } else if (n < length()) {
+      // Compared against length() so that a huge n cannot wrap read_index_ + n.
+      write_index_ = read_index_ + n;
+    }
+  }
+
+  // Reset resets the buffer to be empty,
+  // but it retains the underlying storage for use by future writes.
+  // Reset is the same as Truncate(0).
+  void Reset() { Truncate(0); }
+
+  // Reserve makes sure the capacity is at least len plus the reserved prepend
+  // size. New storage is allocated when the current capacity is smaller,
+  // otherwise the method does nothing.
+  void Reserve(size_t len) {
+    if (capacity_ >= len + reserved_prepend_size_) {
+      return;
+    }
+
+    grow(len + reserved_prepend_size_);
+  }
+
+  // Make sure there is enough memory space to append more data with length len
+  void EnsureWritableBytes(size_t len) {
+    if (WritableBytes() < len) {
+      grow(len);
+    }
+
+    assert(WritableBytes() >= len);
+  }
+
+  // ToText appends char '\0' to buffer to convert the underlying data to a c-style string text.
+  // It will not change the length of buffer.
+  void ToText() {
+    AppendInt8('\0');
+    UnwriteBytes(1);
+  }
+
+  // Write
+ public:
+  // Write copies len bytes starting at d behind the unread data, growing the
+  // storage first when necessary. d must not point into this buffer.
+  void Write(const void* /*restrict*/ d, size_t len) {
+    if (len == 0) {
+      return;  // nothing to copy; also avoids calling memcpy with a null pointer
+    }
+    EnsureWritableBytes(len);
+    memcpy(WriteBegin(), d, len);
+    assert(write_index_ + len <= capacity_);
+    write_index_ += len;
+  }
+
+  // Append is the same as Write.
+  void Append(const char* /*restrict*/ d, size_t len) { Write(d, len); }
+
+  void Append(const void* /*restrict*/ d, size_t len) { Write(d, len); }
+
+  // AppendInt32/16/8 append an integer; the 32 and 16 bit variants are converted
+  // with htonl/htons first.
+  void AppendInt32(int32_t x) {
+    int32_t be32 = htonl(x);
+    Write(&be32, sizeof be32);
+  }
+
+  void AppendInt16(int16_t x) {
+    int16_t be16 = htons(x);
+    Write(&be16, sizeof be16);
+  }
+
+  void AppendInt8(int8_t x) { Write(&x, sizeof x); }
+
+  // PrependInt32/16/8 insert an integer in front of the unread data; the 32 and
+  // 16 bit variants are converted with htonl/htons first.
+  void PrependInt32(int32_t x) {
+    int32_t be32 = htonl(x);
+    Prepend(&be32, sizeof be32);
+  }
+
+  void PrependInt16(int16_t x) {
+    int16_t be16 = htons(x);
+    Prepend(&be16, sizeof be16);
+  }
+
+  void PrependInt8(int8_t x) { Prepend(&x, sizeof x); }
+
+  // Insert content, specified by the parameter, into the front of reading index.
+  // The caller must make sure that len does not exceed PrependableBytes().
+  void Prepend(const void* /*restrict*/ d, size_t len) {
+    assert(len <= PrependableBytes());
+    read_index_ -= len;
+    const char* p = static_cast<const char*>(d);
+    memcpy(begin() + read_index_, p, len);
+  }
+
+  // UnwriteBytes drops the last n written bytes; n must not exceed length().
+  void UnwriteBytes(size_t n) {
+    assert(n <= length());
+    write_index_ -= n;
+  }
+
+  // WriteBytes marks n bytes behind the write cursor as written, for callers
+  // that filled them through WriteBegin(); n must not exceed WritableBytes().
+  void WriteBytes(size_t n) {
+    assert(n <= WritableBytes());
+    write_index_ += n;
+  }
+
+  // Read
+ public:
+  // Read uint32_t/int32_t/int16_t/int8_t with network endian and consume the bytes
+
+  uint32_t ReadUint32() {
+    uint32_t result = PeekUint32();
+    Skip(sizeof result);
+    return result;
+  }
+
+  int32_t ReadInt32() {
+    int32_t result = PeekInt32();
+    Skip(sizeof result);
+    return result;
+  }
+
+  int16_t ReadInt16() {
+    int16_t result = PeekInt16();
+    Skip(sizeof result);
+    return result;
+  }
+
+  int8_t ReadInt8() {
+    int8_t result = PeekInt8();
+    Skip(sizeof result);
+    return result;
+  }
+
+  // ToString returns a copy of the unread bytes; the buffer is not modified.
+  std::string ToString() const { return std::string(data(), length()); }
+
+  // ReadByte reads and returns the next byte from the buffer.
+  // Reading from an empty buffer trips the assert when asserts are enabled;
+  // otherwise '\0' is returned.
+  char ReadByte() {
+    assert(length() >= 1);
+
+    if (length() == 0) {
+      return '\0';
+    }
+
+    return buffer_[read_index_++];
+  }
+
+  // UnreadBytes unreads the last n bytes returned
+  // by the most recent read operation.
+  void UnreadBytes(size_t n) {
+    // n == read_index_ is valid: it moves the cursor back to offset 0, which is
+    // where reading starts when no prepend space is reserved.
+    assert(n <= read_index_);
+    read_index_ -= n;
+  }
+
+  // Peek
+ public:
+  // Peek uint32_t/int32_t/int16_t/int8_t with network endian without consuming them.
+  // The caller must make sure that enough unread bytes are available.
+
+  uint32_t PeekUint32() const {
+    assert(length() >= sizeof(uint32_t));
+    uint32_t be32 = 0;
+    ::memcpy(&be32, data(), sizeof be32);
+    return ntohl(be32);
+  }
+
+  int32_t PeekInt32() const {
+    assert(length() >= sizeof(int32_t));
+    int32_t be32 = 0;
+    ::memcpy(&be32, data(), sizeof be32);
+    return ntohl(be32);
+  }
+
+  int16_t PeekInt16() const {
+    assert(length() >= sizeof(int16_t));
+    int16_t be16 = 0;
+    ::memcpy(&be16, data(), sizeof be16);
+    return ntohs(be16);
+  }
+
+  int8_t PeekInt8() const {
+    assert(length() >= sizeof(int8_t));
+    int8_t x = *data();
+    return x;
+  }
+
+ public:
+  // data returns a pointer of length Buffer.length() holding the unread portion of the buffer.
+  // The data is valid for use only until the next buffer modification (that is,
+  // only until the next call to a method like Read, Write, Reset, or Truncate).
+  // The data aliases the buffer content at least until the next buffer modification,
+  // so immediate changes to the slice will affect the result of future reads.
+  const char* data() const { return buffer_ + read_index_; }
+
+  // WriteBegin returns the position where the next written byte will be stored.
+  char* WriteBegin() { return begin() + write_index_; }
+
+  const char* WriteBegin() const { return begin() + write_index_; }
+
+  // length returns the number of bytes of the unread portion of the buffer
+  size_t length() const {
+    assert(write_index_ >= read_index_);
+    return write_index_ - read_index_;
+  }
+
+  // size returns the number of bytes of the unread portion of the buffer.
+  // It is the same as length().
+  size_t size() const { return length(); }
+
+  // capacity returns the capacity of the buffer's underlying byte slice, that is, the
+  // total space allocated for the buffer's data.
+  size_t capacity() const { return capacity_; }
+
+  // WritableBytes returns the number of bytes between the write cursor and the
+  // end of the storage.
+  size_t WritableBytes() const {
+    assert(capacity_ >= write_index_);
+    return capacity_ - write_index_;
+  }
+
+  // PrependableBytes returns the number of bytes in front of the read cursor.
+  size_t PrependableBytes() const { return read_index_; }
+
+ public:
+  // begin returns the start of the underlying storage.
+  char* begin() { return buffer_; }
+
+  const char* begin() const { return buffer_; }
+
+  // grow makes room for at least len writable bytes.
+  //  - An owning buffer whose free space (in front of and behind the unread data)
+  //    is large enough only moves the unread bytes to the front.
+  //  - Otherwise new storage of (2 * capacity + len) bytes is allocated and the
+  //    unread bytes are copied over.
+  // A non-owning view always takes the second path and owns the new storage
+  // afterwards, so growing a view neither writes past nor rearranges the storage
+  // it shared with its source.
+  void grow(size_t len) {
+    const size_t readable = length();
+    const bool fits_after_compaction =
+        WritableBytes() + PrependableBytes() >= len + reserved_prepend_size_;
+
+    if (has_mem_ && fits_after_compaction) {
+      // move readable data to the front, make space inside buffer
+      assert(reserved_prepend_size_ <= read_index_);
+      memmove(begin() + reserved_prepend_size_, begin() + read_index_, readable);
+      read_index_ = reserved_prepend_size_;
+      write_index_ = read_index_ + readable;
+      assert(readable == length());
+      assert(WritableBytes() >= len);
+      return;
+    }
+
+    // grow the capacity; allocate before touching any member so that a failed
+    // allocation leaves the buffer unchanged
+    const size_t new_capacity = (capacity_ << 1) + len;
+    char* storage = new char[new_capacity];
+    if (readable > 0) {
+      memcpy(storage + reserved_prepend_size_, begin() + read_index_, readable);
+    }
+    if (has_mem_) {
+      delete[] buffer_;  // a view must leave the shared storage to its owner
+    }
+    buffer_ = storage;
+    capacity_ = new_capacity;
+    read_index_ = reserved_prepend_size_;
+    write_index_ = reserved_prepend_size_ + readable;
+    has_mem_ = true;
+  }
+
+ private:
+  // Tag type selecting the view constructor used by Slice().
+  struct ViewTag {};
+
+  // Builds a non-owning view: all members are taken over from src, but the
+  // storage is not released by the destructor of the view.
+  Buffer(const Buffer& src, ViewTag) noexcept
+      : buffer_(src.buffer_),
+        capacity_(src.capacity_),
+        read_index_(src.read_index_),
+        write_index_(src.write_index_),
+        reserved_prepend_size_(src.reserved_prepend_size_),
+        has_mem_(false) {}
+
+  char* buffer_;                  // start of the storage
+  size_t capacity_;               // total size of the storage in bytes
+  size_t read_index_;             // offset of the first unread byte
+  size_t write_index_;            // offset one past the last written byte
+  size_t reserved_prepend_size_;  // bytes kept free in front of the data after a reset
+  bool has_mem_{true};            // true when the destructor must release buffer_
+};
+
+}  // namespace tubemq
+#endif /* _TUBEMQ_BUFFER_H_ */