You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by xi...@apache.org on 2023/02/07 12:20:45 UTC

[orc] branch main updated: ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)

This is an automated email from the ASF dual-hosted git repository.

xinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 32dffd30f ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)
32dffd30f is described below

commit 32dffd30f747f7129b9b9b9a9ef8022660b28b5a
Author: coderex2522 <re...@gmail.com>
AuthorDate: Tue Feb 7 20:20:38 2023 +0800

    ORC-1312: [C++] Support setting the block buffer capacity of BufferedOutputStream (#1394)
    
    * ORC-1312: [C++] Support setting the capacity of the output buffer in class BufferedOutputStream
---
 c++/include/orc/Writer.hh | 13 +++++++++++
 c++/src/ColumnWriter.cc   |  2 +-
 c++/src/Writer.cc         | 19 +++++++++++----
 c++/test/TestWriter.cc    | 59 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 5eed39033..586016b00 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -262,6 +262,19 @@ namespace orc {
      * @return if not set, the default is false
      */
     bool getUseTightNumericVector() const;
+
+    /**
+     * Set the initial capacity of the output buffer in class BufferedOutputStream.
+     * Each column contains one or more BufferedOutputStreams depending on its type,
+     * and these buffers will automatically expand when more memory is required.
+     */
+    WriterOptions& setOutputBufferCapacity(uint64_t capacity);  // returns *this for chaining
+
+    /**
+     * Get the initial capacity of the output buffer in class BufferedOutputStream.
+     * @return the configured capacity; if not set, the default of 1 MB (1024 * 1024).
+     */
+    uint64_t getOutputBufferCapacity() const;
   };
 
   class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 8e41a4d10..2565cbe1c 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -49,7 +49,7 @@ namespace orc {
     // WriterOption
     return createCompressor(options.getCompression(), outStream, options.getCompressionStrategy(),
                             // BufferedOutputStream initial capacity
-                            1 * 1024 * 1024, options.getCompressionBlockSize(),
+                            options.getOutputBufferCapacity(), options.getCompressionBlockSize(),
                             *options.getMemoryPool(), options.getWriterMetrics());
   }
 
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index bc174ae1b..14ee68f71 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -45,6 +45,7 @@ namespace orc {
     std::string timezone;
     WriterMetrics* metrics;
     bool useTightNumericVector;
+    uint64_t outputBufferCapacity;
 
     WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) {  // default to Hive_0_12
       stripeSize = 64 * 1024 * 1024;                               // 64M
@@ -65,6 +66,7 @@ namespace orc {
       timezone = "GMT";
       metrics = nullptr;
       useTightNumericVector = false;
+      outputBufferCapacity = 1024 * 1024;
     }
   };
 
@@ -273,6 +275,15 @@ namespace orc {
     return privateBits->useTightNumericVector;
   }
 
+  WriterOptions& WriterOptions::setOutputBufferCapacity(uint64_t capacity) {
+    privateBits->outputBufferCapacity = capacity;  // not validated; TODO confirm 0 is safe
+    return *this;  // enables chained setter calls
+  }
+
+  uint64_t WriterOptions::getOutputBufferCapacity() const {
+    return privateBits->outputBufferCapacity;  // 1024 * 1024 unless set via the setter
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -333,10 +344,10 @@ namespace orc {
     useTightNumericVector = opts.getUseTightNumericVector();
 
     // compression stream for stripe footer, file footer and metadata
-    compressionStream = createCompressor(
-        options.getCompression(), outStream, options.getCompressionStrategy(),
-        1 * 1024 * 1024,  // buffer capacity: 1M
-        options.getCompressionBlockSize(), *options.getMemoryPool(), options.getWriterMetrics());
+    compressionStream =
+        createCompressor(options.getCompression(), outStream, options.getCompressionStrategy(),
+                         options.getOutputBufferCapacity(), options.getCompressionBlockSize(),
+                         *options.getMemoryPool(), options.getWriterMetrics());
 
     // uncompressed stream for post script
     bufferedStream.reset(new BufferedOutputStream(*options.getMemoryPool(), outStream,
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index b4b0801dc..fbc13d24e 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1835,6 +1835,65 @@ namespace orc {
     testSuppressPresentStream(CompressionKind_SNAPPY);
   }
 
+  void testSetOutputBufferCapacity(uint64_t capacity) {  // write rows, read back, compare
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    size_t rowCount = 1000;
+    {  // write phase (scoped so the writer is destroyed before reading)
+      auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int,col2:int>"));
+      WriterOptions options;
+      options.setStripeSize(1024 * 1024)
+          .setCompressionBlockSize(64 * 1024)
+          .setCompression(CompressionKind_NONE)
+          .setMemoryPool(pool)
+          .setRowIndexStride(1000)
+          .setOutputBufferCapacity(capacity);  // the option under test
+
+      auto writer = createWriter(*type, &memStream, options);
+      auto batch = writer->createRowBatch(rowCount);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      structBatch.numElements = rowCount;
+      longBatch1.numElements = rowCount;
+      longBatch2.numElements = rowCount;
+      for (size_t i = 0; i < rowCount; ++i) {
+        longBatch1.data[i] = static_cast<int64_t>(i * 100);
+        longBatch2.data[i] = static_cast<int64_t>(i * 300);
+      }
+      writer->add(*batch);
+      writer->close();
+    }
+    // read the ORC file back and check every value
+    {
+      std::unique_ptr<InputStream> inStream(
+          new MemoryInputStream(memStream.getData(), memStream.getLength()));
+      ReaderOptions readerOptions;
+      readerOptions.setMemoryPool(*pool);
+      std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+      std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+      auto batch = rowReader->createRowBatch(rowCount);
+      EXPECT_TRUE(rowReader->next(*batch));  // expect all rows in this single batch
+      EXPECT_EQ(rowCount, batch->numElements);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      for (size_t i = 0; i < rowCount; ++i) {
+        EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i * 100));
+        EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i * 300));
+      }
+    }
+  }
+
+  TEST(WriterTest, setOutputBufferCapacity) {
+    // compression block size (64 KB, set in the helper) > output buffer capacity (1 KB)
+    testSetOutputBufferCapacity(1024);
+    // compression block size (64 KB) = output buffer capacity (64 KB)
+    testSetOutputBufferCapacity(64 * 1024);
+    // compression block size (64 KB) < output buffer capacity (1 MB)
+    testSetOutputBufferCapacity(1024 * 1024);
+  }
+
   TEST_P(WriterTest, testWriteFixedWidthNumericVectorBatch) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool* pool = getDefaultPool();