You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by xi...@apache.org on 2023/02/07 12:20:45 UTC
[orc] branch main updated: ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)
This is an automated email from the ASF dual-hosted git repository.
xinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 32dffd30f ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)
32dffd30f is described below
commit 32dffd30f747f7129b9b9b9a9ef8022660b28b5a
Author: coderex2522 <re...@gmail.com>
AuthorDate: Tue Feb 7 20:20:38 2023 +0800
ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)
* ORC-1312: [C++] Support setting the capacity of output buffer in the class BufferedOutputStream
---
c++/include/orc/Writer.hh | 13 +++++++++++
c++/src/ColumnWriter.cc | 2 +-
c++/src/Writer.cc | 19 +++++++++++----
c++/test/TestWriter.cc | 59 +++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 5eed39033..586016b00 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -262,6 +262,19 @@ namespace orc {
* @return if not set, the default is false
*/
bool getUseTightNumericVector() const;
+
+ /**
+ * Set the initial capacity of the output buffer in the class BufferedOutputStream.
+ * Each column contains one or more BufferedOutputStream instances depending on its type,
+ * and these buffers will automatically expand when more memory is required.
+ */
+ WriterOptions& setOutputBufferCapacity(uint64_t capacity);
+
+ /**
+ * Get the initial capacity of output buffer in the class BufferedOutputStream.
+ * @return the configured capacity; if not set, the default is 1 MB (1024 * 1024).
+ */
+ uint64_t getOutputBufferCapacity() const;
};
class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 8e41a4d10..2565cbe1c 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -49,7 +49,7 @@ namespace orc {
// WriterOption
return createCompressor(options.getCompression(), outStream, options.getCompressionStrategy(),
// BufferedOutputStream initial capacity
- 1 * 1024 * 1024, options.getCompressionBlockSize(),
+ options.getOutputBufferCapacity(), options.getCompressionBlockSize(),
*options.getMemoryPool(), options.getWriterMetrics());
}
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index bc174ae1b..14ee68f71 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -45,6 +45,7 @@ namespace orc {
std::string timezone;
WriterMetrics* metrics;
bool useTightNumericVector;
+ uint64_t outputBufferCapacity;
WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
stripeSize = 64 * 1024 * 1024; // 64M
@@ -65,6 +66,7 @@ namespace orc {
timezone = "GMT";
metrics = nullptr;
useTightNumericVector = false;
+ outputBufferCapacity = 1024 * 1024;
}
};
@@ -273,6 +275,15 @@ namespace orc {
return privateBits->useTightNumericVector;
}
+ WriterOptions& WriterOptions::setOutputBufferCapacity(uint64_t capacity) {
+ privateBits->outputBufferCapacity = capacity;
+ return *this;
+ }
+
+ uint64_t WriterOptions::getOutputBufferCapacity() const {
+ return privateBits->outputBufferCapacity;
+ }
+
Writer::~Writer() {
// PASS
}
@@ -333,10 +344,10 @@ namespace orc {
useTightNumericVector = opts.getUseTightNumericVector();
// compression stream for stripe footer, file footer and metadata
- compressionStream = createCompressor(
- options.getCompression(), outStream, options.getCompressionStrategy(),
- 1 * 1024 * 1024, // buffer capacity: 1M
- options.getCompressionBlockSize(), *options.getMemoryPool(), options.getWriterMetrics());
+ compressionStream =
+ createCompressor(options.getCompression(), outStream, options.getCompressionStrategy(),
+ options.getOutputBufferCapacity(), options.getCompressionBlockSize(),
+ *options.getMemoryPool(), options.getWriterMetrics());
// uncompressed stream for post script
bufferedStream.reset(new BufferedOutputStream(*options.getMemoryPool(), outStream,
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index b4b0801dc..fbc13d24e 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1835,6 +1835,65 @@ namespace orc {
testSuppressPresentStream(CompressionKind_SNAPPY);
}
+ void testSetOutputBufferCapacity(uint64_t capacity) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ size_t rowCount = 1000;
+ {
+ auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int,col2:int>"));
+ WriterOptions options;
+ options.setStripeSize(1024 * 1024)
+ .setCompressionBlockSize(64 * 1024)
+ .setCompression(CompressionKind_NONE)
+ .setMemoryPool(pool)
+ .setRowIndexStride(1000)
+ .setOutputBufferCapacity(capacity);
+
+ auto writer = createWriter(*type, &memStream, options);
+ auto batch = writer->createRowBatch(rowCount);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+ auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+ structBatch.numElements = rowCount;
+ longBatch1.numElements = rowCount;
+ longBatch2.numElements = rowCount;
+ for (size_t i = 0; i < rowCount; ++i) {
+ longBatch1.data[i] = static_cast<int64_t>(i * 100);
+ longBatch2.data[i] = static_cast<int64_t>(i * 300);
+ }
+ writer->add(*batch);
+ writer->close();
+ }
+ // read orc file & check the data
+ {
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream(memStream.getData(), memStream.getLength()));
+ ReaderOptions readerOptions;
+ readerOptions.setMemoryPool(*pool);
+ std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+ auto batch = rowReader->createRowBatch(rowCount);
+ EXPECT_TRUE(rowReader->next(*batch));
+ EXPECT_EQ(rowCount, batch->numElements);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+ auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+ for (size_t i = 0; i < rowCount; ++i) {
+ EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i * 100));
+ EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i * 300));
+ }
+ }
+ }
+
+ TEST(WriterTest, setOutputBufferCapacity) {
+ // compression block size > output buffer capacity
+ testSetOutputBufferCapacity(1024);
+ // compression block size = output buffer capacity
+ testSetOutputBufferCapacity(64 * 1024);
+ // compression block size < output buffer capacity
+ testSetOutputBufferCapacity(1024 * 1024);
+ }
+
TEST_P(WriterTest, testWriteFixedWidthNumericVectorBatch) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool* pool = getDefaultPool();