You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/27 21:44:20 UTC

impala git commit: IMPALA-6543: Limit RowBatch serialization size to INT_MAX

Repository: impala
Updated Branches:
  refs/heads/2.x 34d4e8ed4 -> ecbb88da5


IMPALA-6543: Limit RowBatch serialization size to INT_MAX

The serialization format of a row batch relies on
tuple offsets. In its current form, the tuple offsets
are int32s. This means that it is impossible to generate
a valid serialization of a row batch that is larger
than INT_MAX bytes.

This changes RowBatch::SerializeInternal() to return an
error when asked to serialize a row batch larger than
INT_MAX bytes. This prevents a DCHECK failure on debug
builds when serializing a row batch larger than 2GB (for
example, one containing a single row larger than 2GB).

This also changes the compression logic in RowBatch::Serialize()
to avoid a DCHECK failure when the row batch is too large for
LZ4 to compress (larger than LZ4_MAX_INPUT_SIZE bytes). Instead,
it returns an error.

This modifies row-batch-serialize-test to verify behavior at
each of the limits. Specifically:
 - RowBatches up to size LZ4_MAX_INPUT_SIZE succeed.
 - RowBatches with size in the range [LZ4_MAX_INPUT_SIZE+1, INT_MAX]
   fail on LZ4 compression.
 - RowBatches with size > INT_MAX fail with ROW_BATCH_TOO_LARGE
   ("Row batch cannot be serialized").

Change-Id: I1f93131facf47bd8d86c3d4923b23186ee90fcbf
Reviewed-on: http://gerrit.cloudera.org:8080/9459
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ecbb88da
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ecbb88da
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ecbb88da

Branch: refs/heads/2.x
Commit: ecbb88da5b82bec92c6dc710125d9529d4c085fa
Parents: 34d4e8e
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue Feb 20 11:38:23 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 27 19:46:58 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/row-batch-serialize-test.cc | 112 +++++++++++++++++++++++-
 be/src/runtime/row-batch.cc                |  20 +++--
 be/src/runtime/row-batch.h                 |   2 +-
 common/thrift/generate_error_codes.py      |   9 +-
 4 files changed, 131 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index 041e4fe..f6a048d 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <lz4.h>
+
 #include "common/init.h"
 #include "testutil/gtest-util.h"
 #include "runtime/collection-value.h"
@@ -60,13 +62,14 @@ class RowBatchSerializeTest : public testing::Test {
   }
 
   // Serializes and deserializes 'batch', then checks that the deserialized batch is valid
