Posted to commits@impala.apache.org by dh...@apache.org on 2018/05/18 05:12:10 UTC
[4/4] impala git commit: IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners
IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners
Introduced new error messages emitted when scanning a corrupt Sequence or RCFile.
Added new checks to detect buffer overruns while handling Sequence and RCFile data.
Testing:
a) Updated the fuzz test for RCFile/Sequence files and ran it in a loop for 200
iterations without failure.
b) Ran exhaustive tests on the changes without failure.
Change-Id: Ic9cfc38af3f30c65ada9734eb471dbfa6ecdd74a
Reviewed-on: http://gerrit.cloudera.org:8080/8936
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab75dd12
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab75dd12
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab75dd12
Branch: refs/heads/master
Commit: ab75dd12e49100f153911bef87a9dab810cf9b58
Parents: c1c122a
Author: Pranay <ps...@cloudera.com>
Authored: Wed Apr 18 18:01:56 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri May 18 05:05:57 2018 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-rcfile-scanner.cc | 153 ++++++++++++++++++++++------
be/src/exec/hdfs-rcfile-scanner.h | 16 ++-
be/src/exec/hdfs-sequence-scanner.cc | 79 ++++++++++----
be/src/exec/hdfs-sequence-scanner.h | 6 ++
be/src/exec/read-write-util-test.cc | 11 +-
be/src/exec/read-write-util.h | 32 ++++--
be/src/exec/scanner-context.inline.h | 5 +
be/src/util/decompress.cc | 10 ++
tests/query_test/test_scanners_fuzz.py | 11 +-
9 files changed, 255 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
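Nearly every hunk below follows the same defensive pattern: a length field
decoded from untrusted file bytes is validated against the bytes actually
remaining before any pointer is advanced. Here is a minimal standalone sketch
of that pattern, assuming a simple 4-byte big-endian length prefix; the
function and names are illustrative, not Impala APIs.

#include <cstdint>
#include <iostream>

// Reads a 4-byte big-endian length followed by that many payload bytes.
// Returns false instead of walking past 'end' when the input is corrupt.
static bool ReadLengthPrefixedField(const uint8_t** pos, const uint8_t* end,
                                    const uint8_t** field, int32_t* field_len) {
  if (end - *pos < 4) return false;  // Not enough bytes for the length itself.
  uint32_t len = (static_cast<uint32_t>((*pos)[0]) << 24) |
                 (static_cast<uint32_t>((*pos)[1]) << 16) |
                 (static_cast<uint32_t>((*pos)[2]) << 8) |
                 static_cast<uint32_t>((*pos)[3]);
  *pos += 4;
  // The claimed length must fit in what is left of the buffer.
  if (len > static_cast<uint64_t>(end - *pos)) return false;
  *field = *pos;
  *field_len = static_cast<int32_t>(len);
  *pos += len;
  return true;
}

int main() {
  const uint8_t good[] = {0, 0, 0, 3, 'a', 'b', 'c'};
  const uint8_t* p = good;
  const uint8_t* field;
  int32_t len;
  std::cout << ReadLengthPrefixedField(&p, good + sizeof(good), &field, &len)
            << "\n";  // 1: the field fits.
  const uint8_t bad[] = {0, 0, 0, 200, 'a'};  // Claims 200 bytes, has 1.
  p = bad;
  std::cout << ReadLengthPrefixedField(&p, bad + sizeof(bad), &field, &len)
            << "\n";  // 0: corruption detected, no overrun.
  return 0;
}
----------------------------------------------------------------------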
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index a706c3d..37376c6 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -49,6 +49,9 @@ const char* const HdfsRCFileScanner::RCFILE_METADATA_KEY_NUM_COLS =
const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1};
+// Maximum number of columns allowed; set to 8 million to reject corrupt headers.
+const int HdfsRCFileScanner::MAX_NCOLS = 8000000;
+
// Macro to convert between SerdeUtil errors to Status returns.
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
@@ -82,6 +85,13 @@ Status HdfsRCFileScanner::InitNewRange() {
reuse_row_group_buffer_, header_->codec, &decompressor_));
}
+ int ncols = reinterpret_cast<RcFileHeader*>(header_)->num_cols;
+ if (ncols < 0 || ncols > MAX_NCOLS) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid number of columns: " << ncols
+ << " (limit is " << MAX_NCOLS << ")";
+ return Status(ss.str());
+ }
// Allocate the buffers for the key information that is used to read and decode
// the column data.
columns_.resize(reinterpret_cast<RcFileHeader*>(header_)->num_cols);
@@ -116,7 +126,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
rc_header->version = RCF1;
} else {
stringstream ss;
- ss << "Invalid RCFILE_VERSION_HEADER: '"
+ ss << stream_->filename() << " Invalid RCFILE_VERSION_HEADER: '"
<< ReadWriteUtil::HexDump(header, sizeof(RCFILE_VERSION_HEADER)) << "'";
return Status(ss.str());
}
@@ -130,9 +140,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
if (len != strlen(HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME) ||
memcmp(class_name_key, HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME, len)) {
stringstream ss;
- ss << "Invalid RCFILE_KEY_CLASS_NAME: '"
- << string(reinterpret_cast<char*>(class_name_key), len)
- << "' len=" << len;
+ ss << stream_->filename() << " Invalid RCFILE_KEY_CLASS_NAME: '"
+ << string(reinterpret_cast<char*>(class_name_key), len) << "' len=" << len;
return Status(ss.str());
}
@@ -142,9 +151,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
if (len != strlen(HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME) ||
memcmp(class_name_val, HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME, len)) {
stringstream ss;
- ss << "Invalid RCFILE_VALUE_CLASS_NAME: '"
- << string(reinterpret_cast<char*>(class_name_val), len)
- << "' len=" << len;
+ ss << stream_->filename() << " Invalid RCFILE_VALUE_CLASS_NAME: '"
+ << string(reinterpret_cast<char*>(class_name_val), len) << "' len=" << len;
return Status(ss.str());
}
}
@@ -161,7 +169,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
stream_->ReadBoolean(&is_blk_compressed, &parse_status_));
if (is_blk_compressed) {
stringstream ss;
- ss << "RC files do no support block compression.";
+ ss << stream_->filename() << " RC files do not support block compression.";
return Status(ss.str());
}
}
@@ -172,7 +180,11 @@ Status HdfsRCFileScanner::ReadFileHeader() {
RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
- DCHECK(it != Codec::CODEC_MAP.end());
+ if (it == Codec::CODEC_MAP.end()) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid codec: " << header_->codec;
+ return Status(ss.str());
+ }
header_->compression_type = it->second;
} else {
header_->compression_type = THdfsCompression::NONE;
@@ -208,10 +220,10 @@ Status HdfsRCFileScanner::ReadNumColumnsMetadata() {
StringParser::ParseResult result;
int num_cols =
StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
- if (result != StringParser::PARSE_SUCCESS) {
+ if (result != StringParser::PARSE_SUCCESS || num_cols < 0) {
stringstream ss;
- ss << "Could not parse number of columns in file " << stream_->filename()
- << ": " << value_str;
+ ss << " Could not parse number of columns in file " << stream_->filename()
+ << " : " << value_str;
if (result == StringParser::PARSE_OVERFLOW) ss << " (result overflowed)";
return Status(ss.str());
}
@@ -271,7 +283,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
stringstream ss;
int64_t position = stream_->file_offset();
position -= sizeof(int32_t);
- ss << "Bad record length: " << record_length << " at offset: " << position;
+ ss << stream_->filename() << " Bad record length: " << record_length
+ << " at offset: " << position;
return Status(ss.str());
}
RETURN_IF_FALSE(stream_->ReadInt(&key_length_, &parse_status_));
@@ -279,7 +292,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
stringstream ss;
int64_t position = stream_->file_offset();
position -= sizeof(int32_t);
- ss << "Bad key length: " << key_length_ << " at offset: " << position;
+ ss << stream_->filename() << " Bad key length: " << key_length_
+ << " at offset: " << position;
return Status(ss.str());
}
RETURN_IF_FALSE(stream_->ReadInt(&compressed_key_length_, &parse_status_));
@@ -287,7 +301,7 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
stringstream ss;
int64_t position = stream_->file_offset();
position -= sizeof(int32_t);
- ss << "Bad compressed key length: " << compressed_key_length_
+ ss << stream_->filename() << " Bad compressed key length: " << compressed_key_length_
<< " at offset: " << position;
return Status(ss.str());
}
@@ -316,42 +330,98 @@ Status HdfsRCFileScanner::ReadKeyBuffers() {
memcpy(key_buffer, buffer, key_length_);
}
- row_group_length_ = 0;
uint8_t* key_buf_ptr = key_buffer;
- int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_);
+ row_group_length_ = 0;
+ int remain_len = key_length_;
+ int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_, key_length_);
+ if (bytes_read == -1 || num_rows_ < 0) {
+ stringstream ss;
+ ss << stream_->filename() << " Bad row group key buffer, key length: " << key_length_;
+ return Status(ss.str());
+ }
key_buf_ptr += bytes_read;
+ remain_len = remain_len - bytes_read;
+ // Track the starting position in the buffer.
+ uint8_t* start_key_buf_ptr = key_buf_ptr;
for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
- GetCurrentKeyBuffer(col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr);
- DCHECK_LE(key_buf_ptr, key_buffer + key_length_);
+ if (key_buf_ptr < start_key_buf_ptr || (key_buf_ptr > key_buffer + key_length_)
+ || remain_len <= 0) {
+ stringstream ss;
+ ss << stream_->filename() << " Bad row group key buffer, column idx: " << col_idx;
+ return Status(ss.str());
+ }
+ RETURN_IF_ERROR(GetCurrentKeyBuffer(
+ col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr, remain_len));
+ remain_len = remain_len - (key_buf_ptr - start_key_buf_ptr);
+ start_key_buf_ptr = key_buf_ptr;
}
- DCHECK_EQ(key_buf_ptr, key_buffer + key_length_);
return Status::OK();
}
-void HdfsRCFileScanner::GetCurrentKeyBuffer(int col_idx, bool skip_col_data,
- uint8_t** key_buf_ptr) {
+Status HdfsRCFileScanner::BadColumnInfo(int col_idx) {
+ stringstream ss;
+ ss << stream_->filename() << " Corrupt column at index: " << col_idx;
+ return Status(ss.str());
+}
+
+Status HdfsRCFileScanner::GetCurrentKeyBuffer(
+ int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length) {
ColumnInfo& col_info = columns_[col_idx];
+ int remain_len = buf_length;
- int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len);
+ if (remain_len <= 0) {
+ return BadColumnInfo(col_idx);
+ }
+
+ DCHECK_GT(remain_len, 0);
+ int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len, remain_len);
+ if (bytes_read == -1) {
+ return BadColumnInfo(col_idx);
+ }
*key_buf_ptr += bytes_read;
+ remain_len -= bytes_read;
+ DCHECK_GT(remain_len, 0);
- bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len);
+ bytes_read =
+ ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len, remain_len);
+ if (bytes_read == -1) {
+ return BadColumnInfo(col_idx);
+ }
*key_buf_ptr += bytes_read;
+ remain_len -= bytes_read;
+ if (remain_len <= 0) {
+ return BadColumnInfo(col_idx);
+ }
int col_key_buf_len;
- bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr , &col_key_buf_len);
+ bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_key_buf_len, remain_len);
+ if (bytes_read == -1) {
+ return BadColumnInfo(col_idx);
+ }
+
*key_buf_ptr += bytes_read;
+ remain_len -= bytes_read;
+ if (col_info.uncompressed_buffer_len < 0 || remain_len <= 0) {
+ return BadColumnInfo(col_idx);
+ }
if (!skip_col_data) {
col_info.key_buffer = *key_buf_ptr;
+ DCHECK_GE(col_info.uncompressed_buffer_len, 0);
+
// Set the offset for the start of the data for this column in the allocated buffer.
col_info.start_offset = row_group_length_;
row_group_length_ += col_info.uncompressed_buffer_len;
}
+ col_info.buf_length = col_key_buf_len;
*key_buf_ptr += col_key_buf_len;
+ remain_len -= bytes_read;
+ DCHECK_GE(remain_len, 0);
+
+ return Status::OK();
}
inline Status HdfsRCFileScanner::NextField(int col_idx) {
@@ -366,11 +436,11 @@ inline Status HdfsRCFileScanner::NextField(int col_idx) {
int64_t length = 0;
uint8_t* col_key_buf = col_info.key_buffer;
int bytes_read = ReadWriteUtil::GetVLong(
- col_key_buf, col_info.key_buffer_pos, &length);
+ col_key_buf, col_info.key_buffer_pos, &length, col_info.buf_length);
if (bytes_read == -1) {
int64_t position = stream_->file_offset();
stringstream ss;
- ss << "Invalid column length at offset: " << position;
+ ss << stream_->filename() << " Invalid column length at offset: " << position;
return Status(ss.str());
}
col_info.key_buffer_pos += bytes_read;
@@ -403,6 +473,11 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
ColumnInfo& column = columns_[col_idx];
if (!columns_[col_idx].materialize_column) {
+ if (column.buffer_len < 0) {
+ stringstream ss;
+ ss << stream_->filename() << " Bad column buffer len: " << column.buffer_len;
+ return Status(ss.str());
+ }
// Not materializing this column, just skip it.
RETURN_IF_FALSE(
stream_->SkipBytes(column.buffer_len, &parse_status_));
@@ -411,7 +486,17 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
// TODO: Stream through these column buffers instead of reading everything
// in at once.
- DCHECK_LE(column.uncompressed_buffer_len + column.start_offset, row_group_length_);
+ // The uncompressed buffer size for a column must not exceed row_group_length_,
+ // which is the sum of the uncompressed buffer lengths of all columns, so this
+ // check ensures that there is enough space in the row group buffer for the
+ // uncompressed data.
+ if (column.uncompressed_buffer_len + column.start_offset > row_group_length_) {
+ stringstream ss;
+ ss << stream_->filename() << " Bad column uncompressed buffer length: "
+ << column.uncompressed_buffer_len << " at offset " << column.start_offset;
+ return Status(ss.str());
+ }
+
if (header_->is_compressed) {
uint8_t* compressed_input;
RETURN_IF_FALSE(stream_->ReadBytes(
@@ -493,8 +578,16 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
const char* col_start = reinterpret_cast<const char*>(
row_group_buffer_ + column.start_offset + column.buffer_pos);
const int field_len = column.current_field_len;
- DCHECK_LE(col_start + field_len,
- reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
+ const char* row_group_end =
+ reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_);
+ const char* col_end = col_start + field_len;
+ if (col_end > row_group_end || column.start_offset < 0 || column.buffer_pos < 0
+ || col_start > row_group_end || field_len < 0) {
+ stringstream ss;
+ ss << stream_->filename()
+ << " Bad column index at offset : " << column.start_offset;
+ return Status(ss.str());
+ }
if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
false, false, row_batch->tuple_data_pool())) {
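----------------------------------------------------------------------
The new check in ReadColumnBuffers() enforces an invariant worth spelling out:
row_group_length_ is the sum of the per-column uncompressed lengths, so any
column whose claimed span (start_offset + uncompressed_buffer_len) extends past
it must be corrupt. A minimal sketch of that invariant, with illustrative
struct and function names rather than the scanner's actual types:

#include <cstdint>
#include <iostream>
#include <vector>

struct ColumnSpan {
  int64_t start_offset;             // Where this column's data begins.
  int64_t uncompressed_buffer_len;  // How many bytes it claims to occupy.
};

// Returns the index of the first column whose claimed span escapes the row
// group buffer, or -1 if all spans are in bounds.
static int FirstBadColumn(const std::vector<ColumnSpan>& cols,
                          int64_t row_group_length) {
  for (size_t i = 0; i < cols.size(); ++i) {
    const ColumnSpan& c = cols[i];
    if (c.start_offset < 0 || c.uncompressed_buffer_len < 0 ||
        c.start_offset + c.uncompressed_buffer_len > row_group_length) {
      return static_cast<int>(i);
    }
  }
  return -1;
}

int main() {
  // Two well-formed columns in a 10-byte row group, then a corrupt third
  // column whose length field overruns the buffer.
  std::vector<ColumnSpan> cols = {{0, 6}, {6, 4}, {10, 5}};
  std::cout << FirstBadColumn(cols, 10) << "\n";  // Prints 2.
  return 0;
}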
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 835d2a2..f35bcf8 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -257,6 +257,9 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
/// of the file {'R', 'C', 'F' 1}
static const uint8_t RCFILE_VERSION_HEADER[4];
+ // Maximum number of columns allowed in an RCFile.
+ static const int MAX_NCOLS;
+
/// Implementation of superclass functions.
virtual FileHeader* AllocateFileHeader();
virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
@@ -289,12 +292,14 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
/// Input/Output:
/// key_buf_ptr: Pointer to the buffered file data, this will be moved
/// past the data for this column.
+ /// buf_length: Length of the buffer that will be read.
/// Sets:
/// col_buf_len_
/// col_buf_uncompressed_len_
/// col_key_bufs_
/// col_bufs_off_
- void GetCurrentKeyBuffer(int col_idx, bool skip_col_data, uint8_t** key_buf_ptr);
+ Status GetCurrentKeyBuffer(
+ int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length);
/// Read the rowgroup column buffers
/// Sets:
@@ -323,6 +328,13 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
/// row_pos_
Status NextRow() WARN_UNUSED_RESULT;
+ /// Builds the error status returned when a corrupt column index is encountered.
+ /// Input:
+ ///   col_idx: index of the corrupt column.
+ /// Output:
+ ///   Error status containing the formatted message.
+ Status BadColumnInfo(int col_idx);
+
enum Version {
SEQ6, // Version for sequence file and pre hive-0.9 rc files
RCF1 // The version post hive-0.9 which uses a new header
@@ -352,6 +364,8 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
int32_t key_buffer_len;
/// This is a ptr into the scanner's key_buffer_ for this column.
uint8_t* key_buffer;
+ /// Length of the key buffer
+ int32_t buf_length;
/// Current position in the key buffer
int32_t key_buffer_pos;
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..2183655 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -120,7 +120,7 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
int64_t in_size = current_block_length_ - current_key_length_;
if (in_size < 0) {
stringstream ss;
- ss << "Invalid record size: " << in_size;
+ ss << stream_->filename() << " Invalid record size: " << in_size;
return Status(ss.str());
}
uint8_t* compressed_data;
@@ -136,15 +136,19 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
}
*record_ptr = unparsed_data_buffer_;
// Read the length of the record.
- int size = ReadWriteUtil::GetVLong(*record_ptr, record_len);
- if (size == -1) return Status("Invalid record sizse");
+ int size = ReadWriteUtil::GetVLong(*record_ptr, record_len, in_size);
+ if (size == -1) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid record size: " << in_size;
+ return Status(ss.str());
+ }
*record_ptr += size;
} else {
// Uncompressed records
RETURN_IF_FALSE(stream_->ReadVLong(record_len, &parse_status_));
if (*record_len < 0) {
stringstream ss;
- ss << "Invalid record length: " << *record_len;
+ ss << stream_->filename() << " Invalid record length: " << *record_len;
return Status(ss.str());
}
RETURN_IF_FALSE(
@@ -199,9 +203,9 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch)
if (sync_indicator != -1) {
if (state_->LogHasSpace()) {
stringstream ss;
- ss << "Expecting sync indicator (-1) at file offset "
- << (stream_->file_offset() - sizeof(int)) << ". "
- << "Sync indicator found " << sync_indicator << ".";
+ ss << stream_->filename() << " Expecting sync indicator (-1) at file offset "
+ << (stream_->file_offset() - sizeof(int)) << ". "
+ << "Sync indicator found " << sync_indicator << ".";
state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
}
return Status("Bad sync hash");
@@ -229,15 +233,34 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
// Parse record starts and lengths
int field_location_offset = 0;
for (int i = 0; i < num_to_process; ++i) {
- DCHECK_LT(i, record_locations_.size());
- int bytes_read = ReadWriteUtil::GetVLong(
- next_record_in_compressed_block_, &record_locations_[i].len);
+ if (i >= record_locations_.size() || record_locations_[i].len < 0
+ || next_record_in_compressed_block_ > data_buffer_end_) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
+ }
+ int bytes_read = ReadWriteUtil::GetVLong(next_record_in_compressed_block_,
+ &record_locations_[i].len, next_record_in_compressed_block_len_);
if (UNLIKELY(bytes_read == -1)) {
- return Status("Invalid record sizes in compressed block.");
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
}
next_record_in_compressed_block_ += bytes_read;
+ next_record_in_compressed_block_len_ -= bytes_read;
+ if (next_record_in_compressed_block_len_ <= 0) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
+ }
record_locations_[i].record = next_record_in_compressed_block_;
next_record_in_compressed_block_ += record_locations_[i].len;
+ next_record_in_compressed_block_len_ -= record_locations_[i].len;
+ if (next_record_in_compressed_block_len_ < 0) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
+ }
}
// Parse records to find field locations.
@@ -254,9 +277,17 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
reinterpret_cast<char*>(record_locations_[i].record),
&field_locations_[field_location_offset], &num_fields));
}
- DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
+ if (num_fields != scan_node_->materialized_slots().size()) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
+ }
field_location_offset += num_fields;
- DCHECK_LE(field_location_offset, field_locations_.size());
+ if (field_location_offset > field_locations_.size()) {
+ stringstream ss;
+ ss << stream_->filename() << " Invalid compressed block";
+ return Status(ss.str());
+ }
}
int max_added_tuples = (scan_node_->limit() == -1) ?
@@ -377,7 +408,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
if (memcmp(header, SEQFILE_VERSION_HEADER, sizeof(SEQFILE_VERSION_HEADER))) {
stringstream ss;
- ss << "Invalid SEQFILE_VERSION_HEADER: '"
+ ss << stream_->filename() << " Invalid SEQFILE_VERSION_HEADER: '"
<< ReadWriteUtil::HexDump(header, sizeof(SEQFILE_VERSION_HEADER)) << "'";
return Status(ss.str());
}
@@ -390,7 +421,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
RETURN_IF_FALSE(stream_->ReadText(&class_name, &len, &parse_status_));
if (memcmp(class_name, HdfsSequenceScanner::SEQFILE_VALUE_CLASS_NAME, len)) {
stringstream ss;
- ss << "Invalid SEQFILE_VALUE_CLASS_NAME: '"
+ ss << stream_->filename() << " Invalid SEQFILE_VALUE_CLASS_NAME: '"
<< string(reinterpret_cast<char*>(class_name), len) << "'";
return Status(ss.str());
}
@@ -408,7 +439,9 @@ Status HdfsSequenceScanner::ReadFileHeader() {
RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
- DCHECK(it != Codec::CODEC_MAP.end());
+ if (it == Codec::CODEC_MAP.end()) {
+ return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, header_->codec);
+ }
header_->compression_type = it->second;
} else {
header_->compression_type = THdfsCompression::NONE;
@@ -449,7 +482,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
stringstream ss;
int64_t position = stream_->file_offset();
position -= sizeof(int32_t);
- ss << "Bad block length: " << current_block_length_ << " at offset " << position;
+ ss << stream_->filename() << " Bad block length: " << current_block_length_
+ << " at offset " << position;
return Status(ss.str());
}
@@ -458,7 +492,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
stringstream ss;
int64_t position = stream_->file_offset();
position -= sizeof(int32_t);
- ss << "Bad key length: " << current_key_length_ << " at offset " << position;
+ ss << stream_->filename() << " Bad key length: " << current_key_length_
+ << " at offset " << position;
return Status(ss.str());
}
@@ -472,8 +507,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
if (num_buffered_records < 0) {
if (state_->LogHasSpace()) {
stringstream ss;
- ss << "Bad compressed block record count: "
- << num_buffered_records;
+ ss << stream_->filename()
+ << " Bad compressed block record count: " << num_buffered_records;
state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
}
return Status("bad record count");
@@ -493,7 +528,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
// Check for a reasonable size
if (block_size > MAX_BLOCK_SIZE || block_size < 0) {
stringstream ss;
- ss << "Compressed block size is: " << block_size;
+ ss << stream_->filename() << " Compressed block size is: " << block_size;
return Status(ss.str());
}
@@ -507,6 +542,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
&len, &unparsed_data_buffer_));
VLOG_FILE << "Decompressed " << block_size << " to " << len;
next_record_in_compressed_block_ = unparsed_data_buffer_;
+ next_record_in_compressed_block_len_ = len;
+ data_buffer_end_ = unparsed_data_buffer_ + len;
}
num_buffered_records_in_compressed_block_ = num_buffered_records;
return Status::OK();
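----------------------------------------------------------------------
The sequence-scanner changes hinge on the new
next_record_in_compressed_block_len_ counter: a running byte budget that
shrinks as each record header and body is consumed, so a corrupt record count
or record length fails fast instead of walking past the decompressed block. A
standalone sketch of that bookkeeping, using a 1-byte length prefix in place of
the real VLong-prefixed layout; all names here are illustrative:

#include <cstdint>
#include <iostream>
#include <vector>

struct Record {
  const uint8_t* data;
  int64_t len;
};

// Slices 'num_records' records out of a decompressed block, where each record
// is a 1-byte length followed by the payload. Returns false on overrun.
static bool SliceRecords(const uint8_t* buf, int64_t remaining, int num_records,
                         std::vector<Record>* out) {
  for (int i = 0; i < num_records; ++i) {
    if (remaining < 1) return false;  // No room left for the length byte.
    int64_t len = buf[0];
    buf += 1;
    remaining -= 1;
    if (len > remaining) return false;  // Payload would overrun the block.
    out->push_back({buf, len});
    buf += len;
    remaining -= len;
  }
  return true;
}

int main() {
  const uint8_t block[] = {3, 'f', 'o', 'o', 2, 'h', 'i'};
  std::vector<Record> recs;
  std::cout << SliceRecords(block, sizeof(block), 2, &recs) << "\n";  // 1
  recs.clear();
  // Same bytes, but a record count that no longer fits: overrun detected.
  std::cout << SliceRecords(block, sizeof(block), 3, &recs) << "\n";  // 0
  return 0;
}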
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..e84a5e7 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -251,10 +251,16 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
/// Buffer for data read from the 'stream_' directly or after decompression.
uint8_t* unparsed_data_buffer_ = nullptr;
+ /// End of the data buffer, used to detect out-of-bounds reads.
+ uint8_t* data_buffer_end_ = nullptr;
+
/// Number of buffered records unparsed_data_buffer_ from block compressed data.
int64_t num_buffered_records_in_compressed_block_ = 0;
+ /// Length of the decompressed data remaining in the current block.
+ int64_t next_record_in_compressed_block_len_ = 0;
+
/// Next record from block compressed data.
uint8_t* next_record_in_compressed_block_ = nullptr;
};
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index e4448de..453a333 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -83,12 +83,19 @@ TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
}
void TestPutGetZeroCompressedLong(int64_t val) {
- uint8_t buffer[9];
+ const int32_t BUFSZ = 9;
+ uint8_t buffer[BUFSZ];
int64_t read_val;
int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
- int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+ int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, BUFSZ);
EXPECT_EQ(read_bytes, num_bytes);
EXPECT_EQ(read_val, val);
+ // Out-of-bounds access check: -1 should be returned because the buffer size
+ // is passed as 1 byte.
+ if (read_bytes > 1) {
+ read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, 1);
+ EXPECT_EQ(read_bytes, -1);
+ }
}
TEST(ReadWriteUtil, ZeroCompressedLong) {
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index 84d41dd..ecbebaf 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -53,11 +53,12 @@ class ReadWriteUtil {
template<typename T>
static T GetInt(const uint8_t* buffer);
- /// Get a variable-length Long or int value from a byte buffer.
+ /// Get a variable-length Long or int value from a byte buffer of length size. Access
+ /// beyond the buffer size will return -1.
/// Returns the length of the long/int
/// If the size byte is corrupted then return -1;
- static int GetVLong(uint8_t* buf, int64_t* vlong);
- static int GetVInt(uint8_t* buf, int32_t* vint);
+ static int GetVLong(uint8_t* buf, int64_t* vlong, int32_t size);
+ static int GetVInt(uint8_t* buf, int32_t* vint, int32_t size);
/// Writes a variable-length Long or int value to a byte buffer.
/// Returns the number of bytes written.
@@ -68,8 +69,9 @@ class ReadWriteUtil {
static int VLongRequiredBytes(int64_t val);
/// Read a variable-length Long value from a byte buffer starting at the specified
- /// byte offset.
- static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong);
+ /// byte offset and the buffer passed is of length size, accessing beyond the
+ /// buffer length will result in returning -1 value to the caller.
+ static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size);
/// Put an Integer into a buffer in big endian order. The buffer must be big
/// enough.
@@ -177,22 +179,30 @@ inline void ReadWriteUtil::PutInt(uint8_t* buf, uint64_t integer) {
memcpy(buf, &big_endian, sizeof(uint64_t));
}
-inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint) {
+inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint, int32_t size) {
int64_t vlong = 0;
- int len = GetVLong(buf, &vlong);
+ int len = GetVLong(buf, &vlong, size);
*vint = static_cast<int32_t>(vlong);
return len;
}
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong) {
- return GetVLong(buf, 0, vlong);
+inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong, int32_t size) {
+ return GetVLong(buf, 0, vlong, size);
}
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong) {
+inline int ReadWriteUtil::GetVLong(
+ uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size) {
+ // Empty buffer: nothing to read.
+ if (size == 0) return -1;
+
+ // Offset is beyond the buffer.
+ if (offset > size) return -1;
int8_t firstbyte = (int8_t) buf[0 + offset];
int len = DecodeVIntSize(firstbyte);
- if (len > MAX_VINT_LEN) return -1;
+
+ // Invalid encoded length, or the encoded bytes overrun the buffer.
+ if (len > MAX_VINT_LEN || len > size) return -1;
if (len == 1) {
*vlong = static_cast<int64_t>(firstbyte);
return len;
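----------------------------------------------------------------------
For context on what GetVLong() guards: the encoding is Hadoop's zero-compressed
VLong, where the first byte either is the value itself or encodes the sign plus
the count of big-endian payload bytes that follow. The sketch below is a
simplified decoder under that assumption; it is not the Impala implementation
(which also takes an offset), but it shows where the new size check rejects a
truncated buffer:

#include <cstdint>
#include <iostream>

// Size of the encoded value including the marker byte, per the Hadoop
// WritableUtils convention.
static int DecodeVIntSize(int8_t b) {
  if (b >= -112) return 1;
  if (b < -120) return -119 - b;
  return -111 - b;
}

static bool IsNegativeVInt(int8_t b) { return b < -120; }

// Decodes one VLong from 'buf'. Returns bytes consumed, or -1 when the
// encoding claims more bytes than 'size' allows: the overrun case the patch
// now rejects instead of reading past the buffer.
static int GetVLongSketch(const uint8_t* buf, int32_t size, int64_t* out) {
  if (size <= 0) return -1;
  int8_t first = static_cast<int8_t>(buf[0]);
  int len = DecodeVIntSize(first);
  if (len > size) return -1;
  if (len == 1) {
    *out = first;
    return 1;
  }
  int64_t value = 0;
  for (int i = 1; i < len; ++i) value = (value << 8) | buf[i];
  *out = IsNegativeVInt(first) ? ~value : value;
  return len;
}

int main() {
  const uint8_t encoded[] = {0x8E, 0x01, 0x2C};  // 300 as a 3-byte VLong.
  int64_t v = 0;
  std::cout << GetVLongSketch(encoded, 3, &v) << " " << v << "\n";  // "3 300"
  std::cout << GetVLongSketch(encoded, 2, &v) << "\n";  // "-1": truncated input.
  return 0;
}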
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/exec/scanner-context.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h
index f4f3bcb..19ea1f4 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -68,6 +68,7 @@ inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
}
inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
+ DCHECK_GE(length, 0);
int64_t bytes_left = length;
// Skip bytes from the boundary buffer first.
if (boundary_buffer_bytes_left_ > 0) {
@@ -102,6 +103,10 @@ inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
inline bool ScannerContext::Stream::SkipText(Status* status) {
int64_t len;
RETURN_IF_FALSE(ReadVLong(&len, status));
+ if (len < 0) {
+ *status = Status("SkipText: length is negative");
+ return false;
+ }
RETURN_IF_FALSE(SkipBytes(len, status));
return true;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index d92dfe2..267e8f8 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -449,6 +449,12 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
}
while (uncompressed_block_len > 0) {
+ // Check that the input length is not negative.
+ if (input_len < 0) {
+ stringstream ss;
+ ss << " Corruption snappy decomp input_len " << input_len;
+ return Status(ss.str());
+ }
// Read the length of the next snappy compressed block.
size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
input += sizeof(uint32_t);
@@ -464,6 +470,10 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
compressed_len, &uncompressed_len)) {
return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
}
+ // Check that the uncompressed length is greater than 0.
+ if (uncompressed_len <= 0) {
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+ }
DCHECK_GT(uncompressed_len, 0);
if (!size_only) {
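----------------------------------------------------------------------
The decompress.cc checks target the block framing that SnappyBlockDecompress()
walks: a stream of chunk lengths that, when corrupt, could drive the remaining
input counter negative. Below is a minimal sketch of validating such framing
before trusting it, assuming 4-byte big-endian chunk lengths as in Hadoop's
snappy block layout; the function is an illustration, not the Impala code.

#include <cstdint>
#include <iostream>

static uint32_t ReadBE32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// Walks the chunk lengths of one block and reports whether the framing is
// self-consistent, without decompressing anything.
static bool ValidateBlockFraming(const uint8_t* input, int64_t input_len) {
  while (input_len > 0) {
    if (input_len < 4) return false;  // Truncated length field.
    int64_t compressed_len = ReadBE32(input);
    input += 4;
    input_len -= 4;
    // A corrupt length must not send the remaining-input count negative.
    if (compressed_len <= 0 || compressed_len > input_len) return false;
    input += compressed_len;
    input_len -= compressed_len;
  }
  return true;
}

int main() {
  const uint8_t ok[] = {0, 0, 0, 2, 0xAA, 0xBB};
  const uint8_t bad[] = {0, 0, 0, 9, 0xAA, 0xBB};  // Claims 9 bytes, has 2.
  std::cout << ValidateBlockFraming(ok, sizeof(ok)) << "\n";   // 1
  std::cout << ValidateBlockFraming(bad, sizeof(bad)) << "\n"; // 0
  return 0;
}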
http://git-wip-us.apache.org/repos/asf/impala/blob/ab75dd12/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 2cdb4f6..4886c4a 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -71,7 +71,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
# TODO(IMPALA-6772): enable for ORC formats once a new version after release-1.4.3
# of ORC library is released.
cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').file_format in ('avro', 'parquet') or
+ v.get_value('table_format').file_format in ('avro', 'parquet', 'rc', 'seq') or
(v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec in ('none', 'lzo')))
@@ -90,6 +90,9 @@ class TestScannersFuzzing(ImpalaTestSuite):
if table_format.compression_codec != 'snap' or \
table_format.compression_type != 'block':
pytest.skip()
+ elif table_format.file_format == 'rc' or \
+ table_format.file_format == 'seq':
+ pytest.skip()
elif table_format.file_format == 'text' and \
table_format.compression_codec != 'none':
# decimal_tbl is not present for these file formats
@@ -208,8 +211,10 @@ class TestScannersFuzzing(ImpalaTestSuite):
# (IMPALA-4013).
table_format = vector.get_value('table_format')
if table_format.file_format != 'parquet' \
- and not (table_format.file_format == 'text' and
- table_format.compression_codec != 'none'):
+ and not (table_format.file_format == 'text' and \
+ table_format.compression_codec != 'none') \
+ and not table_format.file_format == 'rc' \
+ and not table_format.file_format == 'seq':
raise
def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng):