You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by dh...@apache.org on 2018/05/18 05:12:10 UTC

[4/4] impala git commit: IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners

IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners

Introduced new error message when scanning a corrupt Sequence or RCFile.
Added new checks to detect buffer overrun while handling Sequence or RCFile.

Testing:
  a) Made changes to fuzz test for RCFile/Sequence file, ran fuzz test in a loop
      with 200 iteration without failure.

  b) Ran exhaustive test on the changes without failure.

Change-Id: Ic9cfc38af3f30c65ada9734eb471dbfa6ecdd74a
Reviewed-on: http://gerrit.cloudera.org:8080/8936
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab75dd12
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab75dd12
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab75dd12

Branch: refs/heads/master
Commit: ab75dd12e49100f153911bef87a9dab810cf9b58
Parents: c1c122a
Author: Pranay <ps...@cloudera.com>
Authored: Wed Apr 18 18:01:56 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri May 18 05:05:57 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-rcfile-scanner.cc     | 153 ++++++++++++++++++++++------
 be/src/exec/hdfs-rcfile-scanner.h      |  16 ++-
 be/src/exec/hdfs-sequence-scanner.cc   |  79 ++++++++++----
 be/src/exec/hdfs-sequence-scanner.h    |   6 ++
 be/src/exec/read-write-util-test.cc    |  11 +-
 be/src/exec/read-write-util.h          |  32 ++++--
 be/src/exec/scanner-context.inline.h   |   5 +
 be/src/util/decompress.cc              |  10 ++
 tests/query_test/test_scanners_fuzz.py |  11 +-
 9 files changed, 255 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index a706c3d..37376c6 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -49,6 +49,9 @@ const char* const HdfsRCFileScanner::RCFILE_METADATA_KEY_NUM_COLS =
 
 const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1};
 
+// Check max column limit, set to 8 million
+const int HdfsRCFileScanner::MAX_NCOLS = 8000000;
+
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
@@ -82,6 +85,13 @@ Status HdfsRCFileScanner::InitNewRange() {
         reuse_row_group_buffer_, header_->codec, &decompressor_));
   }
 
+  int ncols = reinterpret_cast<RcFileHeader*>(header_)->num_cols;
+  if (ncols < 0 || ncols > MAX_NCOLS) {
+    stringstream ss;
+    ss << stream_->filename() << " Column limit has exceeded " << MAX_NCOLS
+       << " limit, the number of columns are " << ncols;
+    return Status(ss.str());
+  }
   // Allocate the buffers for the key information that is used to read and decode
   // the column data.
   columns_.resize(reinterpret_cast<RcFileHeader*>(header_)->num_cols);
@@ -116,7 +126,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     rc_header->version = RCF1;
   } else {
     stringstream ss;
-    ss << "Invalid RCFILE_VERSION_HEADER: '"
+    ss << stream_->filename() << " Invalid RCFILE_VERSION_HEADER: '"
        << ReadWriteUtil::HexDump(header, sizeof(RCFILE_VERSION_HEADER)) << "'";
     return Status(ss.str());
   }
@@ -130,9 +140,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     if (len != strlen(HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME) ||
         memcmp(class_name_key, HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME, len)) {
       stringstream ss;
-      ss << "Invalid RCFILE_KEY_CLASS_NAME: '"
-         << string(reinterpret_cast<char*>(class_name_key), len)
-         << "' len=" << len;
+      ss << stream_->filename() << " Invalid RCFILE_KEY_CLASS_NAME: '"
+         << string(reinterpret_cast<char*>(class_name_key), len) << "' len=" << len;
       return Status(ss.str());
     }
 
@@ -142,9 +151,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     if (len != strlen(HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME) ||
         memcmp(class_name_val, HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME, len)) {
       stringstream ss;
-      ss << "Invalid RCFILE_VALUE_CLASS_NAME: '"
-         << string(reinterpret_cast<char*>(class_name_val), len)
-         << "' len=" << len;
+      ss << stream_->filename() << " Invalid RCFILE_VALUE_CLASS_NAME: '"
+         << string(reinterpret_cast<char*>(class_name_val), len) << "' len=" << len;
       return Status(ss.str());
     }
   }