-  // and has the same contents as 'batch'.
-  void TestRowBatch(const RowDescriptor& row_desc, RowBatch* batch, bool print_batches,
-      bool full_dedup = false) {
+  // and has the same contents as 'batch'. If serialization returns an error (e.g. if the
+  // row batch is too large to serialize), this will return that error.
+  Status TestRowBatchInternal(const RowDescriptor& row_desc, RowBatch* batch,
+      bool print_batches, bool full_dedup = false) {
     if (print_batches) cout << PrintBatch(batch) << endl;
 
     TRowBatch trow_batch;
-    EXPECT_OK(batch->Serialize(&trow_batch, full_dedup));
+    RETURN_IF_ERROR(batch->Serialize(&trow_batch, full_dedup));
 
     RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
     if (print_batches) cout << PrintBatch(&deserialized_batch) << endl;
@@ -83,6 +86,80 @@ class RowBatchSerializeTest : public testing::Test {
         TestTuplesEqual(*tuple_desc, tuple, deserialized_tuple);
       }
     }
+    return Status::OK();
+  }
+
+  // Serializes and deserializes 'batch', then checks that the deserialized batch is valid
+  // and has the same contents as 'batch'. This requires that serialization succeed.
+  void TestRowBatch(const RowDescriptor& row_desc, RowBatch* batch, bool print_batches,
+      bool full_dedup = false) {
+    EXPECT_OK(TestRowBatchInternal(row_desc, batch, print_batches, full_dedup));
+  }
+
+  // Constructs a RowBatch holding a single row (an int plus three strings) whose
+  // total size is 'row_batch_size' bytes, then tests whether this RowBatch can be
+  // serialized and deserialized successfully. If there is an error during
+  // serialization, returns that error.
+  Status TestRowBatchLimits(int64_t row_batch_size) {
+    // tuple: (int, string, string, string)
+    // This uses three strings so that this test can reach INT_MAX+1 without any
+    // single string exceeding the 1GB limit on string length (see string-value.h).
+    DescriptorTblBuilder builder(fe_.get(), &pool_);
+    builder.DeclareTuple() << TYPE_INT << TYPE_STRING << TYPE_STRING << TYPE_STRING;
+    DescriptorTbl* desc_tbl = builder.Build();
+
+    // Create row descriptor
+    vector<bool> nullable_tuples(1, false);
+    vector<TTupleId> tuple_id(1, (TTupleId) 0);
+    RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
+    EXPECT_EQ(row_desc.tuple_descriptors().size(), 1);
+
+    // Create base row. The batch lives on the stack rather than in 'pool_' so that it
+    // (and its multi-GB tuple data) is destroyed when this function returns instead of
+    // accumulating across calls, and so that it never outlives 'row_desc'.
+    RowBatch local_batch(&row_desc, 1, tracker_.get());
+    RowBatch* batch = &local_batch;
+    int len = row_desc.GetRowSize();
+    uint8_t* tuple_mem = batch->tuple_data_pool()->Allocate(len);
+    memset(tuple_mem, 0, len);
+
+    // Create one row
+    TupleRow* row = batch->GetRow(batch->AddRow());
+
+    // There is only one TupleDescriptor
+    TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[0];
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
+
+    // Write slot 0 (Random Integer)
+    SlotDescriptor* int_desc = tuple_desc->slots()[0];
+    WriteValue(tuple, *int_desc, batch->tuple_data_pool());
+
+    // The RowBatch has consumed 'len' bytes so far. Need to add row_batch_size - len
+    // bytes of string data. Split this equally among the three strings with any
+    // remainder going to the first string.
+    int64_t size_remaining = row_batch_size - len;
+    int64_t string1_size = (size_remaining / 3) + (size_remaining % 3);
+    int64_t string2_size = (size_remaining / 3);
+    int64_t string3_size = string2_size;
+    const int64_t string_sizes[] = {string1_size, string2_size, string3_size};
+
+    // Write strings #1-#3 into slots 1-3. StringValue(const string&) points at the
+    // string's buffer rather than copying it (see string-value.h), so the backing
+    // std::string must still be alive when RawValue::Write() copies the bytes into
+    // the tuple data pool; a temporary std::string would leave 'sv' dangling. Each
+    // buffer is freed as soon as its slot has been written, bounding peak memory.
+    for (int i = 0; i < 3; ++i) {
+      string str_data(string_sizes[i], 'a');
+      StringValue sv(str_data);
+      RawValue::Write(&sv, tuple, tuple_desc->slots()[i + 1], batch->tuple_data_pool());
+    }
+
+    // Done with this row
+    row->SetTuple(0, tuple);
+    batch->CommitLastRow();
+
+    // See if this RowBatch can be serialized and deserialized
+    return TestRowBatchInternal(row_desc, batch, false);
   }
 
   // Recursively checks that 'deserialized_tuple' is valid and has the same contents as
@@ -328,6 +405,33 @@ TEST_F(RowBatchSerializeTest, String) {
   TestRowBatch(row_desc, batch, true);
 }
 
