You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/06/28 13:59:32 UTC

[arrow] branch main updated: GH-36293: [C++] Use ipc_write_options.memory_pool for compressed buffer and shrink after compression (#36294)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new efd5686255 GH-36293: [C++] Use ipc_write_options.memory_pool for compressed buffer and shrink after compression (#36294)
efd5686255 is described below

commit efd5686255da1cfc1f197993bc19c0b2f5a5cbb9
Author: Rong Ma <ro...@intel.com>
AuthorDate: Wed Jun 28 21:59:24 2023 +0800

    GH-36293: [C++] Use ipc_write_options.memory_pool for compressed buffer and shrink after compression (#36294)
    
    ### Rationale for this change
    
    Described in issues #36293 and #34025.
    
    ### What changes are included in this PR?
    
    * Allocate buffer for compressed data using the memory pool given by the user
    * Shrink compressed data buffer after compression to conserve memory, as the compressed data might be much smaller than the theoretical max compressed data size
    
    ### Are these changes tested?
    
    Covered by existing tests.
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #36293
    * Closes: #34025
    
    Authored-by: Rong Ma <ro...@intel.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/ipc/writer.cc | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index f0f0e96ee4..9986172651 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -192,8 +192,9 @@ class RecordBatchSerializer {
     int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
     int64_t prefixed_length = buffer.size();
 
-    ARROW_ASSIGN_OR_RAISE(auto result,
-                          AllocateResizableBuffer(maximum_length + sizeof(int64_t)));
+    ARROW_ASSIGN_OR_RAISE(
+        auto result,
+        AllocateResizableBuffer(maximum_length + sizeof(int64_t), options_.memory_pool));
     ARROW_ASSIGN_OR_RAISE(auto actual_length,
                           codec->Compress(buffer.size(), buffer.data(), maximum_length,
                                           result->mutable_data() + sizeof(int64_t)));
@@ -213,6 +214,10 @@ class RecordBatchSerializer {
       actual_length = buffer.size();
       // Size of -1 indicates to the reader that the body doesn't need to be decompressed
       prefixed_length = -1;
+    } else {
+      // Shrink compressed buffer
+      RETURN_NOT_OK(
+          result->Resize(actual_length + sizeof(int64_t), /* shrink_to_fit= */ true));
     }
     *reinterpret_cast<int64_t*>(result->mutable_data()) =
         bit_util::ToLittleEndian(prefixed_length);