Posted to issues@orc.apache.org by GitBox <gi...@apache.org> on 2022/10/12 04:11:13 UTC

[GitHub] [orc] coderex2522 opened a new pull request, #1275: ORC-1280: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream(PART II)

coderex2522 opened a new pull request, #1275:
URL: https://github.com/apache/orc/pull/1275

   ### What changes were proposed in this pull request?
   This PR reduces the large amount of memory used by BufferedOutputStream and refactors the data-writing logic in class CompressionBase.
   
   ### Why are the changes needed?
   This patch uses BlockBuffer to replace the DataBuffer in class BufferedOutputStream in order to resolve this [issue](https://github.com/apache/orc/issues/1240).
   
   ### How was this patch tested?
   The unit tests in TestBufferedOutputStream.cc and TestCompression.cc cover this patch.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [orc] wgtmac commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1004643214


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,14 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataSize > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      uint64_t IOCount = dataBuffer->writeTo(outputStream);

Review Comment:
   Returning IOCount is odd. It would be better to pass the metrics pointer into the writeTo function as a parameter.



##########
c++/src/BlockBuffer.cc:
##########
@@ -82,4 +83,42 @@ namespace orc {
       }
     }
   }
+
+  uint64_t BlockBuffer::writeTo(OutputStream* output) {
+    static uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+    uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+    if (chunkSize == 0) {
+      throw std::logic_error("Natural write size cannot be zero");
+    }
+    char* chunk = memoryPool.malloc(chunkSize);

Review Comment:
   Yes, but you can still avoid the allocation when blockNumber == 1 && naturalWriteSize >= block.size. BTW, we can keep the current implementation for now.





[GitHub] [orc] dongjoon-hyun closed pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream
URL: https://github.com/apache/orc/pull/1275




[GitHub] [orc] dongjoon-hyun commented on pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #1275:
URL: https://github.com/apache/orc/pull/1275#issuecomment-1292271019

   Merged to main for Apache ORC 1.9.
   
   cc @williamhyun , too




[GitHub] [orc] wgtmac commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1004244609


##########
c++/src/BlockBuffer.cc:
##########
@@ -82,4 +83,42 @@ namespace orc {
       }
     }
   }
+
+  uint64_t BlockBuffer::writeTo(OutputStream* output) {
+    static uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+    uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+    if (chunkSize == 0) {
+      throw std::logic_error("Natural write size cannot be zero");
+    }
+    char* chunk = memoryPool.malloc(chunkSize);

Review Comment:
   If blockNumber == 1, we should avoid this unnecessary allocation.





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1002666769


##########
c++/src/Compression.cc:
##########
@@ -138,19 +147,44 @@ namespace orc {
            static_cast<uint64_t>(outputSize - outputPosition);
   }
 
+  void CompressionStreamBase::writeData(const unsigned char* data, int size) {

Review Comment:
   done.



##########
c++/src/Compression.cc:
##########
@@ -138,19 +147,44 @@ namespace orc {
            static_cast<uint64_t>(outputSize - outputPosition);
   }
 
+  void CompressionStreamBase::writeData(const unsigned char* data, int size) {
+    int offset = 0;
+    while (offset < size) {
+      if (outputPosition == outputSize) {
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
+            throw std::runtime_error(
+                "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = 0;
+      } else  if (outputPosition > outputSize) {
+        // this will unlikely happen, but we have seen a few on zstd v1.1.0

Review Comment:
   done.





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1005153397


##########
c++/src/BlockBuffer.cc:
##########
@@ -82,4 +83,42 @@ namespace orc {
       }
     }
   }
+
+  uint64_t BlockBuffer::writeTo(OutputStream* output) {
+    static uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+    uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+    if (chunkSize == 0) {
+      throw std::logic_error("Natural write size cannot be zero");
+    }
+    char* chunk = memoryPool.malloc(chunkSize);

Review Comment:
   Allocation can be avoided here when blockNumber == 1 && currentSize <= NaturalWriteSize.
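
A minimal sketch of the fast path described in this comment, not the code from the PR. `DirectWriteSink` and `flushBlocks` are hypothetical names invented for illustration, and the slow path is deliberately simplified (it stages all data at once, whereas the diff in this thread allocates a single chunk of `chunkSize`).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical sink standing in for the output stream; it records what was
// written and how many write() calls were made.
struct DirectWriteSink {
  std::string written;
  uint64_t writeCalls = 0;

  void write(const void* buf, std::size_t length) {
    written.append(static_cast<const char*>(buf), length);
    ++writeCalls;
  }
};

// Flushes `blocks` to `sink`. Returns true when the staging copy was skipped.
bool flushBlocks(const std::vector<std::string>& blocks,
                 uint64_t naturalWriteSize, DirectWriteSink* sink) {
  assert(sink != nullptr && naturalWriteSize > 0);

  // Fast path from the review: a single block that fits in one natural write
  // can be handed to the sink directly, with no staging buffer.
  if (blocks.size() == 1 && blocks[0].size() <= naturalWriteSize) {
    sink->write(blocks[0].data(), blocks[0].size());
    return true;
  }

  // Simplified slow path (an assumption of this sketch): stage everything,
  // then write in pieces no larger than naturalWriteSize.
  std::string staging;
  for (const std::string& block : blocks) {
    staging += block;
  }
  for (std::size_t offset = 0; offset < staging.size();) {
    std::size_t length = static_cast<std::size_t>(
        std::min<uint64_t>(naturalWriteSize, staging.size() - offset));
    sink->write(staging.data() + offset, length);
    offset += length;
  }
  return false;
}
```

With one 3-byte block and a natural write size of 8, the sketch issues a single direct write; with one 6-byte block and a natural write size of 4, it falls back to staging and issues two writes.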





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1004086464


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,30 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataBuffer->getBlockNumber() > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      uint64_t ioCount = 0;
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      // try to merge adjacent IO requests
+      BlockBuffer::Block mergeBlock = dataBuffer->getBlock(0);
+      for (uint64_t i = 1; i < dataBuffer->getBlockNumber(); ++i) {
+        auto curBlock = dataBuffer->getBlock(i);
+        if (mergeBlock.data + mergeBlock.size == curBlock.data) {

Review Comment:
   This is very unlikely to happen, so I removed this logic.





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1005151644


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,14 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataSize > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      uint64_t IOCount = dataBuffer->writeTo(outputStream);

Review Comment:
   done





[GitHub] [orc] coderex2522 commented on pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on PR #1275:
URL: https://github.com/apache/orc/pull/1275#issuecomment-1275848187

   > @coderex2522 . Could you use a new JIRA instead of using `PART 1` and `PART II`? Apache community uses ORC JIRA ID for trace-ability.
   
   @dongjoon-hyun Thank you for your suggestion. I have created a new JIRA, [ORC-1286](https://issues.apache.org/jira/browse/ORC-1286).




[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1002730947


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,11 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    for (uint64_t i = 0; i < dataBuffer->getBlockNumber(); ++i)
     {
       SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      auto block = dataBuffer->getBlock(i);
+      outputStream->write(block.data, block.size);

Review Comment:
   I tried to merge adjacent write requests here. This logic is essentially I/O merging, so I thought it would be more appropriate to put it here.





[GitHub] [orc] dongjoon-hyun commented on pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #1275:
URL: https://github.com/apache/orc/pull/1275#issuecomment-1277833206

   Thank you for updating the PR and creating JIRA, @coderex2522 .




[GitHub] [orc] wgtmac commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r994171033


##########
c++/src/Compression.cc:
##########
@@ -112,7 +118,10 @@ namespace orc {
                                                 bufferSize(0),
                                                 outputPosition(0),
                                                 outputSize(0) {
-    // PASS
+    // init header pointer array
+    for (int i = 0; i < HEADER_SIZE; ++i) {

Review Comment:
   Use single memset?



##########
c++/src/Compression.cc:
##########
@@ -138,19 +147,44 @@ namespace orc {
            static_cast<uint64_t>(outputSize - outputPosition);
   }
 
+  void CompressionStreamBase::writeData(const unsigned char* data, int size) {
+    int offset = 0;
+    while (offset < size) {
+      if (outputPosition == outputSize) {
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
+            throw std::runtime_error(
+                "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = 0;
+      } else  if (outputPosition > outputSize) {
+        // this will unlikely happen, but we have seen a few on zstd v1.1.0

Review Comment:
   I'd suggest not mentioning that name explicitly and simply explaining that this is a safety check. BTW, the exception message should say that the failure happens in the compressor.
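
A rough, self-contained sketch of the kind of loop under review, with the safety check worded generically. `SketchCompressionStream`, `nextBuffer`, `contents`, and the exception text are all hypothetical; the real `writeData` obtains its buffers from `BufferedOutputStream::Next`, which is modelled here by allocating a fresh fixed-size buffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified model of a stream that copies bytes into a
// sequence of fixed-size output buffers.
class SketchCompressionStream {
 public:
  explicit SketchCompressionStream(int bufferSize) : bufferSize_(bufferSize) {
    assert(bufferSize > 0);
  }

  // Copies `size` bytes into the output buffers, fetching a new buffer
  // whenever the current one is full.
  void writeData(const unsigned char* data, int size) {
    int offset = 0;
    while (offset < size) {
      if (outputPosition_ == outputSize_) {
        nextBuffer();
      } else if (outputPosition_ > outputSize_) {
        // Safety check: the write position must never pass the buffer end.
        throw std::logic_error(
            "Write past the end of the output buffer during compression");
      }
      int currentSize = std::min(outputSize_ - outputPosition_, size - offset);
      std::memcpy(outputBuffer_ + outputPosition_, data + offset,
                  static_cast<std::size_t>(currentSize));
      offset += currentSize;
      outputPosition_ += currentSize;
    }
  }

  // Returns all bytes written so far, in order. Every buffer except the
  // last one is completely full.
  std::string contents() const {
    std::string result;
    for (std::size_t i = 0; i < buffers_.size(); ++i) {
      bool last = (i + 1 == buffers_.size());
      std::size_t used = static_cast<std::size_t>(last ? outputPosition_ : bufferSize_);
      result.append(reinterpret_cast<const char*>(buffers_[i].data()), used);
    }
    return result;
  }

  std::size_t bufferCount() const { return buffers_.size(); }

 private:
  // Stand-in for BufferedOutputStream::Next: hands out a fresh buffer.
  void nextBuffer() {
    buffers_.emplace_back(static_cast<std::size_t>(bufferSize_));
    outputBuffer_ = buffers_.back().data();
    outputSize_ = bufferSize_;
    outputPosition_ = 0;
  }

  int bufferSize_;
  std::deque<std::vector<unsigned char>> buffers_;
  unsigned char* outputBuffer_ = nullptr;
  int outputPosition_ = 0;
  int outputSize_ = 0;
};
```

Writing 7 bytes through a stream with 4-byte buffers fills the first buffer and leaves 3 bytes in the second.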



##########
c++/src/io/OutputStream.cc:
##########
@@ -37,7 +37,7 @@ namespace orc {
                                     : outputStream(outStream),
                                       blockSize(blockSize_),
                                       metrics(metrics_) {
-    dataBuffer.reset(new DataBuffer<char>(pool));
+    dataBuffer.reset(new BlockBuffer(pool, blockSize));

Review Comment:
   Does **blockSize_** here have the same meaning as the block size of BlockBuffer?



##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,11 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    for (uint64_t i = 0; i < dataBuffer->getBlockNumber(); ++i)
     {
       SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      auto block = dataBuffer->getBlock(i);
+      outputStream->write(block.data, block.size);

Review Comment:
   This may introduce a performance regression by issuing more write I/Os. When writing to a cloud object store such as S3, concatenating the buffers and making a single write is probably a better choice. A possible solution is to add void BlockBuffer::writeTo(OutputStream*) and handle the logic internally.
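
One way the suggested `writeTo` could look, as a hedged sketch rather than the merged implementation. `SketchOutputStream` and `SketchBlockBuffer` are hypothetical stand-ins that model only `getNaturalWriteSize()` and `write()`; the zero-size check mirrors the one shown in the diffs in this thread.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the output stream.
struct SketchOutputStream {
  explicit SketchOutputStream(uint64_t size) : naturalWriteSize(size) {}

  uint64_t getNaturalWriteSize() const { return naturalWriteSize; }
  void write(const void* buf, std::size_t length) {
    written.append(static_cast<const char*>(buf), length);
    ++writeCalls;
  }

  uint64_t naturalWriteSize;
  std::string written;      // everything written so far
  uint64_t writeCalls = 0;  // number of write() calls issued
};

// Hypothetical stand-in for BlockBuffer: a list of separately stored blocks.
struct SketchBlockBuffer {
  std::vector<std::string> blocks;

  // Copies blocks into one staging chunk and writes the chunk each time it
  // fills up, so the number of writes depends on the total size rather than
  // on the number of blocks. Returns the number of writes issued.
  uint64_t writeTo(SketchOutputStream* output) const {
    assert(output != nullptr);
    const uint64_t chunkSize = output->getNaturalWriteSize();
    if (chunkSize == 0) {
      throw std::logic_error("Natural write size cannot be zero");
    }
    std::vector<char> chunk(static_cast<std::size_t>(chunkSize));
    uint64_t chunkOffset = 0;
    uint64_t ioCount = 0;
    for (const std::string& block : blocks) {
      uint64_t blockOffset = 0;
      while (blockOffset < block.size()) {
        // Copy as much of this block as still fits in the chunk.
        uint64_t copySize = std::min<uint64_t>(block.size() - blockOffset,
                                               chunkSize - chunkOffset);
        std::memcpy(chunk.data() + chunkOffset, block.data() + blockOffset,
                    static_cast<std::size_t>(copySize));
        chunkOffset += copySize;
        blockOffset += copySize;
        if (chunkOffset == chunkSize) {
          output->write(chunk.data(), static_cast<std::size_t>(chunkOffset));
          ++ioCount;
          chunkOffset = 0;
        }
      }
    }
    // Flush whatever is left in a partially filled chunk.
    if (chunkOffset > 0) {
      output->write(chunk.data(), static_cast<std::size_t>(chunkOffset));
      ++ioCount;
    }
    return ioCount;
  }
};
```

In this sketch, three blocks of 4, 4, and 2 bytes with a natural write size of 8 produce two writes instead of three. The trade-off is one extra copy of the data in exchange for fewer, larger I/O requests.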



##########
c++/src/Compression.cc:
##########
@@ -138,19 +147,44 @@ namespace orc {
            static_cast<uint64_t>(outputSize - outputPosition);
   }
 
+  void CompressionStreamBase::writeData(const unsigned char* data, int size) {

Review Comment:
   Please add a comment



##########
c++/src/Compression.cc:
##########
@@ -93,6 +95,10 @@ namespace orc {
 
     // Compress output buffer size
     int outputSize;
+
+    // Compression block header pointer array
+    static const int HEADER_SIZE = 3;
+    char* header[HEADER_SIZE];

Review Comment:
   Can we use std::array?



##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,11 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    for (uint64_t i = 0; i < dataBuffer->getBlockNumber(); ++i)
     {
       SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);

Review Comment:
   Measure it outside the loop.
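
To illustrate the point about measuring outside the loop, here is a small sketch using a hypothetical RAII timer. `SketchMetrics`, `ScopedLatency`, and `flushAll` are invented for this example and are not the ORC `SCOPED_STOPWATCH` macro or its metrics type; the sketch only shows one timing scope covering all per-block writes.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical metrics record; timedScopes exists only to make the number
// of measurements observable in this sketch.
struct SketchMetrics {
  uint64_t ioBlockingLatencyUs = 0;
  uint64_t ioCount = 0;
  uint64_t timedScopes = 0;
};

// Hypothetical RAII timer: adds the elapsed microseconds to the metrics
// when it goes out of scope.
class ScopedLatency {
 public:
  explicit ScopedLatency(SketchMetrics* metrics)
      : metrics_(metrics), start_(std::chrono::steady_clock::now()) {
    if (metrics_ != nullptr) {
      ++metrics_->timedScopes;
    }
  }
  ~ScopedLatency() {
    if (metrics_ != nullptr) {
      auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
          std::chrono::steady_clock::now() - start_);
      metrics_->ioBlockingLatencyUs += static_cast<uint64_t>(elapsed.count());
    }
  }
  ScopedLatency(const ScopedLatency&) = delete;
  ScopedLatency& operator=(const ScopedLatency&) = delete;

 private:
  SketchMetrics* metrics_;
  std::chrono::steady_clock::time_point start_;
};

// Appends every block to `sink` under a single timing scope and returns the
// number of bytes written.
uint64_t flushAll(const std::vector<std::string>& blocks, std::string* sink,
                  SketchMetrics* metrics) {
  assert(sink != nullptr);
  uint64_t total = 0;
  ScopedLatency timer(metrics);  // one measurement for the whole flush
  for (const std::string& block : blocks) {
    sink->append(block);
    total += block.size();
    if (metrics != nullptr) {
      ++metrics->ioCount;  // still counted once per write
    }
  }
  return total;
}
```

Creating the timer once keeps the timing overhead constant regardless of the number of blocks, while the I/O count is still tracked per write.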



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [orc] wgtmac commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1002858070


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,30 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataBuffer->getBlockNumber() > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      uint64_t ioCount = 0;
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      // try to merge adjacent IO requests
+      BlockBuffer::Block mergeBlock = dataBuffer->getBlock(0);
+      for (uint64_t i = 1; i < dataBuffer->getBlockNumber(); ++i) {
+        auto curBlock = dataBuffer->getBlock(i);
+        if (mergeBlock.data + mergeBlock.size == curBlock.data) {
+          mergeBlock.size += curBlock.size;
+        } else {
+          outputStream->write(mergeBlock.data, mergeBlock.size);

Review Comment:
   We may also limit each write not to exceed outputStream->getNaturalWriteSize()



##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,30 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataBuffer->getBlockNumber() > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      uint64_t ioCount = 0;
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      // try to merge adjacent IO requests
+      BlockBuffer::Block mergeBlock = dataBuffer->getBlock(0);
+      for (uint64_t i = 1; i < dataBuffer->getBlockNumber(); ++i) {
+        auto curBlock = dataBuffer->getBlock(i);
+        if (mergeBlock.data + mergeBlock.size == curBlock.data) {

Review Comment:
   This leaves the work to the memory allocator and it may never happen.



##########
c++/src/io/OutputStream.cc:
##########
@@ -37,7 +37,7 @@ namespace orc {
                                     : outputStream(outStream),
                                       blockSize(blockSize_),
                                       metrics(metrics_) {
-    dataBuffer.reset(new DataBuffer<char>(pool));
+    dataBuffer.reset(new BlockBuffer(pool, blockSize));

Review Comment:
   OK, that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1002666732


##########
c++/src/Compression.cc:
##########
@@ -112,7 +118,10 @@ namespace orc {
                                                 bufferSize(0),
                                                 outputPosition(0),
                                                 outputSize(0) {
-    // PASS
+    // init header pointer array
+    for (int i = 0; i < HEADER_SIZE; ++i) {

Review Comment:
   Now using the array.fill() function to initialize the header array here.
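
A minimal sketch of this change, assuming the header pointers are held in a `std::array`. `HeaderSlots` and its `set`/`get` accessors are hypothetical; only `HEADER_SIZE = 3` and the `char*` element type come from the diff.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical holder for the header pointer array from the diff.
class HeaderSlots {
 public:
  static constexpr std::size_t HEADER_SIZE = 3;

  // fill() sets every slot in one call, replacing the hand-written loop.
  HeaderSlots() { header_.fill(nullptr); }

  // Records the position that slot `index` should point to.
  void set(std::size_t index, char* position) {
    assert(index < header_.size());
    header_[index] = position;
  }

  char* get(std::size_t index) const {
    assert(index < header_.size());
    return header_[index];
  }

 private:
  std::array<char*, HEADER_SIZE> header_;
};
```

Compared with `memset`, `fill(nullptr)` stays type-safe and does not rely on a null pointer being represented as all-zero bytes.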



##########
c++/src/Compression.cc:
##########
@@ -93,6 +95,10 @@ namespace orc {
 
     // Compress output buffer size
     int outputSize;
+
+    // Compression block header pointer array
+    static const int HEADER_SIZE = 3;
+    char* header[HEADER_SIZE];

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1002731438


##########
c++/src/io/OutputStream.cc:
##########
@@ -37,7 +37,7 @@ namespace orc {
                                     : outputStream(outStream),
                                       blockSize(blockSize_),
                                       metrics(metrics_) {
-    dataBuffer.reset(new DataBuffer<char>(pool));
+    dataBuffer.reset(new BlockBuffer(pool, blockSize));

Review Comment:
   Before this patch, the data buffer grew by blockSize_ each time the Next() function of class BufferedOutputStream was called, so blockSize_ is a reasonable constructor parameter for class BlockBuffer.





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1004089159


##########
c++/src/io/OutputStream.cc:
##########
@@ -95,9 +91,30 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataBuffer->getBlockNumber() > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      uint64_t ioCount = 0;
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      // try to merge adjacent IO requests
+      BlockBuffer::Block mergeBlock = dataBuffer->getBlock(0);
+      for (uint64_t i = 1; i < dataBuffer->getBlockNumber(); ++i) {
+        auto curBlock = dataBuffer->getBlock(i);
+        if (mergeBlock.data + mergeBlock.size == curBlock.data) {
+          mergeBlock.size += curBlock.size;
+        } else {
+          outputStream->write(mergeBlock.data, mergeBlock.size);

Review Comment:
   BlockBuffer::writeTo() will limit each write so that it does not exceed the NaturalWriteSize of outputStream.





[GitHub] [orc] coderex2522 commented on a diff in pull request #1275: ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream

Posted by GitBox <gi...@apache.org>.
coderex2522 commented on code in PR #1275:
URL: https://github.com/apache/orc/pull/1275#discussion_r1004362886


##########
c++/src/BlockBuffer.cc:
##########
@@ -82,4 +83,42 @@ namespace orc {
       }
     }
   }
+
+  uint64_t BlockBuffer::writeTo(OutputStream* output) {
+    static uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+    uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+    if (chunkSize == 0) {
+      throw std::logic_error("Natural write size cannot be zero");
+    }
+    char* chunk = memoryPool.malloc(chunkSize);

Review Comment:
   If the natural write size is less than the block size, we still need to malloc the chunk here.


