You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/05/25 10:41:18 UTC
[incubator-doris] branch master updated: [fix] disable transferring data larger than 2GB by brpc (#9770)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f5bef328fe [fix] disable transferring data larger than 2GB by brpc (#9770)
f5bef328fe is described below
commit f5bef328fe4c4701bb3eccfb02f992d68c6fa5ae
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed May 25 18:41:13 2022 +0800
[fix] disable transferring data larger than 2GB by brpc (#9770)
Because brpc and protobuf cannot transfer data larger than 2GB — sizes beyond 2GB overflow the serialized-size field — add a check before sending.
---
be/src/runtime/row_batch.cpp | 11 ++---------
be/src/vec/core/block.cpp | 35 ++++++++++++++++++++++++++++-------
2 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 520a5642f9..5acf7634de 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -273,10 +273,6 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
try {
// Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
_compression_scratch.resize(max_compressed_size);
- } catch (const std::bad_alloc& e) {
- can_compress = false;
- LOG(WARNING) << "Try to alloc " << max_compressed_size
- << " bytes for compression scratch failed. " << e.what();
} catch (...) {
can_compress = false;
std::exception_ptr p = std::current_exception();
@@ -309,11 +305,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
*compressed_size = pb_size;
if (pb_size > std::numeric_limits<int32_t>::max()) {
// the protobuf has a hard limit of 2GB for serialized data.
- return Status::InternalError(
- fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
- "please set BE config 'transfer_data_by_brpc_attachment' to true "
- "and restart BE.",
- pb_size));
+ return Status::InternalError(fmt::format(
+ "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
}
} else {
*uncompressed_size = pb_size + tuple_byte_size;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf..4866a1129e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -669,7 +669,16 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
// serialize data values
// when data type is HLL, content_uncompressed_size maybe larger than real size.
- allocated_buf->resize(content_uncompressed_size);
+ try {
+ allocated_buf->resize(content_uncompressed_size);
+ } catch (...) {
+ std::exception_ptr p = std::current_exception();
+ std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
+ content_uncompressed_size,
+ p ? p.__cxa_exception_type()->name() : "null");
+ LOG(WARNING) << msg;
+ return Status::BufferAllocFailed(msg);
+ }
char* buf = allocated_buf->data();
for (const auto& c : *this) {
buf = c.type->serialize(*(c.column), buf);
@@ -678,12 +687,21 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
- // Try compressing the content to compression_scratch,
- // swap if compressed data is smaller
+ size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
std::string compression_scratch;
- uint32_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
- compression_scratch.resize(max_compressed_size);
-
+ try {
+ // Try compressing the content to compression_scratch,
+ // swap if compressed data is smaller
+ // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
+ compression_scratch.resize(max_compressed_size);
+ } catch (...) {
+ std::exception_ptr p = std::current_exception();
+ std::string msg =
+ fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}",
+ max_compressed_size, p ? p.__cxa_exception_type()->name() : "null");
+ LOG(WARNING) << msg;
+ return Status::BufferAllocFailed(msg);
+ }
size_t compressed_size = 0;
char* compressed_output = compression_scratch.data();
snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
@@ -701,7 +719,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
VLOG_ROW << "uncompressed size: " << content_uncompressed_size
<< ", compressed size: " << compressed_size;
}
-
+ if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+ return Status::InternalError(fmt::format(
+ "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
+ }
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org