@@ -161,7 +169,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
         stream_->ReadBoolean(&is_blk_compressed, &parse_status_));
     if (is_blk_compressed) {
       stringstream ss;
-      ss << "RC files do no support block compression.";
+      ss << stream_->filename() << " RC files does not support block compression.";
       return Status(ss.str());
     }
   }
@@ -172,7 +180,11 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
     header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
     Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
-    DCHECK(it != Codec::CODEC_MAP.end());
+    if (it == Codec::CODEC_MAP.end()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid codec: " << header_->codec;
+      return Status(ss.str());
+    }
     header_->compression_type = it->second;
   } else {
     header_->compression_type = THdfsCompression::NONE;
@@ -208,10 +220,10 @@ Status HdfsRCFileScanner::ReadNumColumnsMetadata() {
       StringParser::ParseResult result;
       int num_cols =
           StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
-      if (result != StringParser::PARSE_SUCCESS) {
+      if (result != StringParser::PARSE_SUCCESS || num_cols < 0) {
         stringstream ss;
-        ss << "Could not parse number of columns in file " << stream_->filename()
-           << ": " << value_str;
+        ss << " Could not parse number of columns in file " << stream_->filename()
+           << " : " << value_str;
         if (result == StringParser::PARSE_OVERFLOW) ss << " (result overflowed)";
         return Status(ss.str());
       }
@@ -271,7 +283,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad record length: " << record_length << " at offset: " << position;
+    ss << stream_->filename() << " Bad record length: " << record_length
+       << " at offset: " << position;
     return Status(ss.str());
   }
   RETURN_IF_FALSE(stream_->ReadInt(&key_length_, &parse_status_));
@@ -279,7 +292,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad key length: " << key_length_ << " at offset: " << position;
+    ss << stream_->filename() << " Bad key length: " << key_length_
+       << " at offset: " << position;
     return Status(ss.str());
   }
   RETURN_IF_FALSE(stream_->ReadInt(&compressed_key_length_, &parse_status_));
@@ -287,7 +301,7 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad compressed key length: " << compressed_key_length_
+    ss << stream_->filename() << " Bad compressed key length: " << compressed_key_length_
        << " at offset: " << position;
     return Status(ss.str());
   }
@@ -316,42 +330,98 @@ Status HdfsRCFileScanner::ReadKeyBuffers() {
     memcpy(key_buffer, buffer, key_length_);
   }
 
-  row_group_length_ = 0;
   uint8_t* key_buf_ptr = key_buffer;
-  int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_);
+  row_group_length_ = 0;
+  int remain_len = key_length_;
+  int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_, key_length_);
+  if (bytes_read == -1 || num_rows_ < 0) {
+    stringstream ss;
+    ss << stream_->filename() << " Bad row group key buffer, key length: " << key_length_;
+    return Status(ss.str());
+  }
   key_buf_ptr += bytes_read;
+  remain_len = remain_len - bytes_read;
 
+  // Track the starting position in the buffer.
+  uint8_t* start_key_buf_ptr = key_buf_ptr;
   for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
-    GetCurrentKeyBuffer(col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr);
-    DCHECK_LE(key_buf_ptr, key_buffer + key_length_);
+    if (key_buf_ptr < start_key_buf_ptr || (key_buf_ptr > key_buffer + key_length_)
+        || remain_len <= 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Bad row group key buffer, column idx: " << col_idx;
+      return Status(ss.str());
+    }
+    RETURN_IF_ERROR(GetCurrentKeyBuffer(
+        col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr, remain_len));
+    remain_len = remain_len - (key_buf_ptr - start_key_buf_ptr);
+    start_key_buf_ptr = key_buf_ptr;
   }
-  DCHECK_EQ(key_buf_ptr, key_buffer + key_length_);
 
   return Status::OK();
 }
 
