You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/21 23:01:35 UTC

[4/5] incubator-impala git commit: IMPALA-5250: Unify decompressor output_length semantics

IMPALA-5250: Unify decompressor output_length semantics

This patch makes the semantics of the output_length parameter in
Codec::ProcessBlock the same across all codecs. In the existing code,
different decompressors treat output_length differently:
1. SnappyDecompressor needs output_length to be greater than or equal
   to the actual decompressed length, but it does not set it to the
   actual decompressed length after decompression.
2. SnappyBlockDecompressor and Lz4Decompressor require output_length to
   be exactly the same as the actual decompressed length, otherwise
   decompression fails.
3. Other decompressors need output_length to be greater than or equal to
   the actual decompressed length and will set it to actual decompressed
   length if oversized.
This inconsistency leads to a bug where the error message is
nondeterministic when the compressed block is corrupted. This patch makes
all decompressors behave like a modified version of 3:
output_length must be greater than or equal to the actual decompressed
length, and it will be set to the actual decompressed length if oversized.
A decompression failure sets it to 0.
Lz4Decompressor now uses the "safe" instead of the "fast" decompression
function, because the latter is unsafe with corrupted data and requires
the decompressed length to be known in advance.

Testing: A test case is added to check that the decompressors handle an
oversized output buffer correctly. A regression test for the exact
case described in IMPALA-5250 is also added. A benchmark was run on a
16-node cluster to measure the performance impact of the Lz4Decompressor
change, and no performance regression was found.

Change-Id: Ifd42942b169921a7eb53940c3762bc45bb82a993
Reviewed-on: http://gerrit.cloudera.org:8080/8030
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b1752268
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b1752268
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b1752268

Branch: refs/heads/master
Commit: b175226869853e219165a8fe3f7ab67ba7187caf
Parents: bd08ed4
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Sep 8 20:11:35 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 21 22:25:09 2017 +0000

----------------------------------------------------------------------
 be/src/util/codec.h            |  4 +--
 be/src/util/decompress-test.cc | 43 ++++++++++++++++------
 be/src/util/decompress.cc      | 72 ++++++++++++++++++++-----------------
 3 files changed, 74 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index b150b3c..d21e399 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -104,8 +104,8 @@ class Codec {
   /// transformed output). If output_preallocated is false, *output will be allocated from
   /// the codec's mempool. In this case, a mempool must have been passed into the c'tor.
   //
-  /// In either case, *output_length will be set to the actual length of the transformed
-  /// output.
+  /// If the transformation succeeds, *output_length will be set to the actual length of
+  /// the transformed output. Otherwise it will be set to 0.
   //
   /// Inputs:
   ///   input_length: length of the data to process

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 1e8760d..d170501 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -70,8 +70,6 @@ class DecompressorTest : public ::testing::Test {
           sizeof(input_), input_);
       CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
           0, NULL);
-      DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
-          sizeof(input_), input_);
     } else {
       CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_);
       // Test with odd-length input (to test the calculation of block-sizes in
@@ -88,10 +86,9 @@ class DecompressorTest : public ::testing::Test {
         // bzip does not allow NULL input
         CompressAndDecompress(compressor.get(), decompressor.get(), 0, input_);
       }
-      DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
-          sizeof(input_), input_);
     }
-
+    DecompressOverUnderSizedOutputBuffer(compressor.get(), decompressor.get(),
+        sizeof(input_), input_);
     compressor->Close();
     decompressor->Close();
   }
@@ -151,10 +148,11 @@ class DecompressorTest : public ::testing::Test {
     EXPECT_EQ(memcmp(input, output, input_len), 0);
   }
 
