You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/10/12 01:41:47 UTC

[orc] branch main updated: ORC-1280: [C++] Implement block-based buffer(Part I)

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cf1047f9 ORC-1280: [C++] Implement block-based buffer(Part I)
8cf1047f9 is described below

commit 8cf1047f9ace3799df12f24d2a5096b17a9a6ed0
Author: coderex2522 <re...@gmail.com>
AuthorDate: Wed Oct 12 09:41:40 2022 +0800

    ORC-1280: [C++] Implement block-based buffer(Part I)
    
    This closes #1271
---
 c++/src/BlockBuffer.cc      |  85 ++++++++++++++++++++++++++++++++
 c++/src/BlockBuffer.hh      | 116 ++++++++++++++++++++++++++++++++++++++++++++
 c++/src/CMakeLists.txt      |   1 +
 c++/test/CMakeLists.txt     |   1 +
 c++/test/TestBlockBuffer.cc |  81 +++++++++++++++++++++++++++++++
 5 files changed, 284 insertions(+)

diff --git a/c++/src/BlockBuffer.cc b/c++/src/BlockBuffer.cc
new file mode 100644
index 000000000..aac0d4798
--- /dev/null
+++ b/c++/src/BlockBuffer.cc
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BlockBuffer.hh"
+
+#include <algorithm>
+#include <stdexcept>
+
+namespace orc {
+
+  // Builds an empty buffer (size() == 0) and immediately requests the first
+  // block from the pool. A zero block size is rejected before any allocation.
+  BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
+      : memoryPool(pool), currentSize(0), currentCapacity(0), blockSize(_blockSize) {
+    const bool validBlockSize = _blockSize != 0;
+    if (!validBlockSize) {
+      throw std::logic_error("Block size cannot be zero");
+    }
+    reserve(_blockSize);
+  }
+
+  // Hands every block back to the pool it came from, then resets bookkeeping.
+  BlockBuffer::~BlockBuffer() {
+    for (char* block : blocks) {
+      memoryPool.free(block);
+    }
+    blocks.clear();
+    currentCapacity = 0;
+    currentSize = 0;
+  }
+
+  // Returns the occupied part of block `blockIndex`: all blocks except the
+  // last occupied one are full; the last one holds the remainder of size().
+  BlockBuffer::Block BlockBuffer::getBlock(uint64_t blockIndex) const {
+    if (blockIndex < getBlockNumber()) {
+      const uint64_t blockStart = blockIndex * blockSize;
+      const uint64_t usedBytes = std::min(blockSize, currentSize - blockStart);
+      return Block(blocks[blockIndex], usedBytes);
+    }
+    throw std::out_of_range("Block index out of range");
+  }
+
+  // Hands out writable space and marks it as used (size() is advanced to the
+  // end of the returned block).
+  BlockBuffer::Block BlockBuffer::getNextBlock() {
+    if (currentSize >= currentCapacity) {
+      // No spare capacity: grow by exactly one block and return all of it.
+      resize(currentSize + blockSize);
+      return Block(blocks.back(), blockSize);
+    }
+    // Spare capacity: return the unused tail of the block containing
+    // currentSize (a whole block when currentSize is on a block boundary).
+    const uint64_t index = currentSize / blockSize;
+    const uint64_t offset = currentSize % blockSize;
+    Block emptyBlock(blocks[index] + offset, blockSize - offset);
+    currentSize = (index + 1) * blockSize;
+    return emptyBlock;
+  }
+
+  // Sets the number of used bytes, allocating blocks if needed. Shrinking
+  // keeps all blocks allocated.
+  void BlockBuffer::resize(uint64_t size) {
+    reserve(size);
+    // reserve() stops early if the pool returns nullptr; never claim bytes
+    // that are not backed by an allocated block.
+    if (currentCapacity < size) {
+      throw std::logic_error("Block buffer resize error");
+    }
+    currentSize = size;
+  }
+
+  // Appends blocks until capacity() >= newCapacity. Existing blocks are never
+  // moved. If the pool returns nullptr, allocation stops early and capacity()
+  // may remain below newCapacity; callers that need the space (e.g. resize())
+  // must check capacity() afterwards.
+  void BlockBuffer::reserve(uint64_t newCapacity) {
+    while (currentCapacity < newCapacity) {
+      char* newBlockPtr = memoryPool.malloc(blockSize);
+      if (newBlockPtr == nullptr) {
+        break;
+      }
+      try {
+        blocks.push_back(newBlockPtr);
+      } catch (...) {
+        // push_back may throw (e.g. std::bad_alloc) while growing `blocks`.
+        // The new block is not tracked yet, so the destructor would never free
+        // it (and during construction the destructor does not run at all):
+        // return it to the pool before propagating.
+        memoryPool.free(newBlockPtr);
+        throw;
+      }
+      currentCapacity += blockSize;
+    }
+  }
+} // namespace orc
diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
new file mode 100644
index 000000000..bb22b8a02
--- /dev/null
+++ b/c++/src/BlockBuffer.hh
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOCK_BUFFER_HH
+#define ORC_BLOCK_BUFFER_HH
+
+#include "orc/MemoryPool.hh"
+
+#include <vector>
+
+namespace orc {
+
+  /**
+   * BlockBuffer implements a memory allocation policy based on
+   * equal-length blocks. Memory is requested from a MemoryPool one
+   * block (blockSize bytes) at a time; new blocks are appended as the
+   * buffer grows, existing blocks are never moved, and all blocks are
+   * returned to the pool when the BlockBuffer is destroyed.
+   */
+  class BlockBuffer {
+  private:
+    MemoryPool& memoryPool;
+    // number of bytes currently in use (never exceeds currentCapacity)
+    uint64_t currentSize;
+    // maximal capacity (actual allocated memory), a multiple of blockSize
+    uint64_t currentCapacity;
+    // unit for buffer expansion; every block is exactly this many bytes
+    const uint64_t blockSize;
+    // pointers to the start of each block
+    std::vector<char*> blocks;
+
+  public:
+    /**
+     * Create a buffer and request its first block from the pool.
+     * @param pool pool every block is allocated from and freed back to;
+     *        only a reference is kept, so it must outlive this buffer
+     * @param blockSize size in bytes of each block; must be non-zero
+     * @throws std::logic_error if blockSize is zero
+     */
+    BlockBuffer(MemoryPool& pool, uint64_t blockSize);
+
+    ~BlockBuffer();
+
+    // non-copyable and non-movable: the buffer owns the pool-allocated blocks
+    BlockBuffer(const BlockBuffer& buffer) = delete;
+    BlockBuffer& operator=(const BlockBuffer& buffer) = delete;
+    BlockBuffer(BlockBuffer&& buffer) = delete;
+    BlockBuffer& operator=(BlockBuffer&& buffer) = delete;
+
+    /**
+     * Block points to a section of memory allocated by BlockBuffer,
+     * containing its start address and available size. A Block does not
+     * own the memory; the BlockBuffer it came from does.
+     */
+    struct Block {
+      // the start of block
+      char* data;
+      // number of bytes available at data
+      uint64_t size;
+
+      Block() : data(nullptr), size(0) {}
+      Block(char* _data, uint64_t _size) : data(_data), size(_size) {}
+    };
+
+    /**
+     * Get an allocated (occupied) block.
+     * Every occupied block except the last one has size blockSize; the
+     * last one may be smaller when size() is not a multiple of blockSize.
+     * @param blockIndex index of the block, in [0, getBlockNumber())
+     * @return the block's start address and number of occupied bytes
+     * @throws std::out_of_range if blockIndex >= getBlockNumber()
+     */
+    Block getBlock(uint64_t blockIndex) const;
+
+    /**
+     * Get an empty block to write into, allocating a new one if needed.
+     * If the last occupied block is only partially used, the returned
+     * block is its unused tail (blockSize minus the used bytes). Otherwise
+     * it is a whole block of blockSize bytes. The returned bytes count as
+     * used: size() is advanced to the end of the returned block.
+     * @return an empty block object
+     * @throws std::logic_error if the pool returns nullptr for a new block
+     */
+    Block getNextBlock();
+
+    /**
+     * Get the number of blocks that are fully or partially occupied
+     */
+    uint64_t getBlockNumber() const {
+      return (currentSize + blockSize - 1) / blockSize;
+    }
+
+    /**
+     * Number of bytes currently in use.
+     */
+    uint64_t size() const {
+      return currentSize;
+    }
+
+    /**
+     * Number of bytes backed by allocated blocks (a multiple of blockSize).
+     */
+    uint64_t capacity() const {
+      return currentCapacity;
+    }
+
+    /**
+     * Set the number of used bytes to size, allocating blocks if needed.
+     * Shrinking does not release any block back to the pool.
+     * @param size new number of used bytes
+     * @throws std::logic_error if the required blocks could not be allocated
+     */
+    void resize(uint64_t size);
+
+    /**
+     * Requests the BlockBuffer to contain at least newCapacity bytes.
+     * New blocks are appended as needed; existing blocks are never moved.
+     * If the pool returns nullptr, allocation stops early and capacity()
+     * may stay below newCapacity, so check capacity() when it matters.
+     * @param newCapacity new capacity of BlockBuffer
+     */
+    void reserve(uint64_t newCapacity);
+  };
+}  // namespace orc
+
+#endif
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index d4cd03504..5608fbad0 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -205,6 +205,7 @@ set(SOURCE_FILES
   sargs/TruthValue.cc
   wrap/orc-proto-wrapper.cc
   Adaptor.cc
+  BlockBuffer.cc
   BloomFilter.cc
   ByteRLE.cc
   ColumnPrinter.cc
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 00aca6b55..567753b4e 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -22,6 +22,7 @@ add_executable (orc-test
   MemoryInputStream.cc
   MemoryOutputStream.cc
   TestAttributes.cc
+  TestBlockBuffer.cc
   TestBufferedOutputStream.cc
   TestBloomFilter.cc
   TestByteRle.cc
diff --git a/c++/test/TestBlockBuffer.cc b/c++/test/TestBlockBuffer.cc
new file mode 100644
index 000000000..c638490e6
--- /dev/null
+++ b/c++/test/TestBlockBuffer.cc
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BlockBuffer.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  TEST(TestBlockBuffer, size_and_capacity) {
+    const uint64_t kBlockSize = 1024;
+    BlockBuffer buffer(*getDefaultPool(), kBlockSize);
+
+    // construction pre-allocates one block but occupies none of it
+    EXPECT_EQ(buffer.getBlockNumber(), 0);
+    EXPECT_EQ(buffer.size(), 0);
+    EXPECT_EQ(buffer.capacity(), kBlockSize);
+
+    // reserve() grows capacity only
+    buffer.reserve(128 * kBlockSize);
+    EXPECT_EQ(buffer.getBlockNumber(), 0);
+    EXPECT_EQ(buffer.size(), 0);
+    EXPECT_EQ(buffer.capacity(), 128 * kBlockSize);
+
+    // resize() below the current capacity allocates nothing new
+    buffer.resize(64 * kBlockSize);
+    EXPECT_EQ(buffer.getBlockNumber(), 64);
+    EXPECT_EQ(buffer.size(), 64 * kBlockSize);
+    EXPECT_EQ(buffer.capacity(), 128 * kBlockSize);
+
+    // resize() above the current capacity appends blocks as needed
+    buffer.resize(256 * kBlockSize);
+    EXPECT_EQ(buffer.getBlockNumber(), 256);
+    EXPECT_EQ(buffer.size(), 256 * kBlockSize);
+    EXPECT_EQ(buffer.capacity(), 256 * kBlockSize);
+  }
+
+  TEST(TestBlockBuffer, get_block) {
+    MemoryPool* pool = getDefaultPool();
+    BlockBuffer buffer(*pool, 1024);
+
+    // fill ten blocks obtained via getNextBlock() with a per-block pattern
+    EXPECT_EQ(buffer.getBlockNumber(), 0);
+    for (uint64_t i = 0; i < 10; ++i) {
+      BlockBuffer::Block block = buffer.getNextBlock();
+      EXPECT_EQ(buffer.getBlockNumber(), i + 1);
+      for (uint64_t j = 0; j < block.size; ++j) {
+        if (i % 2 == 0) {
+          block.data[j] = static_cast<char>('A' + (i + j) % 26);
+        } else {
+          block.data[j] = static_cast<char>('a' + (i + j) % 26);
+        }
+      }
+    }
+
+    // verify the block data
+    for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
+      BlockBuffer::Block block = buffer.getBlock(i);
+      for (uint64_t j = 0; j < block.size; ++j) {
+        if (i % 2 == 0) {
+          EXPECT_EQ(block.data[j], 'A' + (i + j) % 26);
+        } else {
+          EXPECT_EQ(block.data[j], 'a' + (i + j) % 26);
+        }
+      }
+    }
+
+    // indices past the last occupied block are rejected
+    EXPECT_THROW(buffer.getBlock(buffer.getBlockNumber()), std::out_of_range);
+
+    // after shrinking into the middle of block 9, getNextBlock() must hand
+    // out the unused tail of that block and advance size() to its end
+    buffer.resize(10 * 1024 - 100);
+    EXPECT_EQ(buffer.getBlockNumber(), 10);
+    EXPECT_EQ(buffer.getBlock(9).size, 924);
+    BlockBuffer::Block tail = buffer.getNextBlock();
+    EXPECT_EQ(tail.size, 100);
+    EXPECT_TRUE(tail.data == buffer.getBlock(9).data + 924);
+    EXPECT_EQ(buffer.size(), 10 * 1024);
+  }
+}