You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/05/25 10:41:18 UTC

[incubator-doris] branch master updated: [fix] disable transfer data large than 2GB by brpc (#9770)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5bef328fe [fix] disable transfer data large than 2GB by brpc (#9770)
f5bef328fe is described below

commit f5bef328fe4c4701bb3eccfb02f992d68c6fa5ae
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed May 25 18:41:13 2022 +0800

    [fix] disable transfer data large than 2GB by brpc (#9770)
    
    because brpc and protobuf cannot transfer data larger than 2GB — serialized sizes above 2GB overflow protobuf's internal limits — so add a size check before sending
---
 be/src/runtime/row_batch.cpp | 11 ++---------
 be/src/vec/core/block.cpp    | 35 ++++++++++++++++++++++++++++-------
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 520a5642f9..5acf7634de 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -273,10 +273,6 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         try {
             // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
             _compression_scratch.resize(max_compressed_size);
-        } catch (const std::bad_alloc& e) {
-            can_compress = false;
-            LOG(WARNING) << "Try to alloc " << max_compressed_size
-                         << " bytes for compression scratch failed. " << e.what();
         } catch (...) {
             can_compress = false;
             std::exception_ptr p = std::current_exception();
@@ -309,11 +305,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         *compressed_size = pb_size;
         if (pb_size > std::numeric_limits<int32_t>::max()) {
             // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(
-                    fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
-                                "please set BE config 'transfer_data_by_brpc_attachment' to true "
-                                "and restart BE.",
-                                pb_size));
+            return Status::InternalError(fmt::format(
+                    "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
         }
     } else {
         *uncompressed_size = pb_size + tuple_byte_size;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf..4866a1129e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -669,7 +669,16 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    allocated_buf->resize(content_uncompressed_size);
+    try {
+        allocated_buf->resize(content_uncompressed_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
+                                      content_uncompressed_size,
+                                      p ? p.__cxa_exception_type()->name() : "null");
+        LOG(WARNING) << msg;
+        return Status::BufferAllocFailed(msg);
+    }
     char* buf = allocated_buf->data();
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
@@ -678,12 +687,21 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // compress
     if (config::compress_rowbatches && content_uncompressed_size > 0) {
-        // Try compressing the content to compression_scratch,
-        // swap if compressed data is smaller
+        size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
         std::string compression_scratch;
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
-        compression_scratch.resize(max_compressed_size);
-
+        try {
+            // Try compressing the content to compression_scratch,
+            // swap if compressed data is smaller
+            // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
+            compression_scratch.resize(max_compressed_size);
+        } catch (...) {
+            std::exception_ptr p = std::current_exception();
+            std::string msg =
+                    fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}",
+                                max_compressed_size, p ? p.__cxa_exception_type()->name() : "null");
+            LOG(WARNING) << msg;
+            return Status::BufferAllocFailed(msg);
+        }
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
         snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
@@ -701,7 +719,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     }
-
+    if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+        return Status::InternalError(fmt::format(
+                "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
+    }
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org