-  // Test the behavior when the decompressor is given too little space to produce
-  // the decompressed output. Verify that the decompressor returns an error and
-  // does not overflow the provided buffer.
-  void DecompressInsufficientOutputBuffer(Codec* compressor, Codec* decompressor,
+  // Test the behavior when the decompressor is given too little / too much space.
+  // Verify that the decompressor returns an error when the space is not enough, gives
+  // the correct output size when the space is enough, and does not write beyond the
+  // output size it claims.
+  void DecompressOverUnderSizedOutputBuffer(Codec* compressor, Codec* decompressor,
       int64_t input_len, uint8_t* input) {
     uint8_t* compressed;
     int64_t compressed_length;
@@ -180,9 +178,18 @@ class DecompressorTest : public ::testing::Test {
     u_int32_t *canary = (u_int32_t *) &output[output_len];
     *canary = 0x66aa77bb;
     Status status = decompressor->ProcessBlock(true, compressed_length, compressed,
-                                               &output_len, &output);
+        &output_len, &output);
     EXPECT_EQ(*canary, 0x66aa77bb);
     EXPECT_FALSE(status.ok());
+    EXPECT_EQ(output_len, 0);
+
+    // Check that the output length is the same as input when the decompressor is provided
+    // with abundant space.
+    output_len = input_len * 2;
+    output = mem_pool_.Allocate(output_len);
+    EXPECT_TRUE(decompressor->ProcessBlock(true, compressed_length, compressed,
+        &output_len, &output).ok());
+    EXPECT_EQ(output_len, input_len);
   }
 
   void Compress(Codec* compressor, int64_t input_len, uint8_t* input,
@@ -418,6 +425,22 @@ TEST_F(DecompressorTest, Impala1506) {
   pool.FreeAll();
 }
 
+TEST_F(DecompressorTest, Impala5250) {
+  // Regression test for IMPALA-5250. It tests that SnappyDecompressor handles an input
+  // buffer with a zero byte correctly. It should set the output_length to 0.
+  MemTracker trax;
+  MemPool pool(&trax);
+  scoped_ptr<Codec> decompressor;
+  EXPECT_OK(Codec::CreateDecompressor(&pool, true, impala::THdfsCompression::SNAPPY,
+      &decompressor));
+  uint8_t buf[1]{0};
+  uint8_t out_buf[1];
+  int64_t output_length = 1;
+  uint8_t* output = out_buf;
+  EXPECT_OK(decompressor->ProcessBlock(true, 1, buf, &output_length, &output));
+  EXPECT_EQ(output_length, 0);
+}
+
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 2488586..f3466b9 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -139,7 +139,9 @@ Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8
 
 Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
-  if (UNLIKELY(output_preallocated && *output_length == 0)) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
+  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
     // The zlib library does not allow *output to be nullptr, even when output_length is 0
     // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
     // early if no output is expected. Note that we don't signal an error if the input
@@ -162,7 +164,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     }
     use_temp = true;
     *output = out_buffer_;
-    *output_length = buffer_length_;
+    output_length_local = buffer_length_;
   }
 
   // Reset the stream for this block
@@ -182,7 +184,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
     stream_.avail_in = input_length;
     stream_.next_out = reinterpret_cast<Bytef*>(*output);
-    stream_.avail_out = *output_length;
+    stream_.avail_out = output_length_local;
 
     if (use_temp) {
       // We don't know the output size, so this might fail.
@@ -198,7 +200,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     if (!use_temp) {
       stringstream ss;
       ss << "Too small a buffer passed to GzipDecompressor. InputLength="
-        << input_length << " OutputLength=" << *output_length;
+        << input_length << " OutputLength=" << output_length_local;
       return Status(ss.str());
     }
 
@@ -213,7 +215,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
           nullptr, details, buffer_length_);
     }
     *output = out_buffer_;
-    *output_length = buffer_length_;
+    output_length_local = buffer_length_;
     ret = inflateReset(&stream_);
   }
 
@@ -228,7 +230,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
 
   // stream_.avail_out is the number of bytes *left* in the out buffer, but
   // we're interested in the number of bytes used.
-  *output_length = *output_length - stream_.avail_out;
+  *output_length = output_length_local - stream_.avail_out;
   if (use_temp) memory_pool_->AcquireData(temp_memory_pool_.get(), reuse_buffer_);
   return Status::OK();
 }
@@ -257,14 +259,16 @@ int64_t BzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input)
 
 Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
-  if (UNLIKELY(output_preallocated && *output_length == 0)) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
+  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
     // Same problem as zlib library, see comment in GzipDecompressor::ProcessBlock().
     return Status::OK();
   }
 
   bool use_temp = false;
   if (output_preallocated) {
-    buffer_length_ = *output_length;
+    buffer_length_ = output_length_local;
     out_buffer_ = *output;
   } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
     // guess that we will need 2x the input length.
