You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/05 20:37:39 UTC
[7/8] incubator-impala git commit: Revert "IMPALA-1619: Support 64-bit allocations."
Revert "IMPALA-1619: Support 64-bit allocations."
This reverts commit 1ffb2bd5a2a2faaa759ebdbaf49bf00aa8f86b5e.
Unbreak the packaging builds for now.
Change-Id: Id079acb83d35b51ba4dfe1c8042e1c5ec891d807
Reviewed-on: http://gerrit.cloudera.org:8080/3543
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Michael Ho <kw...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a07fc367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a07fc367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a07fc367
Branch: refs/heads/master
Commit: a07fc367eee92f6f51365b60883ba3fc2fdabd90
Parents: d197516
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jun 30 12:27:38 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 5 13:37:26 2016 -0700
----------------------------------------------------------------------
be/src/exec/delimited-text-parser.cc | 37 ++++----
be/src/exec/delimited-text-parser.h | 40 ++++----
be/src/exec/delimited-text-parser.inline.h | 55 +++++------
be/src/exec/hdfs-scanner.cc | 2 +-
be/src/exec/hdfs-scanner.h | 3 -
be/src/exec/hdfs-sequence-scanner.cc | 8 +-
be/src/exec/hdfs-text-scanner.cc | 54 ++++++-----
be/src/exec/hdfs-text-scanner.h | 6 +-
be/src/exec/scanner-context.cc | 16 +++-
be/src/runtime/buffered-block-mgr.cc | 21 ++--
be/src/runtime/collection-value-builder.h | 2 +-
be/src/runtime/free-pool-test.cc | 27 +-----
be/src/runtime/free-pool.h | 19 ++--
be/src/runtime/mem-pool-test.cc | 49 +++-------
be/src/runtime/mem-pool.h | 8 +-
be/src/runtime/string-buffer-test.cc | 20 ++--
be/src/runtime/string-buffer.h | 116 +++++++++++++----------
be/src/udf/udf-internal.h | 4 +-
be/src/udf/udf.cc | 4 +-
be/src/udf/udf.h | 2 -
be/src/util/bit-util.h | 22 ++---
be/src/util/codec.cc | 12 ++-
be/src/util/codec.h | 5 +
be/src/util/decompress-test.cc | 4 +-
be/src/util/decompress.cc | 67 ++++++++++---
common/thrift/generate_error_codes.py | 3 -
infra/python/bootstrap_virtualenv.py | 10 +-
infra/python/deps/requirements.txt | 1 -
tests/query_test/test_compressed_formats.py | 84 +++++++---------
29 files changed, 338 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 1a785ac..950eae4 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -116,11 +116,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
if (process_escapes_) {
- RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr,
- row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
+ ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
+ field_locations, num_tuples, num_fields, next_column_start);
} else {
- RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr,
- row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
+ ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
+ field_locations, num_tuples, num_fields, next_column_start);
}
}
@@ -155,10 +155,9 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
// If the row ended in \r\n then move to the \n
++*next_column_start;
} else {
- RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations));
- Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
- DCHECK(status.ok());
+ AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations);
+ FillColumns<false>(0, NULL, num_fields, field_locations);
column_idx_ = num_partition_keys_;
row_end_locations[*num_tuples] = *byte_buffer_ptr;
++(*num_tuples);
@@ -172,8 +171,8 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
return Status::OK();
}
} else if (new_col) {
- RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations));
+ AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations);
}
--remaining_len;
@@ -184,10 +183,9 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
// e.g. Sequence files.
if (tuple_delim_ == '\0') {
DCHECK_EQ(remaining_len, 0);
- RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations));
- Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
- DCHECK(status.ok());
+ AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations);
+ FillColumns<false>(0, NULL, num_fields, field_locations);
column_idx_ = num_partition_keys_;
++(*num_tuples);
unfinished_tuple_ = false;
@@ -195,10 +193,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
return Status::OK();
}
-// Find the first instance of the tuple delimiter. This will find the start of the first
-// full tuple in buffer by looking for the end of the previous tuple.
-int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
- int64_t tuple_start = 0;
+// Find the first instance of the tuple delimiter. This will
+// find the start of the first full tuple in buffer by looking for the end of
+// the previous tuple.
+int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) {
+ int tuple_start = 0;
const char* buffer_start = buffer;
bool found = false;
@@ -257,7 +256,7 @@ restart:
// tuple break that are all escape characters, but that is
// unlikely.
int num_escape_chars = 0;
- int64_t before_tuple_end = tuple_start - 2;
+ int before_tuple_end = tuple_start - 2;
// TODO: If scan range is split between escape character and tuple delimiter,
// before_tuple_end will be -1. Need to scan previous range for escape characters
// in this case.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index d754c74..f3e108c 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -32,9 +32,9 @@ class DelimitedTextParser {
/// collection_item_delim: delimits collection items
/// escape_char: escape delimiters, make them part of the data.
//
- /// 'num_cols' is the total number of columns including partition keys.
+ /// num_cols is the total number of columns including partition keys.
//
- /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
+ /// is_materialized_col should be initialized to an array of length 'num_cols', with
/// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
/// Owned by caller.
//
@@ -73,8 +73,6 @@ class DelimitedTextParser {
/// num_fields: Number of materialized fields parsed
/// next_column_start: pointer within file_buffer_ where the next field starts
/// after the return from the call to ParseData
- /// Returns an error status if any column exceeds the size limit.
- /// See AddColumn() for details.
Status ParseFieldLocations(int max_tuples, int64_t remaining_len,
char** byte_buffer_ptr, char** row_end_locations,
FieldLocation* field_locations,
@@ -86,10 +84,9 @@ class DelimitedTextParser {
/// col.
/// - *num_fields returns the number of fields processed.
/// This function is used to parse sequence file records which do not need to
- /// parse for tuple delimiters. Returns an error status if any column exceeds the
- /// size limit. See AddColumn() for details.
+ /// parse for tuple delimiters.
template <bool process_escapes>
- Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
+ void ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
int* num_fields);
/// FindFirstInstance returns the position after the first non-escaped tuple
@@ -97,7 +94,7 @@ class DelimitedTextParser {
/// Used to find the start of a tuple if jumping into the middle of a text file.
/// Also used to find the sync marker for Sequenced and RC files.
/// If no tuple delimiter is found within the buffer, return -1;
- int64_t FindFirstInstance(const char* buffer, int64_t len);
+ int FindFirstInstance(const char* buffer, int len);
/// Will we return the current column to the query?
/// Hive allows cols at the end of the table that are not in the schema. We'll
@@ -107,18 +104,17 @@ class DelimitedTextParser {
}
/// Fill in columns missing at the end of the tuple.
- /// 'len' and 'last_column' may contain the length and the pointer to the
+ /// len and last_column may contain the length and the pointer to the
/// last column on which the file ended without a delimiter.
/// Fills in the offsets and lengths in field_locations.
- /// If parsing stopped on a delimiter and there is no last column then length will be 0.
+ /// If parsing stopped on a delimiter and there is no last column then len will be 0.
/// Other columns beyond that are filled with 0 length fields.
- /// 'num_fields' points to an initialized count of fields and will incremented
+ /// num_fields points to an initialized count of fields and will be incremented
/// by the number of fields added.
- /// 'field_locations' will be updated with the start and length of the fields.
- /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
+ /// field_locations will be updated with the start and length of the fields.
template <bool process_escapes>
- Status FillColumns(int64_t len, char** last_column, int* num_fields,
- impala::FieldLocation* field_locations);
+ void FillColumns(int len, char** last_column,
+ int* num_fields, impala::FieldLocation* field_locations);
/// Return true if we have not seen a tuple delimiter for the current tuple being
/// parsed (i.e., the last byte read was not a tuple delimiter).
@@ -132,28 +128,24 @@ class DelimitedTextParser {
/// Template parameter:
/// process_escapes -- if true the column may have escape characters
/// and the negative of the len will be stored.
- /// len: length of the current column. The length of a column must fit in a 32-bit
- /// signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
- /// it will be treated as an error.
+ /// len: length of the current column.
/// Input/Output:
/// next_column_start: Start of the current column, moved to the start of the next.
/// num_fields: current number of fields processed, updated to next field.
/// Output:
/// field_locations: updated with start and length of current field.
- /// Return an error status if 'len' exceeds the size limit specified above.
template <bool process_escapes>
- Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
- FieldLocation* field_locations);
+ void AddColumn(int len, char** next_column_start, int* num_fields,
+ FieldLocation* field_locations);
/// Helper routine to parse delimited text using SSE instructions.
/// Identical arguments as ParseFieldLocations.
/// If the template argument, 'process_escapes' is true, this function will handle
/// escapes, otherwise, it will assume the text is unescaped. By using templates,
/// we can special case the un-escaped path for better performance. The unescaped
- /// path is optimized away by the compiler. Returns an error status if the length
- /// of any column exceeds the size limit. See AddColumn() for details.
+ /// path is optimized away by the compiler.
template <bool process_escapes>
- Status ParseSse(int max_tuples, int64_t* remaining_len,
+ void ParseSse(int max_tuples, int64_t* remaining_len,
char** byte_buffer_ptr, char** row_end_locations_,
FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 28b5b4b..e96a763 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -26,7 +26,7 @@ namespace impala {
/// If the character at n is an escape character, then delimiters(tuple/field/escape
/// characters) at n+1 don't count.
inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
- uint16_t* delim_mask) {
+ uint16_t* delim_mask) {
// Escape characters can escape escape characters.
bool first_char_is_escape = *last_char_is_escape;
bool escape_next = first_char_is_escape;
@@ -39,7 +39,7 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
// Remember last character for the next iteration
*last_char_is_escape = escape_mask &
- SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
+ SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
// Shift escape mask up one so they match at the same bit index as the tuple and
// field mask (instead of being the character before) and set the correct first bit
@@ -50,41 +50,35 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
}
template <bool process_escapes>
-inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
+inline void DelimitedTextParser::AddColumn(int len, char** next_column_start,
int* num_fields, FieldLocation* field_locations) {
- if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
- return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
- }
if (ReturnCurrentColumn()) {
// Found a column that needs to be parsed, write the start/len to 'field_locations'
field_locations[*num_fields].start = *next_column_start;
- int64_t field_len = len;
if (process_escapes && current_column_has_escape_) {
- field_len = -len;
+ field_locations[*num_fields].len = -len;
+ } else {
+ field_locations[*num_fields].len = len;
}
- field_locations[*num_fields].len = static_cast<int32_t>(field_len);
++(*num_fields);
}
if (process_escapes) current_column_has_escape_ = false;
*next_column_start += len + 1;
++column_idx_;
- return Status::OK();
}
template <bool process_escapes>
-inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
+void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
int* num_fields, FieldLocation* field_locations) {
// Fill in any columns missing from the end of the tuple.
char* dummy = NULL;
if (last_column == NULL) last_column = &dummy;
while (column_idx_ < num_cols_) {
- RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
- num_fields, field_locations));
+ AddColumn<process_escapes>(len, last_column, num_fields, field_locations);
// The rest of the columns will be null.
last_column = &dummy;
len = 0;
}
- return Status::OK();
}
/// SSE optimized raw text file parsing. SSE4_2 added an instruction (with 3 modes) for
@@ -101,9 +95,10 @@ inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
/// Haystack = 'asdfghjklhjbdwwc' (the raw string)
/// Result = '1010000000011001'
template <bool process_escapes>
-inline Status DelimitedTextParser::ParseSse(int max_tuples,
+inline void DelimitedTextParser::ParseSse(int max_tuples,
int64_t* remaining_len, char** byte_buffer_ptr,
- char** row_end_locations, FieldLocation* field_locations,
+ char** row_end_locations,
+ FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start) {
DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
@@ -177,8 +172,8 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
char* delim_ptr = *byte_buffer_ptr + n;
if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
- RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
- next_column_start, num_fields, field_locations));
+ AddColumn<process_escapes>(delim_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations);
continue;
}
@@ -190,10 +185,9 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
last_row_delim_offset_ = -1;
continue;
}
- RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
- next_column_start, num_fields, field_locations));
- Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
- DCHECK(status.ok());
+ AddColumn<process_escapes>(delim_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations);
+ FillColumns<false>(0, NULL, num_fields, field_locations);
column_idx_ = num_partition_keys_;
row_end_locations[*num_tuples] = delim_ptr;
++(*num_tuples);
@@ -206,7 +200,7 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
// If the last character we processed was \r then set the offset to 0
// so that we will use it at the beginning of the next batch.
if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
- return Status::OK();
+ return;
}
}
}
@@ -220,12 +214,11 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
*remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
*byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
}
- return Status::OK();
}
/// Simplified version of ParseSSE which does not handle tuple delimiters.
template <bool process_escapes>
-inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
FieldLocation* field_locations, int* num_fields) {
char* next_column_start = buffer;
__m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
@@ -270,8 +263,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
// clear current bit
delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
- RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
- &next_column_start, num_fields, field_locations));
+ AddColumn<process_escapes>(buffer + n - next_column_start,
+ &next_column_start, num_fields, field_locations);
}
if (process_escapes) {
@@ -295,8 +288,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
if (!last_char_is_escape_ &&
(*buffer == field_delim_ || *buffer == collection_item_delim_)) {
- RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
- &next_column_start, num_fields, field_locations));
+ AddColumn<process_escapes>(buffer - next_column_start,
+ &next_column_start, num_fields, field_locations);
}
--remaining_len;
@@ -305,8 +298,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
// Last column does not have a delimiter after it. Add that column and also
// pad with empty cols if the input is ragged.
- return FillColumns<process_escapes>(buffer - next_column_start,
- &next_column_start, num_fields, field_locations);
+ FillColumns<process_escapes>(buffer - next_column_start,
+ &next_column_start, num_fields, field_locations);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 275956c..78d4994 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -186,7 +186,7 @@ Status HdfsScanner::CommitRows(int num_rows) {
DCHECK(batch_ != NULL);
DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
batch_->CommitRows(num_rows);
- tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
+ tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows;
// We need to pass the row batch to the scan node if there is too much memory attached,
// which can happen if the query is very selective. We need to release memory even
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 9069451..7f804f1 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -58,9 +58,6 @@ struct FieldLocation {
char* start;
/// Encodes the length and whether or not this fields needs to be unescaped.
/// If len < 0, then the field needs to be unescaped.
- ///
- /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue
- /// and StringVal. All other types shouldn't be anywhere near this limit.
int len;
static const char* LLVM_CLASS_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 0cd000f..1460b1a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -229,15 +229,15 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
for (int i = 0; i < num_to_process; ++i) {
int num_fields = 0;
if (delimited_text_parser_->escape_char() == '\0') {
- RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<false>(
+ delimited_text_parser_->ParseSingleTuple<false>(
record_locations_[i].len,
reinterpret_cast<char*>(record_locations_[i].record),
- &field_locations_[field_location_offset], &num_fields));
+ &field_locations_[field_location_offset], &num_fields);
} else {
- RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<true>(
+ delimited_text_parser_->ParseSingleTuple<true>(
record_locations_[i].len,
reinterpret_cast<char*>(record_locations_[i].record),
- &field_locations_[field_location_offset], &num_fields));
+ &field_locations_[field_location_offset], &num_fields);
}
DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
field_location_offset += num_fields;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6cc308d..6e501cc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -260,8 +260,8 @@ Status HdfsTextScanner::FinishScanRange() {
// tuple.
DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
DCHECK(partial_tuple_empty_);
- DCHECK(boundary_column_.IsEmpty());
- DCHECK(boundary_row_.IsEmpty());
+ DCHECK(boundary_column_.Empty());
+ DCHECK(boundary_row_.Empty());
return Status::OK();
}
@@ -286,24 +286,24 @@ Status HdfsTextScanner::FinishScanRange() {
ss << "Read failed while trying to finish scan range: " << stream_->filename()
<< ":" << stream_->file_offset() << endl << status.GetDetail();
RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
- } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
- !boundary_row_.IsEmpty() ||
+ } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
+ !boundary_row_.Empty() ||
(delimited_text_parser_->HasUnfinishedTuple() &&
(!scan_node_->materialized_slots().empty() ||
scan_node_->num_materialized_partition_keys() > 0))) {
// Missing columns or row delimiter at end of the file is ok, fill the row in.
- char* col = boundary_column_.buffer();
+ char* col = boundary_column_.str().ptr;
int num_fields = 0;
- RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
- &col, &num_fields, &field_locations_[0]));
+ delimited_text_parser_->FillColumns<true>(boundary_column_.Size(),
+ &col, &num_fields, &field_locations_[0]);
MemPool* pool;
TupleRow* tuple_row_mem;
int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
DCHECK_GE(max_tuples, 1);
// Set variables for proper error outputting on boundary tuple
- batch_start_ptr_ = boundary_row_.buffer();
- row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len();
+ batch_start_ptr_ = boundary_row_.str().ptr;
+ row_end_locations_[0] = batch_start_ptr_ + boundary_row_.str().len;
int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
DCHECK_LE(num_tuples, 1);
DCHECK_GE(num_tuples, 0);
@@ -371,7 +371,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
(num_fields > 0 || *num_tuples > 0)) {
// There can be one partial tuple which returned no more fields from this buffer.
DCHECK_LE(*num_tuples, num_fields + 1);
- if (!boundary_column_.IsEmpty()) {
+ if (!boundary_column_.Empty()) {
RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
boundary_column_.Clear();
}
@@ -423,11 +423,11 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
Status status;
if (num_bytes > 0) {
stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
- &byte_buffer_read_size_, &status);
+ &byte_buffer_read_size_, &status);
} else {
DCHECK_EQ(num_bytes, 0);
status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
- &byte_buffer_read_size_);
+ &byte_buffer_read_size_);
}
RETURN_IF_ERROR(status);
*eosr = stream_->eosr();
@@ -555,7 +555,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
}
// Need to read the entire file.
- if (file_size > byte_buffer_read_size_) {
+ if (file_size < byte_buffer_read_size_) {
stringstream ss;
ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
<< "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
@@ -600,8 +600,8 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
delimited_text_parser_->ParserReset();
SCOPED_TIMER(parse_delimiter_timer_);
- int64_t next_tuple_offset = 0;
- int64_t bytes_left = byte_buffer_read_size_;
+ int next_tuple_offset = 0;
+ int bytes_left = byte_buffer_read_size_;
while (num_skipped_rows < num_rows_to_skip) {
next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
bytes_left);
@@ -610,6 +610,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
bytes_left -= next_tuple_offset;
++num_skipped_rows;
}
+
if (next_tuple_offset != -1) *tuple_found = true;
} while (!*tuple_found && !eosr);
@@ -680,7 +681,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// codegen'd using the IRBuilder for the specific tuple description. This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
- const vector<ExprContext*>& conjunct_ctxs) {
+ const vector<ExprContext*>& conjunct_ctxs) {
if (!node->runtime_state()->codegen_enabled()) return NULL;
LlvmCodeGen* codegen;
if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
@@ -716,9 +717,9 @@ void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) {
row_start = row_end_locations_[row_idx - 1] + 1;
}
- if (!boundary_row_.IsEmpty()) {
- // Log the beginning of the line from the previous file buffer(s).
- *ss << string(boundary_row_.buffer(), boundary_row_.len());
+ if (!boundary_row_.Empty()) {
+ // Log the beginning of the line from the previous file buffer(s)
+ *ss << boundary_row_.str();
}
// Log the erroneous line (or the suffix of a line if !boundary_line.empty()).
*ss << string(row_start, row_end - row_start);
@@ -791,7 +792,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
// Write complete tuples. The current field, if any, is at the start of a tuple.
if (num_tuples > 0) {
int max_added_tuples = (scan_node_->limit() == -1) ?
- num_tuples : scan_node_->limit() - scan_node_->rows_returned();
+ num_tuples : scan_node_->limit() - scan_node_->rows_returned();
int tuples_returned = 0;
// Call jitted function if possible
if (write_tuples_fn_ != NULL) {
@@ -835,29 +836,31 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
bool needs_escape = data->len < 0;
int copy_len = needs_escape ? -data->len : data->len;
- int64_t total_len = copy_len + boundary_column_.len();
+ int total_len = copy_len + boundary_column_.Size();
char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
if (UNLIKELY(str_data == NULL)) {
string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
"$0 bytes.", total_len);
return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
}
- memcpy(str_data, boundary_column_.buffer(), boundary_column_.len());
- memcpy(str_data + boundary_column_.len(), data->start, copy_len);
+ memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
+ memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
data->start = str_data;
data->len = needs_escape ? -total_len : total_len;
return Status::OK();
}
-void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
+int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
int num_fields, bool copy_strings) {
+ int next_line_offset = 0;
for (int i = 0; i < num_fields; ++i) {
- bool need_escape = false;
+ int need_escape = false;
int len = fields[i].len;
if (len < 0) {
len = -len;
need_escape = true;
}
+ next_line_offset += (len + 1);
const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
if (!text_converter_->WriteSlot(desc, partial_tuple_,
@@ -867,4 +870,5 @@ void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
}
++slot_idx_;
}
+ return next_line_offset;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index dae104d..997637d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -156,9 +156,9 @@ class HdfsTextScanner : public HdfsScanner {
int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples);
/// Utility function to write out 'num_fields' to 'tuple_'. This is used to parse
- /// partial tuples. If copy_strings is true, strings from fields will be copied into
- /// the boundary pool.
- void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
+ /// partial tuples. Returns bytes processed. If copy_strings is true, strings
+ /// from fields will be copied into the boundary pool.
+ int WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
/// Appends the current file and line to the RuntimeState's error log.
/// row_idx is 0-based (in current batch) where the parse error occurred.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 6a5081d..3c78d88 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -234,9 +234,15 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
boundary_buffer_->Clear();
}
}
-
- // Resize the buffer to the right size.
- RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
+ // Workaround IMPALA-1619. Fail the request if requested_len is more than 1GB.
+ // StringBuffer can only handle 32-bit allocations and StringBuffer::Append()
+ // will allocate twice the current buffer size, causing int overflow.
+ // TODO: Revert once IMPALA-1619 is fixed.
+ if (UNLIKELY(requested_len > StringValue::MAX_LENGTH)) {
+ LOG(WARNING) << "Requested buffer size " << requested_len << "B > 1GB."
+ << GetStackTrace();
+ return Status(Substitute("Requested buffer size $0B > 1GB", requested_len));
+ }
while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
// We need to fetch more bytes. Copy the end of the current buffer and fetch the next
@@ -267,8 +273,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
} else {
RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
boundary_buffer_bytes_left_ += num_bytes;
- boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
- boundary_buffer_->len() - boundary_buffer_bytes_left_;
+ boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
+ boundary_buffer_->Size() - boundary_buffer_bytes_left_;
io_buffer_bytes_left_ -= num_bytes;
io_buffer_pos_ += num_bytes;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index a9e95cd..8265ea2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -303,14 +303,22 @@ bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers)
}
bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
- int64_t buffers_needed = BitUtil::Ceil(size, max_block_size());
- if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) {
- VLOG_QUERY << "Trying to consume " << size << " which is out of range.";
+ // Workaround IMPALA-1619. Return immediately if the allocation size will cause
+ // an arithmetic overflow.
+ if (UNLIKELY(size >= (1LL << 31))) {
+ // IMPALA-3238: don't repeatedly log warning when bumping up against this limit for
+ // large hash tables.
+ if (!client->logged_large_allocation_warning_) {
+ LOG(WARNING) << "Trying to allocate memory >=2GB (" << size << ")B."
+ << GetStackTrace();
+ client->logged_large_allocation_warning_ = true;
+ }
return false;
}
+ int buffers_needed = BitUtil::Ceil(size, max_block_size());
DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
-
unique_lock<mutex> lock(lock_);
+
if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
// For small allocations (less than a block size), just let the allocation through.
client->tracker_->ConsumeLocal(size, client->query_tracker_);
@@ -585,7 +593,7 @@ int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
}
int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
- return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
+ return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
}
MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
@@ -1009,8 +1017,7 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
// 1. In the unpinned list. The buffer will not be in the free list.
// 2. in_write_ == true. The buffer will not be in the free list.
// 3. The buffer is free, but hasn't yet been reassigned to a different block.
- DCHECK_EQ(block->buffer_desc_->len, max_block_size())
- << "Non-I/O blocks are always pinned";
+ DCHECK_EQ(block->buffer_desc_->len, max_block_size()) << "Non-I/O blocks are always pinned";
DCHECK(unpinned_blocks_.Contains(block) ||
block->in_write_ ||
free_io_buffers_.Contains(block->buffer_desc_));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index 4065b80..c57b546 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -32,7 +32,7 @@ class CollectionValueBuilder {
CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc,
MemPool* pool, RuntimeState* state,
- int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
+ int initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
: coll_value_(coll_value),
tuple_desc_(tuple_desc),
pool_(pool),
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/free-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool-test.cc b/be/src/runtime/free-pool-test.cc
index c8ff93d..02d245a 100644
--- a/be/src/runtime/free-pool-test.cc
+++ b/be/src/runtime/free-pool-test.cc
@@ -85,22 +85,6 @@ TEST(FreePoolTest, Basic) {
EXPECT_EQ(mem_pool.total_allocated_bytes(), 64);
mem_pool.FreeAll();
-
- // Try making allocations larger than 1GB.
- uint8_t* p5 = pool.Allocate(1LL << 32);
- EXPECT_TRUE(p5 != NULL);
- for (int64_t i = 0; i < (1LL << 32); i += (1 << 29)) {
- *(p5 + i) = i;
- }
- EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
-
- // Test zero-byte allocation.
- p5 = pool.Allocate(0);
- EXPECT_TRUE(p5 != NULL);
- EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
- pool.Free(p5);
-
- mem_pool.FreeAll();
}
// In this test we make two allocations at increasing sizes and then we
@@ -112,13 +96,13 @@ TEST(FreePoolTest, Loop) {
MemPool mem_pool(&tracker);
FreePool pool(&mem_pool);
- map<int64_t, pair<uint8_t*, uint8_t*> > primed_allocations;
- vector<int64_t> allocation_sizes;
+ map<int, pair<uint8_t*, uint8_t*> > primed_allocations;
+ vector<int> allocation_sizes;
int64_t expected_pool_size = 0;
// Pick a non-power of 2 to exercise more code.
- for (int64_t size = 5; size < 6LL * 1024 * 1024 * 1024; size *= 5) {
+ for (int size = 3; size < 1024 * 1024 * 1024; size *= 3) {
uint8_t* p1 = pool.Allocate(size);
uint8_t* p2 = pool.Allocate(size);
EXPECT_TRUE(p1 != NULL);
@@ -179,11 +163,6 @@ TEST(FreePoolTest, ReAlloc) {
ptr = pool.Allocate(600);
EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8);
- // Try allocation larger than 1GB.
- uint8_t* ptr4 = pool.Reallocate(ptr3, 1LL << 32);
- EXPECT_TRUE(ptr3 != ptr4);
- EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8 + (1LL << 32) + 8);
-
mem_pool.FreeAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/free-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h
index 90df749..dfabaf0 100644
--- a/be/src/runtime/free-pool.h
+++ b/be/src/runtime/free-pool.h
@@ -52,9 +52,8 @@ class FreePool {
memset(&lists_, 0, sizeof(lists_));
}
- /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes.
- uint8_t* Allocate(int64_t size) {
- DCHECK_GE(size, 0);
+ /// Allocates a buffer of size.
+ uint8_t* Allocate(int size) {
#ifndef NDEBUG
static int32_t alloc_counts = 0;
if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -65,15 +64,16 @@ class FreePool {
++net_allocations_;
if (FLAGS_disable_mem_pools) return reinterpret_cast<uint8_t*>(malloc(size));
- /// Return a non-NULL dummy pointer. NULL is reserved for failures.
- if (UNLIKELY(size == 0)) return mem_pool_->EmptyAllocPtr();
+ /// This is the typical malloc behavior. NULL is reserved for failures.
+ if (size == 0) return reinterpret_cast<uint8_t*>(0x1);
int free_list_idx = Bits::Log2Ceiling64(size);
DCHECK_LT(free_list_idx, NUM_LISTS);
+
FreeListNode* allocation = lists_[free_list_idx].next;
if (allocation == NULL) {
// There wasn't an existing allocation of the right size, allocate a new one.
- size = 1LL << free_list_idx;
+ size = 1 << free_list_idx;
allocation = reinterpret_cast<FreeListNode*>(
mem_pool_->Allocate(size + sizeof(FreeListNode)));
if (UNLIKELY(allocation == NULL)) {
@@ -97,7 +97,7 @@ class FreePool {
free(ptr);
return;
}
- if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return;
+ if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return;
FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
FreeListNode* list = node->list;
#ifndef NDEBUG
@@ -114,7 +114,7 @@ class FreePool {
///
/// NULL will be returned on allocation failure. It's the caller's responsibility to
/// free the memory buffer pointed to by "ptr" in this case.
- uint8_t* Reallocate(uint8_t* ptr, int64_t size) {
+ uint8_t* Reallocate(uint8_t* ptr, int size) {
#ifndef NDEBUG
static int32_t alloc_counts = 0;
if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -125,14 +125,13 @@ class FreePool {
if (FLAGS_disable_mem_pools) {
return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size));
}
- if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size);
+ if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return Allocate(size);
FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
FreeListNode* list = node->list;
#ifndef NDEBUG
CheckValidAllocation(list, ptr);
#endif
int bucket_idx = (list - &lists_[0]);
- DCHECK_LT(bucket_idx, NUM_LISTS);
// This is the actual size of ptr.
int allocation_size = 1 << bucket_idx;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 121098c..ab6bd5a 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -21,9 +21,6 @@
#include "common/names.h"
-// Maximum allocation size which exceeds 32-bit.
-#define LARGE_ALLOC_SIZE (1LL << 32)
-
namespace impala {
// Utility class to call private functions on MemPool.
@@ -128,11 +125,6 @@ TEST(MemPoolTest, Basic) {
p2.FreeAll();
p3.FreeAll();
}
-
- // Test zero byte allocation.
- uint8_t* ptr = p.Allocate(0);
- EXPECT_TRUE(ptr != NULL);
- EXPECT_EQ(0, p.GetTotalChunkSizes());
}
// Test that we can keep an allocated chunk and a free chunk.
@@ -200,22 +192,6 @@ TEST(MemPoolTest, ReturnPartial) {
EXPECT_EQ(2, ptr[i]);
}
- // Try ReturnPartialAllocations() with 64-bit values.
- uint8_t* ptr4 = p.Allocate(LARGE_ALLOC_SIZE + 512);
- EXPECT_EQ(1024 + LARGE_ALLOC_SIZE + 512, p.total_allocated_bytes());
- memset(ptr4, 3, 512 * 2);
- p.ReturnPartialAllocation(LARGE_ALLOC_SIZE);
- uint8_t* ptr5 = p.Allocate(512);
- EXPECT_TRUE(ptr5 == ptr4 + 512);
- memset(ptr5, 4, 512);
-
- for (int i = 0; i < 512; ++i) {
- EXPECT_EQ(3, ptr4[i]);
- }
- for (int i = 512; i < 512 * 2; ++i) {
- EXPECT_EQ(4, ptr4[i]);
- }
-
p.FreeAll();
}
@@ -276,50 +252,51 @@ TEST(MemPoolTest, Limits) {
ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
// Try To allocate 20 bytes, this should succeed. TryAllocate() should leave the
- // pool in a functional state.
+ // pool in a functional state..
result = p2->TryAllocate(20);
ASSERT_TRUE(result != NULL);
ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
+
p2->FreeAll();
delete p2;
}
TEST(MemPoolTest, MaxAllocation) {
- int64_t int_max_rounded = BitUtil::RoundUp(LARGE_ALLOC_SIZE, 8);
+ int64_t int_max_rounded = BitUtil::RoundUp(INT_MAX, 8);
- // Allocate a single LARGE_ALLOC_SIZE chunk
+ // Allocate a single INT_MAX chunk
MemTracker tracker;
MemPool p1(&tracker);
- uint8_t* ptr = p1.Allocate(LARGE_ALLOC_SIZE);
+ uint8_t* ptr = p1.Allocate(INT_MAX);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded, p1.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded, p1.total_allocated_bytes());
p1.FreeAll();
- // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an LARGE_ALLOC_SIZE chunk
+ // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an INT_MAX chunk
MemPool p2(&tracker);
p2.Allocate(8);
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, p2.GetTotalChunkSizes());
EXPECT_EQ(8, p2.total_allocated_bytes());
- ptr = p2.Allocate(LARGE_ALLOC_SIZE);
+ ptr = p2.Allocate(INT_MAX);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE + int_max_rounded,
p2.GetTotalChunkSizes());
EXPECT_EQ(8LL + int_max_rounded, p2.total_allocated_bytes());
p2.FreeAll();
- // Allocate three LARGE_ALLOC_SIZE chunks followed by a small chunk
- // followed by another LARGE_ALLOC_SIZE chunk.
+ // Allocate three INT_MAX chunks followed by a small chunk followed by another INT_MAX
+ // chunk
MemPool p3(&tracker);
- p3.Allocate(LARGE_ALLOC_SIZE);
+ p3.Allocate(INT_MAX);
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+ ptr = p3.Allocate(INT_MAX);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 2, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 2, p3.total_allocated_bytes());
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+ ptr = p3.Allocate(INT_MAX);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 3, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 3, p3.total_allocated_bytes());
@@ -331,7 +308,7 @@ TEST(MemPoolTest, MaxAllocation) {
EXPECT_EQ(int_max_rounded * 3 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 3 + 8, p3.total_allocated_bytes());
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+ ptr = p3.Allocate(INT_MAX);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 4 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 4 + 8, p3.total_allocated_bytes());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index e1c4d2b..15325c0 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -101,7 +101,7 @@ class MemPool {
/// Returns 'byte_size' to the current chunk back to the mem pool. This can
/// only be used to return either all or part of the previous allocation returned
/// by Allocate().
- void ReturnPartialAllocation(int64_t byte_size) {
+ void ReturnPartialAllocation(int byte_size) {
DCHECK_GE(byte_size, 0);
DCHECK(current_chunk_idx_ != -1);
ChunkInfo& info = chunks_[current_chunk_idx_];
@@ -110,11 +110,6 @@ class MemPool {
total_allocated_bytes_ -= byte_size;
}
- /// Return a dummy pointer for zero-length allocations.
- static uint8_t* EmptyAllocPtr() {
- return reinterpret_cast<uint8_t*>(&zero_length_region_);
- }
-
/// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
void Clear();
@@ -213,7 +208,6 @@ class MemPool {
template <bool CHECK_LIMIT_FIRST>
uint8_t* Allocate(int64_t size) noexcept {
- DCHECK_GE(size, 0);
if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t *>(&zero_length_region_);
int64_t num_bytes = BitUtil::RoundUp(size, 8);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index 2387d60..95d2e41 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -24,10 +24,10 @@
namespace impala {
void ValidateString(const string& std_str, const StringBuffer& str) {
- EXPECT_EQ(std_str.empty(), str.IsEmpty());
- EXPECT_EQ(static_cast<int64_t>(std_str.size()), str.len());
+ EXPECT_EQ(std_str.empty(), str.Empty());
+ EXPECT_EQ((int)std_str.size(), str.Size());
if (std_str.size() > 0) {
- EXPECT_EQ(strncmp(std_str.c_str(), str.buffer(), std_str.size()), 0);
+ EXPECT_EQ(strncmp(std_str.c_str(), str.str().ptr, std_str.size()), 0);
}
}
@@ -55,6 +55,11 @@ TEST(StringBufferTest, Basic) {
str.Append("World", strlen("World"));
ValidateString(std_str, str);
+ // Assign
+ std_str.assign("foo");
+ str.Assign("foo", strlen("foo"));
+ ValidateString(std_str, str);
+
// Clear
std_str.clear();
str.Clear();
@@ -67,22 +72,23 @@ TEST(StringBufferTest, Basic) {
}
TEST(StringBufferTest, AppendBoundary) {
- // Test StringBuffer::Append() works beyond 1GB.
+ // Test StringBuffer::Append() up to 1GB is ok
+ // TODO: Once IMPALA-1619 is fixed, we should change the test to verify
+ // append over 2GB string is supported.
MemTracker tracker;
MemPool pool(&tracker);
StringBuffer str(&pool);
string std_str;
const int64_t chunk_size = 8 * 1024 * 1024;
- const int64_t max_data_size = 1LL << 32;
std_str.resize(chunk_size, 'a');
int64_t data_size = 0;
- while (data_size + chunk_size <= max_data_size) {
+ while (data_size + chunk_size <= StringValue::MAX_LENGTH) {
str.Append(std_str.c_str(), chunk_size);
data_size += chunk_size;
}
EXPECT_EQ(str.buffer_size(), data_size);
- std_str.resize(max_data_size, 'a');
+ std_str.resize(StringValue::MAX_LENGTH, 'a');
ValidateString(std_str, str);
pool.FreeAll();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 3725181..ec59806 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -27,87 +27,99 @@ namespace impala {
/// Dynamic-sizable string (similar to std::string) but without as many
/// copies and allocations.
-/// StringBuffer is a buffer of char allocated from 'pool'. Current usage and size of the
-/// buffer are tracked in 'len_' and 'buffer_size_' respectively. It supports a subset of
-/// the std::string functionality but will only allocate bigger string buffers as
-/// necessary. std::string tries to be immutable and will reallocate very often.
-/// std::string should be avoided in all hot paths.
+/// StringBuffer wraps a StringValue object with a pool and memory buffer length.
+/// It supports a subset of the std::string functionality but will only allocate
+/// bigger string buffers as necessary. std::string tries to be immutable and will
+/// reallocate very often. std::string should be avoided in all hot paths.
class StringBuffer {
public:
/// C'tor for StringBuffer. Memory backing the string will be allocated from
/// the pool as necessary. Can optionally be initialized from a StringValue.
StringBuffer(MemPool* pool, StringValue* str = NULL)
- : pool_(pool), buffer_(NULL), len_(0), buffer_size_(0) {
+ : pool_(pool), buffer_size_(0) {
DCHECK(pool_ != NULL);
if (str != NULL) {
- buffer_ = str->ptr;
- len_ = buffer_size_ = str->len;
+ string_value_ = *str;
+ buffer_size_ = str->len;
}
}
/// Append 'str' to the current string, allocating a new buffer as necessary.
- Status Append(const char* str, int64_t str_len) {
- int64_t new_len = len_ + str_len;
+ /// Return error status if memory limit is exceeded.
+ Status Append(const char* str, int len) {
+ int new_len = len + string_value_.len;
if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
- memcpy(buffer_ + len_, str, str_len);
- len_ += str_len;
+ memcpy(string_value_.ptr + string_value_.len, str, len);
+ string_value_.len = new_len;
return Status::OK();
}
- /// Wrapper around append() for input type 'uint8_t'.
- Status Append(const uint8_t* str, int64_t str_len) {
- return Append(reinterpret_cast<const char*>(str), str_len);
+ /// TODO: switch everything to uint8_t?
+ Status Append(const uint8_t* str, int len) {
+ return Append(reinterpret_cast<const char*>(str), len);
}
- /// Clear the underlying StringValue. The allocated buffer can be reused.
- void Clear() { len_ = 0; }
+ /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
+ Status Assign(const char* str, int len) {
+ Clear();
+ return Append(str, len);
+ }
+
+ /// Clear the underlying StringValue. The allocated buffer can be reused.
+ void Clear() {
+ string_value_.len = 0;
+ }
- /// Reset the usage and size of the buffer. Note that the allocated buffer is
- /// retained but cannot be reused.
+ /// Clears the underlying buffer and StringValue
void Reset() {
- len_ = 0;
+ string_value_.len = 0;
buffer_size_ = 0;
- buffer_ = NULL;
}
- /// Returns true if no byte is consumed in the buffer.
- bool IsEmpty() const { return len_ == 0; }
-
- /// Grows the buffer to be at least 'new_size', copying over the previous data
- /// into the new buffer. The old buffer is not freed. Return an error status if
- /// growing the buffer will exceed memory limit.
- Status GrowBuffer(int64_t new_size) {
- if (LIKELY(new_size > buffer_size_)) {
- int64_t old_size = buffer_size_;
- buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);
- char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
- if (UNLIKELY(new_buffer == NULL)) {
- string details = Substitute("StringBuffer failed to grow buffer from $0 "
- "to $1 bytes.", old_size, buffer_size_);
- return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
- }
- if (LIKELY(len_ > 0)) memcpy(new_buffer, buffer_, len_);
- buffer_ = new_buffer;
- }
- return Status::OK();
+ /// Returns whether the current string is empty
+ bool Empty() const {
+ return string_value_.len == 0;
}
- /// Returns the number of bytes consumed in the buffer.
- int64_t len() const { return len_; }
+ /// Returns the length of the current string
+ int Size() const {
+ return string_value_.len;
+ }
- /// Returns the pointer to the buffer. Note that it's the caller's responsibility
- /// to not retain the pointer to 'buffer_' across call to Append() as the buffer_
- /// may be relocated in Append().
- char* buffer() const { return buffer_; }
+ /// Returns the underlying StringValue
+ const StringValue& str() const {
+ return string_value_;
+ }
- /// Returns the size of the buffer.
- int64_t buffer_size() const { return buffer_size_; }
+ /// Returns the buffer size
+ int buffer_size() const {
+ return buffer_size_;
+ }
private:
+ /// Grows the buffer backing the string to be at least new_len, copying over the
+ /// previous string data into the new buffer. Return error status if memory limit
+ /// is exceeded.
+ Status GrowBuffer(int new_len) {
+ // TODO: Release/reuse old buffers somehow
+ buffer_size_ = std::max(buffer_size_ * 2, new_len);
+ DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
+ char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+ if (UNLIKELY(new_buffer == NULL)) {
+ string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.",
+ buffer_size_);
+ return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+ }
+ if (LIKELY(string_value_.len > 0)) {
+ memcpy(new_buffer, string_value_.ptr, string_value_.len);
+ }
+ string_value_.ptr = new_buffer;
+ return Status::OK();
+ }
+
MemPool* pool_;
- char* buffer_;
- int64_t len_; // number of bytes consumed in the buffer.
- int64_t buffer_size_; // size of the buffer.
+ StringValue string_value_;
+ int buffer_size_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 1996838..599e42f 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -79,7 +79,7 @@ class FunctionContextImpl {
/// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear.
/// For UDFs, the allocations can be freed at the row level.
/// TODO: free them at the batch level and save some copies?
- uint8_t* AllocateLocal(int64_t byte_size) noexcept;
+ uint8_t* AllocateLocal(int byte_size) noexcept;
/// Frees all allocations returned by AllocateLocal().
void FreeLocalAllocations() noexcept;
@@ -121,7 +121,7 @@ class FunctionContextImpl {
/// if necessary.
///
/// Return false if 'buf' is null; returns true otherwise.
- bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size);
+ bool CheckAllocResult(const char* fn_name, uint8_t* buf, int byte_size);
/// Preallocated buffer for storing varargs (if the function has any). Allocated and
/// owned by this object, but populated by an Expr function.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index ff81c9c..4cadb63 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -267,7 +267,7 @@ const char* FunctionContext::error_msg() const {
}
inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
- uint8_t* buf, int64_t byte_size) {
+ uint8_t* buf, int byte_size) {
if (UNLIKELY(buf == NULL)) {
stringstream ss;
ss << string(fn_name) << "() failed to allocate " << byte_size << " bytes.";
@@ -416,7 +416,7 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) {
}
}
-uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept {
+uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) noexcept {
assert(!closed_);
if (byte_size == 0) return NULL;
uint8_t* buffer = pool_->Allocate(byte_size);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 210c855..1b1cdab 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -140,7 +140,6 @@ class FunctionContext {
/// The UDF/UDA is responsible for calling Free() on all buffers returned by Allocate().
/// If Allocate() fails or causes the memory limit to be exceeded, the error will be
/// set in this object causing the query to fail.
- /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
uint8_t* Allocate(int byte_size) noexcept;
/// Wrapper around Allocate() to allocate a buffer of the given type "T".
@@ -156,7 +155,6 @@ class FunctionContext {
/// memory limit to be exceeded, the error will be set in this object.
///
/// This should be used for buffers that constantly get appended to.
- /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
uint8_t* Reallocate(uint8_t* ptr, int byte_size) noexcept;
/// Frees a buffer returned from Allocate() or Reallocate()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 47592b8..cae8212 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -23,7 +23,6 @@
#endif
#include <boost/type_traits/make_unsigned.hpp>
-#include <limits>
#include "common/compiler-util.h"
#include "util/cpu-info.h"
@@ -204,36 +203,31 @@ class BitUtil {
static inline uint16_t FromBigEndian(uint16_t val) { return val; }
#endif
- /// Returns true if 'value' is a non-negative 32-bit integer.
- static inline bool IsNonNegative32Bit(int64_t value) {
- return (uint64_t)value <= std::numeric_limits<int32_t>::max();
- }
-
- /// Logical right shift for signed integer types
- /// This is needed because the C >> operator does arithmetic right shift
- /// Negative shift amounts lead to undefined behavior
+ // Logical right shift for signed integer types
+ // This is needed because the C >> operator does arithmetic right shift
+ // Negative shift amounts lead to undefined behavior
template<typename T>
static T ShiftRightLogical(T v, int shift) {
// Conversion to unsigned ensures most significant bits always filled with 0's
return static_cast<typename make_unsigned<T>::type>(v) >> shift;
}
- /// Get an specific bit of a numeric type
+ // Get a specific bit of a numeric type
template<typename T>
static inline int8_t GetBit(T v, int bitpos) {
T masked = v & (static_cast<T>(0x1) << bitpos);
return static_cast<int8_t>(ShiftRightLogical(masked, bitpos));
}
- /// Set a specific bit to 1
- /// Behavior when bitpos is negative is undefined
+ // Set a specific bit to 1
+ // Behavior when bitpos is negative is undefined
template<typename T>
static T SetBit(T v, int bitpos) {
return v | (static_cast<T>(0x1) << bitpos);
}
- /// Set a specific bit to 0
- /// Behavior when bitpos is negative is undefined
+ // Set a specific bit to 0
+ // Behavior when bitpos is negative is undefined
template<typename T>
static T UnsetBit(T v, int bitpos) {
return v & ~(static_cast<T>(0x1) << bitpos);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 2ee7415..6863d1f 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -182,13 +182,15 @@ Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
const uint8_t* input, int* output_length, uint8_t** output) {
int64_t input_len64 = input_length;
int64_t output_len64 = *output_length;
- RETURN_IF_ERROR(
- ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
- // Buffer size should be between [0, (2^31 - 1)] bytes.
- if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
+ RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
+ output));
+ // Check whether we are going to have an overflow if we are going to cast from int64_t
+ // to int.
+ // TODO: Is there a faster way to do this check?
+ if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
output_len64));;
}
- *output_length = static_cast<int>(output_len64);
+ *output_length = static_cast<int32_t>(output_len64);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 563a3b9..a337983 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -150,6 +150,11 @@ class Codec {
bool supports_streaming() const { return supports_streaming_; }
+ /// Largest block we will compress/decompress: 2GB.
+ /// We are dealing with compressed blocks that are never this big but we want to guard
+ /// against a corrupt file that has the block length as some large number.
+ static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;
+
protected:
/// Create a compression operator
/// Inputs:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 7a94db0..3a32756 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -121,8 +121,10 @@ class DecompressorTest : public ::testing::Test {
EXPECT_GE(max_compressed_length, 0);
uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
compressed_length = max_compressed_length;
+
+
EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
- &compressed));
+ &compressed));
}
output_len = decompressor->MaxOutputLen(compressed_length, compressed);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 900f891..8a2ea74 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -148,6 +148,9 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
if (!reuse_buffer_ || out_buffer_ == NULL) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
+ if (buffer_length_ > MAX_BLOCK_SIZE) {
+ return Status("Decompressor: block size is too big");
+ }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -201,6 +204,11 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
// User didn't supply the buffer, double the buffer and try again.
temp_memory_pool_->Clear();
buffer_length_ *= 2;
+ if (buffer_length_ > MAX_BLOCK_SIZE) {
+ stringstream ss;
+ ss << "GzipDecompressor: block size is too big: " << buffer_length_;
+ return Status(ss.str());
+ }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -265,6 +273,9 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
} else if (!reuse_buffer_ || out_buffer_ == NULL) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
+ if (buffer_length_ > MAX_BLOCK_SIZE) {
+ return Status("Decompressor: block size is too big");
+ }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -284,6 +295,9 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
DCHECK(!output_preallocated);
temp_memory_pool_->Clear();
buffer_length_ = buffer_length_ * 2;
+ if (buffer_length_ > MAX_BLOCK_SIZE) {
+ return Status("Decompressor: block size is too big");
+ }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -401,18 +415,17 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
}
// Hadoop uses a block compression scheme on top of snappy. As per the hadoop docs
-// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split
-// into blocks. Each block "contains the uncompressed length for the block followed
-// by one or more length-prefixed blocks of compressed data."
+// the input is split into blocks. Each block "contains the uncompressed length for
+// the block followed by one or more length-prefixed blocks of compressed data."
// This is essentially blocks of blocks.
// The outer block consists of:
-// - 4 byte big endian uncompressed_size
+// - 4 byte little endian uncompressed_size
// < inner blocks >
// ... repeated until input_len is consumed ..
// The inner blocks have:
-// - 4-byte big endian compressed_size
+// - 4-byte little endian compressed_size
// < snappy compressed block >
-// - 4-byte big endian compressed_size
+// - 4-byte little endian compressed_size
// < snappy compressed block >
// ... repeated until uncompressed_size from outer block is consumed ...
@@ -430,6 +443,15 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
input += sizeof(uint32_t);
input_len -= sizeof(uint32_t);
+ if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) {
+ if (uncompressed_total_len == 0) {
+ // TODO: is this check really robust?
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
+ uncompressed_block_len);
+ }
+ break;
+ }
+
if (!size_only) {
int64_t remaining_output_size = *output_len - uncompressed_total_len;
DCHECK_GE(remaining_output_size, uncompressed_block_len);
@@ -442,23 +464,29 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
input_len -= sizeof(uint32_t);
if (compressed_len == 0 || compressed_len > input_len) {
- *output_len = 0;
- return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
+ if (uncompressed_total_len == 0) {
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
+ }
+ input_len = 0;
+ break;
}
// Read how big the output will be.
size_t uncompressed_len;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
- compressed_len, &uncompressed_len)) {
- *output_len = 0;
- return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+ input_len, &uncompressed_len)) {
+ if (uncompressed_total_len == 0) {
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+ }
+ input_len = 0;
+ break;
}
DCHECK_GT(uncompressed_len, 0);
if (!size_only) {
// Decompress this snappy block
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
- compressed_len, output)) {
+ compressed_len, output)) {
return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
}
output += uncompressed_len;
@@ -498,6 +526,14 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
*output = out_buffer_;
}
+ if (*output_len > MAX_BLOCK_SIZE) {
+ // TODO: is this check really robust?
+ stringstream ss;
+ ss << "Decompressor: block size is too big. Data is likely corrupt. "
+ << "Size: " << *output_len;
+ return Status(ss.str());
+ }
+
char* out_ptr = reinterpret_cast<char*>(*output);
RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
return Status::OK();
@@ -511,7 +547,7 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
DCHECK(input != NULL);
size_t result;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
- input_len, &result)) {
+ input_len, &result)) {
return -1;
}
return result;
@@ -524,6 +560,9 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed");
if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
buffer_length_ = uncompressed_length;
+ if (buffer_length_ > MAX_BLOCK_SIZE) {
+ return Status("Decompressor: block size is too big");
+ }
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
@@ -537,7 +576,7 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
}
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
- static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
+ static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
return Status("Snappy: RawUncompress failed");
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 26ea78d..7815233 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -274,9 +274,6 @@ error_codes = (
("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
"data of type $1: $2"),
-
- ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
- "supported length of 2147483647 bytes.")
)
import sys
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/infra/python/bootstrap_virtualenv.py
----------------------------------------------------------------------
diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py
index eb53ba2..a9c6264 100644
--- a/infra/python/bootstrap_virtualenv.py
+++ b/infra/python/bootstrap_virtualenv.py
@@ -123,16 +123,8 @@ def detect_python_cmd():
def install_deps():
- toolchain_dir = os.environ.get("IMPALA_TOOLCHAIN", "")
- snappy_version = os.environ.get("IMPALA_SNAPPY_VERSION", "")
- snappy_dir = toolchain_dir + "/snappy-" + snappy_version
- lib_path = snappy_dir + "/lib:" + os.environ.get("LD_LIBRARY_PATH", "")
- include_dir = snappy_dir + "/include"
- args = ["--global-option", "build_ext", "--global-option", "-L"+ lib_path,
- "--global-option", "-I" + include_dir, "-r", REQS_PATH]
-
LOG.info("Installing packages into the virtualenv")
- exec_pip_install(args)
+ exec_pip_install(["-r", REQS_PATH])
shutil.copyfile(REQS_PATH, INSTALLED_REQS_PATH)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index a7dffd2..b3ebfb1 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -44,7 +44,6 @@ prettytable == 0.7.2
psutil == 0.7.1
pyelftools == 0.23
pyparsing == 2.0.3
-python-snappy == 0.5.0
pytest == 2.7.2
py == 1.4.30
pytest-random == 0.02
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index fde7032..15f799c 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -3,9 +3,6 @@
import os
import pytest
import random
-import snappy
-import string
-import struct
import subprocess
from os.path import join
from subprocess import call
@@ -147,23 +144,19 @@ class TestTableWriters(ImpalaTestSuite):
@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
- """
- Tests that Impala handles compressed files in HDFS larger than 1GB.
- This test creates a 2GB test data file and loads it into a table.
- """
+ """ Tests that Impala gracefully handles a compressed file in HDFS that is larger
+ than 1GB.
+ This test creates a test data file that is over 1GB and loads it into a table,
+ then verifies that Impala gracefully fails the query.
+ TODO: Once IMPALA-1619 is fixed, modify the test to cover files larger than 2GB."""
+
TABLE_NAME = "large_compressed_file"
TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
- """
- Name the file with ".snappy" extension to let scanner treat it as
- a snappy block compressed file.
- """
+ """ Name the file with ".snappy" extension to let scanner treat it
+ as a snappy compressed file."""
FILE_NAME = "largefile.snappy"
- # Maximum uncompressed size of an outer block in a snappy block compressed file.
- CHUNK_SIZE = 1024 * 1024 * 1024
- # Limit the max file size to 2GB or too much memory may be needed when
- # uncompressing the buffer. 2GB is sufficient to show that we support
- # size beyond maximum 32-bit signed value.
- MAX_FILE_SIZE = 2 * CHUNK_SIZE
+ LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+ MAX_FILE_SIZE = 1024 * 1024 * 1024
@classmethod
def get_workload(self):
@@ -177,59 +170,48 @@ class TestLargeCompressedFile(ImpalaTestSuite):
pytest.skip("skipping if it's not exhaustive test.")
cls.TestMatrix.add_constraint(lambda v:
(v.get_value('table_format').file_format =='text' and
- v.get_value('table_format').compression_codec == 'snap'))
+ v.get_value('table_format').compression_codec == 'none'))
def teardown_method(self, method):
self.__drop_test_table()
+ def __gen_char_or_num(self):
+ return random.choice(self.LETTERS)
+
def __generate_file(self, file_name, file_size):
"""Generate file with random data and a specified size."""
- content = "Lots of content here there here there here there"
- # Permutate the input buffer.
- payload = ''
- for i in range(1024):
- payload += ''.join(random.sample(content, len(content))) + ','
-
- compressed_payload = snappy.compress(payload)
- compressed_size = len(compressed_payload)
-
- num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
- num_iterations = self.CHUNK_SIZE / (compressed_size + 4)
- total_size = num_iterations * len(payload)
-
+ s = ''
+ for j in range(1024):
+ s = s + self.__gen_char_or_num()
put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
- stdin=subprocess.PIPE, bufsize=-1)
- """
- The layout of a snappy-block compressed file is one or more blocks
- of the following nested structure:
- - <big endian 32-bit value encoding the uncompressed size>
- - one or more blocks of the following structure:
- - <big endian 32-bit value encoding the compressed size>
- - <raw bits compressed by snappy algorithm>
- """
- for i in range(num_chunks):
- put.stdin.write(struct.pack('>i', total_size))
- for j in range(num_iterations):
- put.stdin.write(struct.pack('>i', compressed_size))
- put.stdin.write(compressed_payload)
+ stdin=subprocess.PIPE, bufsize=-1)
+ remain = file_size % 1024
+ for i in range(int(file_size / 1024)):
+ put.stdin.write(s)
+ put.stdin.write(s[0:remain])
put.stdin.close()
put.wait()
def test_query_large_file(self, vector):
self.__create_test_table();
dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
- file_size = self.MAX_FILE_SIZE
+ file_size = self.MAX_FILE_SIZE + 1
self.__generate_file(dst_path, file_size)
self.client.execute("refresh %s" % self.TABLE_NAME)
- # Query the table
- result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+ # Query the table and check for expected error.
+ expected_error = 'Requested buffer size %dB > 1GB' % file_size
+ try:
+ result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+ assert False, "Query was expected to fail"
+ except Exception as e:
+ error_msg = str(e)
+ assert expected_error in error_msg
def __create_test_table(self):
self.__drop_test_table()
- self.client.execute("CREATE TABLE %s (col string) " \
- "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
- % (self.TABLE_NAME, self.TABLE_LOCATION))
+ self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
+ % (self.TABLE_NAME, self.TABLE_LOCATION))
def __drop_test_table(self):
self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)