-void HdfsRCFileScanner::GetCurrentKeyBuffer(int col_idx, bool skip_col_data,
-                                            uint8_t** key_buf_ptr) {
+Status HdfsRCFileScanner::BadColumnInfo(int col_idx) {
+  stringstream ss;
+  ss << stream_->filename() << " Corrupt column at index: " << col_idx;
+  return Status(ss.str());
+}
+
+Status HdfsRCFileScanner::GetCurrentKeyBuffer(
+    int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length) {
   ColumnInfo& col_info = columns_[col_idx];
+  int remain_len = buf_length;
 
-  int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len);
+  if (remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
+
+  DCHECK_GT(remain_len, 0);
+  int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  DCHECK_GT(remain_len, 0);
 
-  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len);
+  bytes_read =
+      ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  if (remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
 
   int col_key_buf_len;
-  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr , &col_key_buf_len);
+  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_key_buf_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
+
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  if (col_info.uncompressed_buffer_len < 0 || remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
 
   if (!skip_col_data) {
     col_info.key_buffer = *key_buf_ptr;
 
+    DCHECK_GE(col_info.uncompressed_buffer_len, 0);
+
     // Set the offset for the start of the data for this column in the allocated buffer.
     col_info.start_offset = row_group_length_;
     row_group_length_ += col_info.uncompressed_buffer_len;
   }
+  col_info.buf_length = col_key_buf_len;
   *key_buf_ptr += col_key_buf_len;
+  remain_len -= bytes_read;
+  DCHECK_GE(remain_len, 0);
+
+  return Status::OK();
 }
 
 inline Status HdfsRCFileScanner::NextField(int col_idx) {
@@ -366,11 +436,11 @@ inline Status HdfsRCFileScanner::NextField(int col_idx) {
     int64_t length = 0;
     uint8_t* col_key_buf = col_info.key_buffer;
     int bytes_read = ReadWriteUtil::GetVLong(
-        col_key_buf, col_info.key_buffer_pos, &length);
+        col_key_buf, col_info.key_buffer_pos, &length, col_info.buf_length);
     if (bytes_read == -1) {
         int64_t position = stream_->file_offset();
         stringstream ss;
-        ss << "Invalid column length at offset: " << position;
+        ss << stream_->filename() << " Invalid column length at offset: " << position;
         return Status(ss.str());
     }
     col_info.key_buffer_pos += bytes_read;
@@ -403,6 +473,11 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
   for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
     ColumnInfo& column = columns_[col_idx];
     if (!columns_[col_idx].materialize_column) {
+      if (column.buffer_len < 0) {
+        stringstream ss;
+        ss << stream_->filename() << " Bad column buffer len: " << column.buffer_len;
+        return Status(ss.str());
+      }
       // Not materializing this column, just skip it.
       RETURN_IF_FALSE(
           stream_->SkipBytes(column.buffer_len, &parse_status_));
@@ -411,7 +486,17 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
 
     // TODO: Stream through these column buffers instead of reading everything
     // in at once.
-    DCHECK_LE(column.uncompressed_buffer_len + column.start_offset, row_group_length_);
+    // Uncompressed buffer size for a column should not exceed the row_group_length_
+    // as row_group_length_ is a sum of uncompressed buffer length for all the columns
+    // so this check ensures that there is enough space in row_group_buffer for the
+    // uncompressed data.
+    if (column.uncompressed_buffer_len + column.start_offset > row_group_length_) {
+      stringstream ss;
+      ss << stream_->filename() << " Bad column buffer uncompressed buffer length: "
+         << column.uncompressed_buffer_len << " at offset " << column.start_offset;
+      return Status(ss.str());
+    }
+
     if (header_->is_compressed) {
       uint8_t* compressed_input;
       RETURN_IF_FALSE(stream_->ReadBytes(
@@ -493,8 +578,16 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
         const char* col_start = reinterpret_cast<const char*>(
             row_group_buffer_ + column.start_offset + column.buffer_pos);
         const int field_len = column.current_field_len;
-        DCHECK_LE(col_start + field_len,
-            reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
+        const char* row_group_end =
+            reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_);
+        const char* col_end = col_start + field_len;
+        if (col_end > row_group_end || column.start_offset < 0 || column.buffer_pos < 0
+            || col_start > row_group_end || field_len < 0) {
+          stringstream ss;
+          ss << stream_->filename()
+             << " Bad column index at offset : " << column.start_offset;
+          return Status(ss.str());
+        }
 
         if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
             false, false, row_batch->tuple_data_pool())) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 835d2a2..f35bcf8 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -257,6 +257,9 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   /// of the file {'R', 'C', 'F' 1}
   static const uint8_t RCFILE_VERSION_HEADER[4];
 
+  // Check max column limit
+  static const int MAX_NCOLS;
+
   /// Implementation of superclass functions.
   virtual FileHeader* AllocateFileHeader();
   virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
@@ -289,12 +292,14 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   /// Input/Output:
   ///   key_buf_ptr: Pointer to the buffered file data, this will be moved
   ///                past the data for this column.
+  ///   buf_len: Length of the buffer that will be read.
   /// Sets:
   ///   col_buf_len_
   ///   col_buf_uncompressed_len_
   ///   col_key_bufs_
   ///   col_bufs_off_
-  void GetCurrentKeyBuffer(int col_idx, bool skip_col_data, uint8_t** key_buf_ptr);
+  Status GetCurrentKeyBuffer(
+      int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length);
 
   /// Read the rowgroup column buffers
   /// Sets:
@@ -323,6 +328,13 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   ///   row_pos_
   Status NextRow() WARN_UNUSED_RESULT;
 
+  /// Error message printed on a formatted stream when a bad column idex is encountered.
+  /// Input:
+  ///   col_idx: column to print.
+  /// Output:
+  ///   Error status, with formatted stream.
+  Status BadColumnInfo(int col_idx);
+
   enum Version {
     SEQ6,     // Version for sequence file and pre hive-0.9 rc files
     RCF1      // The version post hive-0.9 which uses a new header
@@ -352,6 +364,8 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
     int32_t key_buffer_len;
     /// This is a ptr into the scanner's key_buffer_ for this column.
     uint8_t* key_buffer;
+    /// Length of the key buffer
+    int32_t buf_length;
 
     /// Current position in the key buffer
     int32_t key_buffer_pos;

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..2183655 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -120,7 +120,7 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
     int64_t in_size = current_block_length_ - current_key_length_;
     if (in_size < 0) {
       stringstream ss;
-      ss << "Invalid record size: " << in_size;
+      ss << stream_->filename() << " Invalid record size: " << in_size;
       return Status(ss.str());
     }
     uint8_t* compressed_data;
@@ -136,15 +136,19 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
     }
     *record_ptr = unparsed_data_buffer_;
     // Read the length of the record.
-    int size = ReadWriteUtil::GetVLong(*record_ptr, record_len);
-    if (size == -1) return Status("Invalid record sizse");
+    int size = ReadWriteUtil::GetVLong(*record_ptr, record_len, in_size);
+    if (size == -1) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid record size: " << in_size;
+      return Status(ss.str());
+    }
     *record_ptr += size;
   } else {
     // Uncompressed records
     RETURN_IF_FALSE(stream_->ReadVLong(record_len, &parse_status_));
     if (*record_len < 0) {
       stringstream ss;
-      ss << "Invalid record length: " << *record_len;
+      ss << stream_->filename() << " Invalid record length: " << *record_len;
       return Status(ss.str());
     }
     RETURN_IF_FALSE(
@@ -199,9 +203,9 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch)
     if (sync_indicator != -1) {
       if (state_->LogHasSpace()) {
         stringstream ss;
-        ss << "Expecting sync indicator (-1) at file offset "
-            << (stream_->file_offset() - sizeof(int)) << ".  "
-            << "Sync indicator found " << sync_indicator << ".";
+        ss << stream_->filename() << " Expecting sync indicator (-1) at file offset "
+           << (stream_->file_offset() - sizeof(int)) << ".  "
+           << "Sync indicator found " << sync_indicator << ".";
         state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
       }
       return Status("Bad sync hash");
@@ -229,15 +233,34 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
   // Parse record starts and lengths
   int field_location_offset = 0;
   for (int i = 0; i < num_to_process; ++i) {
-    DCHECK_LT(i, record_locations_.size());
-    int bytes_read = ReadWriteUtil::GetVLong(
-        next_record_in_compressed_block_, &record_locations_[i].len);
+    if (i >= record_locations_.size() || record_locations_[i].len < 0
+        || next_record_in_compressed_block_ > data_buffer_end_) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
+    int bytes_read = ReadWriteUtil::GetVLong(next_record_in_compressed_block_,
+        &record_locations_[i].len, next_record_in_compressed_block_len_);
     if (UNLIKELY(bytes_read == -1)) {
-      return Status("Invalid record sizes in compressed block.");
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
     }
     next_record_in_compressed_block_ += bytes_read;
+    next_record_in_compressed_block_len_ -= bytes_read;
+    if (next_record_in_compressed_block_len_ <= 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
     record_locations_[i].record = next_record_in_compressed_block_;
     next_record_in_compressed_block_ += record_locations_[i].len;
+    next_record_in_compressed_block_len_ -= record_locations_[i].len;
+    if (next_record_in_compressed_block_len_ < 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
   }
 
   // Parse records to find field locations.
@@ -254,9 +277,17 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
           reinterpret_cast<char*>(record_locations_[i].record),
           &field_locations_[field_location_offset], &num_fields));
     }
-    DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
+    if (num_fields != scan_node_->materialized_slots().size()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
     field_location_offset += num_fields;
-    DCHECK_LE(field_location_offset, field_locations_.size());
+    if (field_location_offset > field_locations_.size()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
   }
 
   int max_added_tuples = (scan_node_->limit() == -1) ?
@@ -377,7 +408,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
 
   if (memcmp(header, SEQFILE_VERSION_HEADER, sizeof(SEQFILE_VERSION_HEADER))) {
     stringstream ss;
-    ss << "Invalid SEQFILE_VERSION_HEADER: '"
+    ss << stream_->filename() << " Invalid SEQFILE_VERSION_HEADER: '"
        << ReadWriteUtil::HexDump(header, sizeof(SEQFILE_VERSION_HEADER)) << "'";
     return Status(ss.str());
   }
@@ -390,7 +421,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
   RETURN_IF_FALSE(stream_->ReadText(&class_name, &len, &parse_status_));
   if (memcmp(class_name, HdfsSequenceScanner::SEQFILE_VALUE_CLASS_NAME, len)) {
     stringstream ss;
-    ss << "Invalid SEQFILE_VALUE_CLASS_NAME: '"
+    ss << stream_->filename() << " Invalid SEQFILE_VALUE_CLASS_NAME: '"
        << string(reinterpret_cast<char*>(class_name), len) << "'";
     return Status(ss.str());
   }
@@ -408,7 +439,9 @@ Status HdfsSequenceScanner::ReadFileHeader() {
     RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
     header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
     Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
-    DCHECK(it != Codec::CODEC_MAP.end());
+    if (it == Codec::CODEC_MAP.end()) {
+      return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, header_->codec);
+    }
     header_->compression_type = it->second;
   } else {
     header_->compression_type = THdfsCompression::NONE;
@@ -449,7 +482,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad block length: " << current_block_length_ << " at offset " << position;
+    ss << stream_->filename() << " Bad block length: " << current_block_length_
+       << " at offset " << position;
     return Status(ss.str());
   }
 
@@ -458,7 +492,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad key length: " << current_key_length_ << " at offset " << position;
+    ss << stream_->filename() << " Bad key length: " << current_key_length_
+       << " at offset " << position;
     return Status(ss.str());
   }
 
@@ -472,8 +507,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
   if (num_buffered_records < 0) {
     if (state_->LogHasSpace()) {
       stringstream ss;
-      ss << "Bad compressed block record count: "
-         << num_buffered_records;
+      ss << stream_->filename()
+         << " Bad compressed block record count: " << num_buffered_records;
       state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
     }
     return Status("bad record count");
@@ -493,7 +528,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
   // Check for a reasonable size
   if (block_size > MAX_BLOCK_SIZE || block_size < 0) {
     stringstream ss;
-    ss << "Compressed block size is: " << block_size;
+    ss << stream_->filename() << " Compressed block size is: " << block_size;
     return Status(ss.str());
   }
 
@@ -507,6 +542,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
                                                 &len, &unparsed_data_buffer_));
     VLOG_FILE << "Decompressed " << block_size << " to " << len;
     next_record_in_compressed_block_ = unparsed_data_buffer_;
+    next_record_in_compressed_block_len_ = len;
+    data_buffer_end_ = unparsed_data_buffer_ + len;
   }
   num_buffered_records_in_compressed_block_ = num_buffered_records;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..e84a5e7 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -251,10 +251,16 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// Buffer for data read from the 'stream_' directly or after decompression.
   uint8_t* unparsed_data_buffer_ = nullptr;
 