@@ -423,11 +427,14 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
 // Utility function to decompress snappy block compressed data.  If size_only is true,
 // this function does not decompress but only computes the output size and writes
 // the result to *output_len.
-// If size_only is false, output must be preallocated to output_len and this needs to
-// be exactly big enough to hold the decompressed output.
-// size_only is a O(1) operations (just reads a single varint for each snappy block).
+// If size_only is false, output buffer size must be at least *output_len. *output_len is
+// updated with the actual output size if the decompression succeeds, and is set to 0
+// otherwise.
+// size_only is an O(1) operation (just reads a single varint for each snappy block).
 static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     bool size_only, int64_t* output_len, char* output) {
+  int64_t buffer_size = *output_len;
+  *output_len = 0;
   int64_t uncompressed_total_len = 0;
   while (input_len > 0) {
     uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
@@ -435,7 +442,7 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     input_len -= sizeof(uint32_t);
 
     if (!size_only) {
-      int64_t remaining_output_size = *output_len - uncompressed_total_len;
+      int64_t remaining_output_size = buffer_size - uncompressed_total_len;
       if (remaining_output_size < uncompressed_block_len) {
         return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
       }
@@ -448,7 +455,6 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       input_len -= sizeof(uint32_t);
 
       if (compressed_len == 0 || compressed_len > input_len) {
-        *output_len = 0;
         return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
       }
 
@@ -456,14 +462,13 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       size_t uncompressed_len;
       if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
               compressed_len, &uncompressed_len)) {
-        *output_len = 0;
         return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
       }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {
         // Check output bounds
-        int64_t remaining_output_size = *output_len - uncompressed_total_len;
+        int64_t remaining_output_size = buffer_size - uncompressed_total_len;
         if (remaining_output_size < uncompressed_len) {
           return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
         }
@@ -481,23 +486,21 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       uncompressed_total_len += uncompressed_len;
     }
   }
-
-  if (size_only) {
-    *output_len = uncompressed_total_len;
-  } else if (*output_len != uncompressed_total_len) {
-    return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
-  }
+  *output_len = uncompressed_total_len;
   return Status::OK();
 }
 
 Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
     const uint8_t* input, int64_t* output_len, uint8_t** output) {
+  int64_t output_length_local = *output_len;
+  *output_len = 0;
   if (!output_preallocated) {
     // If we don't know the size beforehand, compute it.
-    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, nullptr));
-    if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < *output_len) {
+    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, &output_length_local,
+        nullptr));
+    if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < output_length_local) {
       // Need to allocate a new buffer
-      buffer_length_ = *output_len;
+      buffer_length_ = output_length_local;
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock",
@@ -510,7 +513,9 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
   }
 
   char* out_ptr = reinterpret_cast<char*>(*output);
-  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
+  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, &output_length_local,
+      out_ptr));
+  *output_len = output_length_local;
   return Status::OK();
 }
 
@@ -530,6 +535,8 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
 
 Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
   int64_t uncompressed_length = MaxOutputLen(input_length, input);
   if (uncompressed_length < 0) {
     return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
@@ -548,20 +555,18 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
       }
     }
     *output = out_buffer_;
-    *output_length = uncompressed_length;
   } else {
     // If the preallocated buffer is too small (e.g. if the file metadata is corrupt),
     // bail out early. Otherwise, this could result in a buffer overrun.
-    if (uncompressed_length > *output_length) {
+    if (uncompressed_length > output_length_local) {
       return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
     }
   }
-
   if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
           static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
     return Status("Snappy: RawUncompress failed");
   }
-
+  *output_length = uncompressed_length;
   return Status::OK();
 }
 
@@ -577,12 +582,13 @@ int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
 Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
   DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
-  // LZ4_decompress_fast will cause a segmentation fault if passed a nullptr output.
   if(*output_length == 0) return Status::OK();
-  if (LZ4_decompress_fast(reinterpret_cast<const char*>(input),
-          reinterpret_cast<char*>(*output), *output_length) != input_length) {
+  int ret = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
+      reinterpret_cast<char*>(*output), input_length, *output_length);
+  if (ret < 0) {
+    *output_length = 0;
     return Status("Lz4: uncompress failed");
   }
-
+  *output_length = ret;
   return Status::OK();
 }