You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/10/12 01:41:47 UTC
[orc] branch main updated: ORC-1280: [C++] Implement block-based buffer(Part I)
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 8cf1047f9 ORC-1280: [C++] Implement block-based buffer(Part I)
8cf1047f9 is described below
commit 8cf1047f9ace3799df12f24d2a5096b17a9a6ed0
Author: coderex2522 <re...@gmail.com>
AuthorDate: Wed Oct 12 09:41:40 2022 +0800
ORC-1280: [C++] Implement block-based buffer(Part I)
This closes #1271
---
c++/src/BlockBuffer.cc | 85 ++++++++++++++++++++++++++++++++
c++/src/BlockBuffer.hh | 116 ++++++++++++++++++++++++++++++++++++++++++++
c++/src/CMakeLists.txt | 1 +
c++/test/CMakeLists.txt | 1 +
c++/test/TestBlockBuffer.cc | 81 +++++++++++++++++++++++++++++++
5 files changed, 284 insertions(+)
diff --git a/c++/src/BlockBuffer.cc b/c++/src/BlockBuffer.cc
new file mode 100644
index 000000000..aac0d4798
--- /dev/null
+++ b/c++/src/BlockBuffer.cc
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BlockBuffer.hh"
+
+#include <algorithm>
+#include <stdexcept>
+
+namespace orc {
+
+ /**
+  * Build an empty buffer and request capacity for one block up front.
+  * @param pool memory pool that provides the blocks
+  * @param _blockSize size in bytes of each block; zero is rejected
+  * @throws std::logic_error if _blockSize is zero
+  */
+ BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
+     : memoryPool(pool),
+       currentSize(0),
+       currentCapacity(0),
+       blockSize(_blockSize) {
+   // a zero block size would make reserve() unable to ever add capacity
+   if (_blockSize == 0) {
+     throw std::logic_error("Block size cannot be zero");
+   }
+   reserve(_blockSize);
+ }
+
+ /**
+  * Hand every block back to the memory pool and reset the counters.
+  */
+ BlockBuffer::~BlockBuffer() {
+   for (char* block : blocks) {
+     memoryPool.free(block);
+   }
+   blocks.clear();
+   currentCapacity = 0;
+   currentSize = 0;
+ }
+
+ /**
+  * Describe the used part of the block at blockIndex.
+  * @param blockIndex index of a fully or partially occupied block
+  * @return the block start together with the number of used bytes in it
+  * @throws std::out_of_range if blockIndex >= getBlockNumber()
+  */
+ BlockBuffer::Block BlockBuffer::getBlock(uint64_t blockIndex) const {
+   if (blockIndex >= getBlockNumber()) {
+     throw std::out_of_range("Block index out of range");
+   }
+   // only the last occupied block can hold fewer than blockSize bytes
+   const uint64_t bytesFromBlockStart = currentSize - blockIndex * blockSize;
+   const uint64_t usedBytes = std::min(bytesFromBlockStart, blockSize);
+   return Block(blocks[blockIndex], usedBytes);
+ }
+
+ /**
+  * Return writable space and mark it as used.
+  * When unused capacity exists, the remainder of the block containing the
+  * current size is returned and the size moves to that block's end.
+  * Otherwise the buffer grows by one block and that whole block is returned.
+  * @return the start address and length of the space handed out
+  */
+ BlockBuffer::Block BlockBuffer::getNextBlock() {
+   if (currentSize >= currentCapacity) {
+     // no unused capacity remains: grow by exactly one block
+     resize(currentSize + blockSize);
+     return Block(blocks.back(), blockSize);
+   }
+
+   const uint64_t blockIndex = currentSize / blockSize;
+   const uint64_t usedInBlock = currentSize % blockSize;
+   Block freeSpace(blocks[blockIndex] + usedInBlock, blockSize - usedInBlock);
+   // everything up to the end of this block now counts as used
+   currentSize = (blockIndex + 1) * blockSize;
+   return freeSpace;
+ }
+
+ /**
+  * Set the buffer size, reserving additional blocks first when required.
+  * @param size new size of the buffer in bytes
+  * @throws std::logic_error if the capacity is still below size after
+  *         reserve() returns
+  */
+ void BlockBuffer::resize(uint64_t size) {
+   reserve(size);
+   // reserve() stops early when the pool returns no memory
+   if (currentCapacity < size) {
+     throw std::logic_error("Block buffer resize error");
+   }
+   currentSize = size;
+ }
+
+ /**
+  * Grow the capacity, one block at a time, until it reaches newCapacity.
+  * Nothing happens when the capacity is already large enough. If the pool
+  * returns a null pointer, growth stops silently and capacity() may stay
+  * below newCapacity; resize() turns that condition into an exception.
+  * @param newCapacity requested minimum capacity in bytes
+  */
+ void BlockBuffer::reserve(uint64_t newCapacity) {
+   while (currentCapacity < newCapacity) {
+     char* newBlockPtr = memoryPool.malloc(blockSize);
+     if (newBlockPtr == nullptr) {
+       break;
+     }
+     try {
+       blocks.push_back(newBlockPtr);
+     } catch (...) {
+       // the block is not tracked yet, so the destructor would never free it
+       memoryPool.free(newBlockPtr);
+       throw;
+     }
+     currentCapacity += blockSize;
+   }
+ }
+} // namespace orc
diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
new file mode 100644
index 000000000..bb22b8a02
--- /dev/null
+++ b/c++/src/BlockBuffer.hh
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOCK_BUFFER_HH
+#define ORC_BLOCK_BUFFER_HH
+
+#include "orc/MemoryPool.hh"
+
+#include <vector>
+
+namespace orc {
+
+ /**
+ * BlockBuffer implements a memory allocation policy based on
+ * equal-length blocks. BlockBuffer will reserve multiple blocks
+ * for allocation.
+ */
+ class BlockBuffer {
+  private:
+   // pool that allocates and frees every block
+   MemoryPool& memoryPool;
+   // current buffer size
+   uint64_t currentSize;
+   // maximal capacity (actual allocated memory)
+   uint64_t currentCapacity;
+   // unit for buffer expansion
+   const uint64_t blockSize;
+   // pointers to the start of each block
+   std::vector<char*> blocks;
+
+   // neither copyable nor movable: the destructor frees every block in
+   // `blocks`, so a second owner of the same pointers would free them twice
+   BlockBuffer(const BlockBuffer& buffer) = delete;
+   BlockBuffer& operator=(const BlockBuffer& buffer) = delete;
+   BlockBuffer(BlockBuffer&& buffer) = delete;
+   BlockBuffer& operator=(BlockBuffer&& buffer) = delete;
+
+  public:
+   /**
+    * Create an empty buffer and reserve capacity for one block.
+    * @param pool the memory pool used to allocate and free blocks
+    * @param blockSize the size in bytes of every block
+    * @throws std::logic_error if blockSize is zero
+    */
+   BlockBuffer(MemoryPool& pool, uint64_t blockSize);
+
+   /**
+    * Return all allocated blocks to the memory pool.
+    */
+   ~BlockBuffer();
+
+   /**
+    * Block points to a section of memory allocated by BlockBuffer,
+    * containing the corresponding physical memory address and available size.
+    */
+   struct Block {
+     // the start of the block
+     char* data;
+     // number of bytes available at data
+     uint64_t size;
+
+     Block() : data(nullptr), size(0) {}
+     Block(char* _data, uint64_t _size) : data(_data), size(_size) {}
+     Block(const Block& block) = default;
+     ~Block() = default;
+   };
+
+   /**
+    * Get the allocated block object.
+    * The size of the last allocated block may be less than blockSize;
+    * all other blocks have size blockSize.
+    * @param blockIndex the index of the block
+    * @return the allocated block object
+    * @throws std::out_of_range if blockIndex is not less than
+    *         getBlockNumber()
+    */
+   Block getBlock(uint64_t blockIndex) const;
+
+   /**
+    * Get an empty block, allocating a new block if necessary, to write to.
+    * If the last allocated block holds fewer than blockSize bytes,
+    * the size of the empty block is blockSize minus the size of
+    * the last allocated block. Otherwise, the size of
+    * the empty block is equal to blockSize.
+    * The returned space is counted as used: size() advances to the end
+    * of the block it belongs to.
+    * @return an empty block object
+    */
+   Block getNextBlock();
+
+   /**
+    * Get the number of blocks that are fully or partially occupied.
+    */
+   uint64_t getBlockNumber() const {
+     return (currentSize + blockSize - 1) / blockSize;
+   }
+
+   /**
+    * Get the current buffer size in bytes.
+    */
+   uint64_t size() const {
+     return currentSize;
+   }
+
+   /**
+    * Get the number of bytes currently allocated.
+    */
+   uint64_t capacity() const {
+     return currentCapacity;
+   }
+
+   /**
+    * Set the buffer size, reserving more blocks first if needed.
+    * @param size new size of BlockBuffer in bytes
+    * @throws std::logic_error if the capacity cannot be grown to size
+    */
+   void resize(uint64_t size);
+
+   /**
+    * Requests the BlockBuffer to contain at least newCapacity bytes.
+    * New blocks are allocated only if more space is needed.
+    * If the memory pool returns a null pointer, allocation stops and
+    * the capacity may remain below newCapacity.
+    * @param newCapacity new capacity of BlockBuffer
+    */
+   void reserve(uint64_t newCapacity);
+ };
+} // namespace orc
+
+#endif
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index d4cd03504..5608fbad0 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -205,6 +205,7 @@ set(SOURCE_FILES
sargs/TruthValue.cc
wrap/orc-proto-wrapper.cc
Adaptor.cc
+ BlockBuffer.cc
BloomFilter.cc
ByteRLE.cc
ColumnPrinter.cc
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 00aca6b55..567753b4e 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -22,6 +22,7 @@ add_executable (orc-test
MemoryInputStream.cc
MemoryOutputStream.cc
TestAttributes.cc
+ TestBlockBuffer.cc
TestBufferedOutputStream.cc
TestBloomFilter.cc
TestByteRle.cc
diff --git a/c++/test/TestBlockBuffer.cc b/c++/test/TestBlockBuffer.cc
new file mode 100644
index 000000000..c638490e6
--- /dev/null
+++ b/c++/test/TestBlockBuffer.cc
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BlockBuffer.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+ // Checks how size(), capacity() and getBlockNumber() react to
+ // construction, reserve() and resize().
+ TEST(TestBlockBuffer, size_and_capacity) {
+   MemoryPool& memoryPool = *getDefaultPool();
+   BlockBuffer blockBuffer(memoryPool, 1024);
+
+   // construction reserves one block but marks nothing as used
+   EXPECT_EQ(blockBuffer.getBlockNumber(), 0);
+   EXPECT_EQ(blockBuffer.size(), 0);
+   EXPECT_EQ(blockBuffer.capacity(), 1024);
+
+   // reserving only raises the capacity
+   blockBuffer.reserve(128 * 1024);
+   EXPECT_EQ(blockBuffer.getBlockNumber(), 0);
+   EXPECT_EQ(blockBuffer.size(), 0);
+   EXPECT_EQ(blockBuffer.capacity(), 128 * 1024);
+
+   // resizing below the capacity leaves the capacity untouched
+   blockBuffer.resize(64 * 1024);
+   EXPECT_EQ(blockBuffer.getBlockNumber(), 64);
+   EXPECT_EQ(blockBuffer.size(), 64 * 1024);
+   EXPECT_EQ(blockBuffer.capacity(), 128 * 1024);
+
+   // resizing above the capacity grows the capacity to match
+   blockBuffer.resize(256 * 1024);
+   EXPECT_EQ(blockBuffer.getBlockNumber(), 256);
+   EXPECT_EQ(blockBuffer.size(), 256 * 1024);
+   EXPECT_EQ(blockBuffer.capacity(), 256 * 1024);
+ }
+
+ // Fills ten blocks obtained from getNextBlock() with a letter pattern and
+ // reads the pattern back through getBlock().
+ TEST(TestBlockBuffer, get_block) {
+   MemoryPool& memoryPool = *getDefaultPool();
+   BlockBuffer blockBuffer(memoryPool, 1024);
+
+   EXPECT_EQ(blockBuffer.getBlockNumber(), 0);
+   for (uint64_t blockIdx = 0; blockIdx < 10; ++blockIdx) {
+     BlockBuffer::Block writable = blockBuffer.getNextBlock();
+     EXPECT_EQ(blockBuffer.getBlockNumber(), blockIdx + 1);
+     // even blocks hold upper-case letters, odd blocks lower-case ones
+     const char firstLetter = (blockIdx % 2 == 0) ? 'A' : 'a';
+     for (uint64_t pos = 0; pos < writable.size; ++pos) {
+       writable.data[pos] = static_cast<char>(firstLetter + (blockIdx + pos) % 26);
+     }
+   }
+
+   // read every occupied block back and compare against the same pattern
+   for (uint64_t blockIdx = 0; blockIdx < blockBuffer.getBlockNumber(); ++blockIdx) {
+     BlockBuffer::Block readable = blockBuffer.getBlock(blockIdx);
+     const char firstLetter = (blockIdx % 2 == 0) ? 'A' : 'a';
+     for (uint64_t pos = 0; pos < readable.size; ++pos) {
+       EXPECT_EQ(readable.data[pos], firstLetter + (blockIdx + pos) % 26);
+     }
+   }
+ }
+}