+TEST_F(RowBatchSerializeTest, RowBatchLZ4Success) {
+  // A batch of exactly LZ4_MAX_INPUT_SIZE (0x7E000000) bytes must still round-trip.
+  Status result = TestRowBatchLimits(LZ4_MAX_INPUT_SIZE);
+  cout << result.GetDetail() << endl;
+  EXPECT_OK(result);
+}
+
+TEST_F(RowBatchSerializeTest, RowBatchLZ4TooLarge) {
+  // Sizes from LZ4_MAX_INPUT_SIZE + 1 up to INT_MAX still fit the int32 tuple offsets
+  // but exceed what LZ4 accepts, so serialization must fail in the compression step.
+  // Probe both ends of that range.
+  const int64_t sizes[] = {LZ4_MAX_INPUT_SIZE + 1, INT_MAX};
+  for (int64_t size : sizes) {
+    Status status = TestRowBatchLimits(size);
+    cout << status.GetDetail() << endl;
+    EXPECT_EQ(status.code(), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
+  }
+}
+
+TEST_F(RowBatchSerializeTest, RowBatchTooLarge) {
+  // INT_MAX + 1 bytes would overflow the int32 tuple offsets, so it must be rejected.
+  const int64_t too_large = static_cast<int64_t>(INT_MAX) + 1;
+  Status result = TestRowBatchLimits(too_large);
+  cout << result.GetDetail() << endl;
+  EXPECT_EQ(result.code(), TErrorCode::ROW_BATCH_TOO_LARGE);
+}
+
 TEST_F(RowBatchSerializeTest, BasicArray) {
   // tuple: (int, string, array<int>)
   ColumnType array_type;

http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index dbc12c7..87c0f2c 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -284,10 +284,10 @@ Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
     RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0));
     size = TotalByteSize(&distinct_tuples);
     distinct_tuples.Clear(); // Reuse allocated hash table.
-    SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data);
+    RETURN_IF_ERROR(SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data));
   } else {
     size = TotalByteSize(nullptr);
-    SerializeInternal(size, nullptr, tuple_offsets, tuple_data);
+    RETURN_IF_ERROR(SerializeInternal(size, nullptr, tuple_offsets, tuple_data));
   }
   *uncompressed_size = size;
   *is_compressed = false;
@@ -300,7 +300,11 @@ Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
     auto compressor_cleanup =
         MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
 
+    // If the input size is too large for LZ4 to compress, MaxOutputLen() will return 0.
     int64_t compressed_size = compressor.MaxOutputLen(size);
+    if (compressed_size == 0) {
+      return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size);
+    }
     DCHECK_GT(compressed_size, 0);
     if (compression_scratch_.size() < compressed_size) {
       compression_scratch_.resize(compressed_size);
@@ -333,12 +337,15 @@ bool RowBatch::UseFullDedup() {
   return false;
 }
 
-void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
+Status RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
     vector<int32_t>* tuple_offsets, string* tuple_data_str) {
   DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
-  // TODO: max_size() is much larger than the amount of memory we could feasibly
-  // allocate. Need better way to detect problem.
-  DCHECK_LE(size, tuple_data_str->max_size());
+
+  // The maximum uncompressed RowBatch size that can be serialized is INT_MAX. This
+  // is because the tuple offsets are int32s and will overflow for a larger size.
+  if (size > numeric_limits<int32_t>::max()) {
+    return Status(TErrorCode::ROW_BATCH_TOO_LARGE, size, numeric_limits<int32_t>::max());
+  }
 
   // TODO: track memory usage
   // TODO: detect if serialized size is too large to allocate and return proper error.
@@ -385,6 +392,7 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
     }
   }
   DCHECK_EQ(offset, size);
+  return Status::OK();
 }
 
 Status RowBatch::AllocateBuffer(BufferPool::ClientHandle* client, int64_t len,

http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3bde4d1..1b3778e 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -480,7 +480,7 @@ class RowBatch {
   /// enabled. The distinct_tuples map must be empty.
   int64_t TotalByteSize(DedupMap* distinct_tuples);
 
-  void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
+  Status SerializeInternal(int64_t size, DedupMap* distinct_tuples,
       vector<int32_t>* tuple_offsets, string* tuple_data);
 
   /// All members below need to be handled in RowBatch::AcquireState()

http://git-wip-us.apache.org/repos/asf/impala/blob/ecbb88da/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index bdeb1ed..e878498 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -347,7 +347,14 @@ error_codes = (
 
   ("SASL_APP_NAME_MISMATCH", 114,
    "InitAuth() called multiple times with different names. Was called with $0. "
-   "Now using $1.")
+   "Now using $1."),
+
+  ("PARQUET_BIT_PACKED_LEVELS", 115,
+      "Can not read Parquet file $0 with deprecated BIT_PACKED encoding for rep or "
+      "def levels. Support was removed in Impala 3.0 - see IMPALA-6077."),
+
+  ("ROW_BATCH_TOO_LARGE", 116,
+   "Row batch cannot be serialized: size of $0 bytes exceeds supported limit of $1"),
 )
 
 import sys