+  /// End of data buffer used to check out of bound error.
+  uint8_t* data_buffer_end_ = nullptr;
+
   /// Number of buffered records unparsed_data_buffer_ from block compressed data.
   int64_t num_buffered_records_in_compressed_block_ = 0;
 
   /// Next record from block compressed data.
+  int64_t next_record_in_compressed_block_len_ = 0;
+
+  /// Next record from block compressed data.
   uint8_t* next_record_in_compressed_block_ = nullptr;
 };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index e4448de..453a333 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -83,12 +83,19 @@ TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
 }
 
 void TestPutGetZeroCompressedLong(int64_t val) {
-  uint8_t buffer[9];
+  const int32_t BUFSZ = 9;
+  uint8_t buffer[BUFSZ];
   int64_t read_val;
   int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
-  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, BUFSZ);
   EXPECT_EQ(read_bytes, num_bytes);
   EXPECT_EQ(read_val, val);
+  // Out of bound access check, -1 should be returned because buffer size is passed
+  // as 1 byte.
+  if (read_bytes > 1) {
+    read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, 1);
+    EXPECT_EQ(read_bytes, -1);
+  }
 }
 
 TEST(ReadWriteUtil, ZeroCompressedLong) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index 84d41dd..ecbebaf 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -53,11 +53,12 @@ class ReadWriteUtil {
   template<typename T>
   static T GetInt(const uint8_t* buffer);
 
-  /// Get a variable-length Long or int value from a byte buffer.
+  /// Get a variable-length Long or int value from a byte buffer of length size. Access
+  /// beyond the buffer size will return -1.
   /// Returns the length of the long/int
   /// If the size byte is corrupted then return -1;
-  static int GetVLong(uint8_t* buf, int64_t* vlong);
-  static int GetVInt(uint8_t* buf, int32_t* vint);
+  static int GetVLong(uint8_t* buf, int64_t* vlong, int32_t size);
+  static int GetVInt(uint8_t* buf, int32_t* vint, int32_t size);
 
   /// Writes a variable-length Long or int value to a byte buffer.
   /// Returns the number of bytes written.
@@ -68,8 +69,9 @@ class ReadWriteUtil {
   static int VLongRequiredBytes(int64_t val);
 
   /// Read a variable-length Long value from a byte buffer starting at the specified
-  /// byte offset.
-  static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong);
+  /// byte offset and the buffer passed is of length size, accessing beyond the
+  /// buffer length will result in returning -1 value to the caller.
+  static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size);
 
   /// Put an Integer into a buffer in big endian order.  The buffer must be big
   /// enough.
