You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/27 21:44:20 UTC
impala git commit: IMPALA-6543: Limit RowBatch serialization size to INT_MAX
Repository: impala
Updated Branches:
refs/heads/2.x 34d4e8ed4 -> ecbb88da5
IMPALA-6543: Limit RowBatch serialization size to INT_MAX
The serialization format of a row batch relies on
tuple offsets. In its current form, the tuple offsets
are int32s. This means that it is impossible to generate
a valid serialization of a row batch that is larger
than INT_MAX bytes.
This changes RowBatch::SerializeInternal() to return an
error if trying to serialize a row batch larger than INT_MAX.
This prevents a DCHECK on debug builds when creating a row
larger than 2GB.
This also changes the compression logic in RowBatch::Serialize()
to avoid a DCHECK if LZ4 will not be able to compress the
row batch. Instead, it returns an error.
This modifies row-batch-serialize-test to verify behavior at
each of the limits. Specifically:
- RowBatches up to size LZ4_MAX_INPUT_SIZE succeed.
- RowBatches with size range [LZ4_MAX_INPUT_SIZE+1, INT_MAX]
  fail on LZ4 compression.
- RowBatches with size > INT_MAX fail with RowBatch too large.
Change-Id: I1f93131facf47bd8d86c3d4923b23186ee90fcbf
Reviewed-on: http://gerrit.cloudera.org:8080/9459
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ecbb88da
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ecbb88da
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ecbb88da
Branch: refs/heads/2.x
Commit: ecbb88da5b82bec92c6dc710125d9529d4c085fa
Parents: 34d4e8e
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue Feb 20 11:38:23 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 27 19:46:58 2018 +0000
----------------------------------------------------------------------
be/src/runtime/row-batch-serialize-test.cc | 112 +++++++++++++++++++++++-
be/src/runtime/row-batch.cc | 20 +++--
be/src/runtime/row-batch.h | 2 +-
common/thrift/generate_error_codes.py | 9 +-
4 files changed, 131 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index 041e4fe..f6a048d 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include <lz4.h>
+
#include "common/init.h"
#include "testutil/gtest-util.h"
#include "runtime/collection-value.h"
@@ -60,13 +62,14 @@ class RowBatchSerializeTest : public testing::Test {
}
// Serializes and deserializes 'batch', then checks that the deserialized batch is valid
- // and has the same contents as 'batch'.
- void TestRowBatch(const RowDescriptor& row_desc, RowBatch* batch, bool print_batches,
- bool full_dedup = false) {
+ // and has the same contents as 'batch'. If serialization returns an error (e.g. if the
+ // row batch is too large to serialize), this will return that error.
+ Status TestRowBatchInternal(const RowDescriptor& row_desc, RowBatch* batch,
+ bool print_batches, bool full_dedup = false) {
if (print_batches) cout << PrintBatch(batch) << endl;
TRowBatch trow_batch;
- EXPECT_OK(batch->Serialize(&trow_batch, full_dedup));
+ RETURN_IF_ERROR(batch->Serialize(&trow_batch, full_dedup));
RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
if (print_batches) cout << PrintBatch(&deserialized_batch) << endl;
@@ -83,6 +86,80 @@ class RowBatchSerializeTest : public testing::Test {
TestTuplesEqual(*tuple_desc, tuple, deserialized_tuple);
}
}
+ return Status::OK();
+ }
+
+ // Serializes and deserializes 'batch', then checks that the deserialized batch is valid
+ // and has the same contents as 'batch'. This requires that serialization succeed.
+ void TestRowBatch(const RowDescriptor& row_desc, RowBatch* batch, bool print_batches,
+ bool full_dedup = false) {
+ EXPECT_OK(TestRowBatchInternal(row_desc, batch, print_batches, full_dedup));
+ }
+
+ // Construct a RowBatch with the specified size by creating a single row with
+ // multiple strings, then test whether this RowBatch can be serialized and
+ // deserialized successfully. If there is an error during serialization,
+ // return that error.
+ Status TestRowBatchLimits(int64_t row_batch_size) {
+ // tuple: (int, string, string, string)
+ // This uses three strings so that this test can reach INT_MAX+1 without any
+ // single string exceeding the 1GB limit on string length (see string-value.h).
+ DescriptorTblBuilder builder(fe_.get(), &pool_);
+ builder.DeclareTuple() << TYPE_INT << TYPE_STRING << TYPE_STRING << TYPE_STRING;
+ DescriptorTbl* desc_tbl = builder.Build();
+
+ // Create row descriptor
+ vector<bool> nullable_tuples(1, false);
+ vector<TTupleId> tuple_id(1, (TTupleId) 0);
+ RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
+ EXPECT_EQ(row_desc.tuple_descriptors().size(), 1);
+
+ // Create base row
+ RowBatch* batch = pool_.Add(new RowBatch(&row_desc, 1, tracker_.get()));
+ int len = row_desc.GetRowSize();
+ uint8_t* tuple_mem = batch->tuple_data_pool()->Allocate(len);
+ memset(tuple_mem, 0, len);
+
+ // Create one row
+ TupleRow* row = batch->GetRow(batch->AddRow());
+
+ // There is only one TupleDescriptor
+ TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[0];
+ Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
+
+ // Write slot 0 (Random Integer)
+ SlotDescriptor* int_desc = tuple_desc->slots()[0];
+ WriteValue(tuple, *int_desc, batch->tuple_data_pool());
+
+ // The RowBatch has consumed 'len' bytes so far. Need to add row_batch_size - len
+ // bytes of string data. Split this equally among the three strings with any
+ // remainder going to the first string.
+ int64_t size_remaining = row_batch_size - len;
+ int64_t string1_size = (size_remaining / 3) + (size_remaining % 3);
+ int64_t string2_size = (size_remaining / 3);
+ int64_t string3_size = string2_size;
+
+ // Write string #1
+ SlotDescriptor* string1_desc = tuple_desc->slots()[1];
+ StringValue sv1(string(string1_size, 'a'));
+ RawValue::Write(&sv1, tuple, string1_desc, batch->tuple_data_pool());
+
+ // Write string #2
+ SlotDescriptor* string2_desc = tuple_desc->slots()[2];
+ StringValue sv2(string(string2_size, 'a'));
+ RawValue::Write(&sv2, tuple, string2_desc, batch->tuple_data_pool());
+
+ // Write string #3
+ SlotDescriptor* string3_desc = tuple_desc->slots()[3];
+ StringValue sv3(string(string3_size, 'a'));
+ RawValue::Write(&sv3, tuple, string3_desc, batch->tuple_data_pool());
+
+ // Done with this row
+ row->SetTuple(0, tuple);
+ batch->CommitLastRow();
+
+ // See if this RowBatch can be serialized and deserialized
+ return TestRowBatchInternal(row_desc, batch, false);
}
// Recursively checks that 'deserialized_tuple' is valid and has the same contents as
@@ -328,6 +405,33 @@ TEST_F(RowBatchSerializeTest, String) {
TestRowBatch(row_desc, batch, true);
}
+TEST_F(RowBatchSerializeTest, RowBatchLZ4Success) {
+ // Inputs up to LZ4_MAX_INPUT_SIZE (0x7E000000) should work
+ Status status = TestRowBatchLimits(LZ4_MAX_INPUT_SIZE);
+ cout << status.GetDetail() << endl;
+ EXPECT_OK(status);
+}
+
+TEST_F(RowBatchSerializeTest, RowBatchLZ4TooLarge) {
+ // Inputs with size LZ4_MAX_INPUT_SIZE + 1 through INT_MAX should get an error from LZ4
+ Status status;
+ status = TestRowBatchLimits(LZ4_MAX_INPUT_SIZE + 1);
+ cout << status.GetDetail() << endl;
+ EXPECT_EQ(status.code(), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
+
+ status = TestRowBatchLimits(INT_MAX);
+ cout << status.GetDetail() << endl;
+ EXPECT_EQ(status.code(), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
+}
+
+TEST_F(RowBatchSerializeTest, RowBatchTooLarge) {
+ // RowBatches with size > INT_MAX cannot be serialized
+ Status status;
+ status = TestRowBatchLimits(static_cast<int64_t>(INT_MAX) + 1);
+ cout << status.GetDetail() << endl;
+ EXPECT_EQ(status.code(), TErrorCode::ROW_BATCH_TOO_LARGE);
+}
+
TEST_F(RowBatchSerializeTest, BasicArray) {
// tuple: (int, string, array<int>)
ColumnType array_type;
http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index dbc12c7..87c0f2c 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -284,10 +284,10 @@ Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0));
size = TotalByteSize(&distinct_tuples);
distinct_tuples.Clear(); // Reuse allocated hash table.
- SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data);
+ RETURN_IF_ERROR(SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data));
} else {
size = TotalByteSize(nullptr);
- SerializeInternal(size, nullptr, tuple_offsets, tuple_data);
+ RETURN_IF_ERROR(SerializeInternal(size, nullptr, tuple_offsets, tuple_data));
}
*uncompressed_size = size;
*is_compressed = false;
@@ -300,7 +300,11 @@ Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
auto compressor_cleanup =
MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
+ // If the input size is too large for LZ4 to compress, MaxOutputLen() will return 0.
int64_t compressed_size = compressor.MaxOutputLen(size);
+ if (compressed_size == 0) {
+ return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size);
+ }
DCHECK_GT(compressed_size, 0);
if (compression_scratch_.size() < compressed_size) {
compression_scratch_.resize(compressed_size);
@@ -333,12 +337,15 @@ bool RowBatch::UseFullDedup() {
return false;
}
-void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
+Status RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
vector<int32_t>* tuple_offsets, string* tuple_data_str) {
DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
- // TODO: max_size() is much larger than the amount of memory we could feasibly
- // allocate. Need better way to detect problem.
- DCHECK_LE(size, tuple_data_str->max_size());
+
+ // The maximum uncompressed RowBatch size that can be serialized is INT_MAX. This
+ // is because the tuple offsets are int32s and will overflow for a larger size.
+ if (size > numeric_limits<int32_t>::max()) {
+ return Status(TErrorCode::ROW_BATCH_TOO_LARGE, size, numeric_limits<int32_t>::max());
+ }
// TODO: track memory usage
// TODO: detect if serialized size is too large to allocate and return proper error.
@@ -385,6 +392,7 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
}
}
DCHECK_EQ(offset, size);
+ return Status::OK();
}
Status RowBatch::AllocateBuffer(BufferPool::ClientHandle* client, int64_t len,
http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3bde4d1..1b3778e 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -480,7 +480,7 @@ class RowBatch {
/// enabled. The distinct_tuples map must be empty.
int64_t TotalByteSize(DedupMap* distinct_tuples);
- void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
+ Status SerializeInternal(int64_t size, DedupMap* distinct_tuples,
vector<int32_t>* tuple_offsets, string* tuple_data);
/// All members below need to be handled in RowBatch::AcquireState()
http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index bdeb1ed..e878498 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -347,7 +347,14 @@ error_codes = (
("SASL_APP_NAME_MISMATCH", 114,
"InitAuth() called multiple times with different names. Was called with $0. "
- "Now using $1.")
+ "Now using $1."),
+
+ ("PARQUET_BIT_PACKED_LEVELS", 115,
+ "Can not read Parquet file $0 with deprecated BIT_PACKED encoding for rep or "
+ "def levels. Support was removed in Impala 3.0 - see IMPALA-6077."),
+
+ ("ROW_BATCH_TOO_LARGE", 116,
+ "Row batch cannot be serialized: size of $0 bytes exceeds supported limit of $1"),
)
import sys