@@ -177,22 +179,30 @@ inline void ReadWriteUtil::PutInt(uint8_t* buf, uint64_t integer) {
   memcpy(buf, &big_endian, sizeof(uint64_t));
 }
 
-inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint) {
+inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint, int32_t size) {
   int64_t vlong = 0;
-  int len = GetVLong(buf, &vlong);
+  int len = GetVLong(buf, &vlong, size);
   *vint = static_cast<int32_t>(vlong);
   return len;
 }
 
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong) {
-  return GetVLong(buf, 0, vlong);
+inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong, int32_t size) {
+  return GetVLong(buf, 0, vlong, size);
 }
 
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong) {
+inline int ReadWriteUtil::GetVLong(
+    uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size) {
+  // Buffer access out of bounds.
+  if (size == 0) return -1;
+
+  // Buffer access out of bounds.
+  if (offset > size) return -1;
   int8_t firstbyte = (int8_t) buf[0 + offset];
 
   int len = DecodeVIntSize(firstbyte);
-  if (len > MAX_VINT_LEN) return -1;
+
+  // Buffer access out of bounds.
+  if (len > MAX_VINT_LEN || len > size) return -1;
   if (len == 1) {
     *vlong = static_cast<int64_t>(firstbyte);
     return len;

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/scanner-context.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h
index f4f3bcb..19ea1f4 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -68,6 +68,7 @@ inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
 }
 
 inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
+  DCHECK_GE(length, 0);
   int64_t bytes_left = length;
   // Skip bytes from the boundary buffer first.
   if (boundary_buffer_bytes_left_ > 0) {
@@ -102,6 +103,10 @@ inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
 inline bool ScannerContext::Stream::SkipText(Status* status) {
   int64_t len;
   RETURN_IF_FALSE(ReadVLong(&len, status));
+  if (len < 0) {
+    *status = Status("SkipText: length is negative");
+    return false;
+  }
   RETURN_IF_FALSE(SkipBytes(len, status));
   return true;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index d92dfe2..267e8f8 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -449,6 +449,12 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     }
 
     while (uncompressed_block_len > 0) {
+      // Check that input length should not be negative.
+      if (input_len < 0) {
+        stringstream ss;
+        ss << " Corruption snappy decomp input_len " << input_len;
+        return Status(ss.str());
+      }
       // Read the length of the next snappy compressed block.
       size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
       input += sizeof(uint32_t);
@@ -464,6 +470,10 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
               compressed_len, &uncompressed_len)) {
         return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
       }
+      // Check that uncompressed length should be greater than 0.
+      if (uncompressed_len <= 0) {
+        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+      }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 2cdb4f6..4886c4a 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -71,7 +71,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # TODO(IMPALA-6772): enable for ORC formats once a new version after release-1.4.3
     # of ORC library is released.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        v.get_value('table_format').file_format in ('avro', 'parquet', 'rc', 'seq') or
         (v.get_value('table_format').file_format == 'text' and
           v.get_value('table_format').compression_codec in ('none', 'lzo')))
 
@@ -90,6 +90,9 @@ class TestScannersFuzzing(ImpalaTestSuite):
       if table_format.compression_codec != 'snap' or \
           table_format.compression_type != 'block':
         pytest.skip()
+    elif table_format.file_format == 'rc' or \
+      table_format.file_format == 'seq':
+        pytest.skip()
     elif table_format.file_format == 'text' and \
         table_format.compression_codec != 'none':
       # decimal_tbl is not present for these file formats
@@ -208,8 +211,10 @@ class TestScannersFuzzing(ImpalaTestSuite):
         # (IMPALA-4013).
         table_format = vector.get_value('table_format')
         if table_format.file_format != 'parquet' \
-            and not (table_format.file_format == 'text' and
-            table_format.compression_codec != 'none'):
+            and not (table_format.file_format == 'text' and \
+            table_format.compression_codec != 'none') \
+            and not table_format.file_format == 'rc' \
+            and not table_format.file_format == 'seq':
           raise
 
   def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng):