You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/08 22:42:15 UTC
incubator-impala git commit: IMPALA-1619: Support 64-bit allocations.
Repository: incubator-impala
Updated Branches:
refs/heads/master 667a778af -> ed5ec6772
IMPALA-1619: Support 64-bit allocations.
This change extends MemPool, FreePool and StringBuffer to support
64-bit allocations, fixes a bug in the decompressor, and extends
various places in the code to support 64-bit allocation sizes. With
this change, the text scanner can now decompress compressed files
larger than 1GB.
Note that the UDF interfaces FunctionContext::Allocate() and
FunctionContext::Reallocate() still use a 32-bit type for the input
argument to avoid breaking compatibility.
In addition, the byte size of a tuple is still assumed to fit
within 32 bits. If it needs to be widened to 64 bits, that will be
done in a separate change.
A new test has been added to exercise the decompression of a 2GB
Snappy block-compressed text file.
Change-Id: Ic1af1564953ac02aca2728646973199381c86e5f
Reviewed-on: http://gerrit.cloudera.org:8080/3575
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed5ec677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed5ec677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed5ec677
Branch: refs/heads/master
Commit: ed5ec6772fd24ad28901bc68f120c59597439cf2
Parents: 667a778
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jun 30 15:15:27 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Jul 8 15:42:09 2016 -0700
----------------------------------------------------------------------
be/src/exec/delimited-text-parser.cc | 37 +++---
be/src/exec/delimited-text-parser.h | 40 ++++---
be/src/exec/delimited-text-parser.inline.h | 55 +++++----
be/src/exec/hdfs-scanner.cc | 2 +-
be/src/exec/hdfs-scanner.h | 3 +
be/src/exec/hdfs-sequence-scanner.cc | 8 +-
be/src/exec/hdfs-text-scanner.cc | 54 ++++-----
be/src/exec/hdfs-text-scanner.h | 6 +-
be/src/exec/scanner-context.cc | 16 +--
be/src/runtime/buffered-block-mgr.cc | 21 ++--
be/src/runtime/collection-value-builder.h | 2 +-
be/src/runtime/free-pool-test.cc | 27 ++++-
be/src/runtime/free-pool.h | 19 +--
be/src/runtime/mem-pool-test.cc | 49 +++++---
be/src/runtime/mem-pool.h | 8 +-
be/src/runtime/string-buffer-test.cc | 20 ++--
be/src/runtime/string-buffer.h | 116 +++++++++----------
be/src/udf/udf-internal.h | 4 +-
be/src/udf/udf.cc | 4 +-
be/src/udf/udf.h | 2 +
be/src/util/bit-util.h | 22 ++--
be/src/util/codec.cc | 12 +-
be/src/util/codec.h | 5 -
be/src/util/decompress-test.cc | 4 +-
be/src/util/decompress.cc | 67 +++--------
common/thrift/generate_error_codes.py | 3 +
testdata/bin/create-load-data.sh | 5 +
testdata/compressed_formats/README | 4 +
.../compressed_formats/compressed_payload.snap | Bin 0 -> 37270 bytes
tests/query_test/test_compressed_formats.py | 95 +++++++++------
30 files changed, 370 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 950eae4..1a785ac 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -116,11 +116,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
if (process_escapes_) {
- ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
- field_locations, num_tuples, num_fields, next_column_start);
+ RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr,
+ row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
} else {
- ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
- field_locations, num_tuples, num_fields, next_column_start);
+ RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr,
+ row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
}
}
@@ -155,9 +155,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
// If the row ended in \r\n then move to the \n
++*next_column_start;
} else {
- AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations);
- FillColumns<false>(0, NULL, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations));
+ Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+ DCHECK(status.ok());
column_idx_ = num_partition_keys_;
row_end_locations[*num_tuples] = *byte_buffer_ptr;
++(*num_tuples);
@@ -171,8 +172,8 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
return Status::OK();
}
} else if (new_col) {
- AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations));
}
--remaining_len;
@@ -183,9 +184,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
// e.g. Sequence files.
if (tuple_delim_ == '\0') {
DCHECK_EQ(remaining_len, 0);
- AddColumn<true>(*byte_buffer_ptr - *next_column_start,
- next_column_start, num_fields, field_locations);
- FillColumns<false>(0, NULL, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations));
+ Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+ DCHECK(status.ok());
column_idx_ = num_partition_keys_;
++(*num_tuples);
unfinished_tuple_ = false;
@@ -193,11 +195,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
return Status::OK();
}
-// Find the first instance of the tuple delimiter. This will
-// find the start of the first full tuple in buffer by looking for the end of
-// the previous tuple.
-int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) {
- int tuple_start = 0;
+// Find the first instance of the tuple delimiter. This will find the start of the first
+// full tuple in buffer by looking for the end of the previous tuple.
+int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
+ int64_t tuple_start = 0;
const char* buffer_start = buffer;
bool found = false;
@@ -256,7 +257,7 @@ restart:
// tuple break that are all escape characters, but that is
// unlikely.
int num_escape_chars = 0;
- int before_tuple_end = tuple_start - 2;
+ int64_t before_tuple_end = tuple_start - 2;
// TODO: If scan range is split between escape character and tuple delimiter,
// before_tuple_end will be -1. Need to scan previous range for escape characters
// in this case.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index f3e108c..d754c74 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -32,9 +32,9 @@ class DelimitedTextParser {
/// collection_item_delim: delimits collection items
/// escape_char: escape delimiters, make them part of the data.
//
- /// num_cols is the total number of columns including partition keys.
+ /// 'num_cols' is the total number of columns including partition keys.
//
- /// is_materialized_col should be initialized to an array of length 'num_cols', with
+ /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
/// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
/// Owned by caller.
//
@@ -73,6 +73,8 @@ class DelimitedTextParser {
/// num_fields: Number of materialized fields parsed
/// next_column_start: pointer within file_buffer_ where the next field starts
/// after the return from the call to ParseData
+ /// Returns an error status if any column exceeds the size limit.
+ /// See AddColumn() for details.
Status ParseFieldLocations(int max_tuples, int64_t remaining_len,
char** byte_buffer_ptr, char** row_end_locations,
FieldLocation* field_locations,
@@ -84,9 +86,10 @@ class DelimitedTextParser {
/// col.
/// - *num_fields returns the number of fields processed.
/// This function is used to parse sequence file records which do not need to
- /// parse for tuple delimiters.
+ /// parse for tuple delimiters. Returns an error status if any column exceeds the
+ /// size limit. See AddColumn() for details.
template <bool process_escapes>
- void ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
+ Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
int* num_fields);
/// FindFirstInstance returns the position after the first non-escaped tuple
@@ -94,7 +97,7 @@ class DelimitedTextParser {
/// Used to find the start of a tuple if jumping into the middle of a text file.
/// Also used to find the sync marker for Sequenced and RC files.
/// If no tuple delimiter is found within the buffer, return -1;
- int FindFirstInstance(const char* buffer, int len);
+ int64_t FindFirstInstance(const char* buffer, int64_t len);
/// Will we return the current column to the query?
/// Hive allows cols at the end of the table that are not in the schema. We'll
@@ -104,17 +107,18 @@ class DelimitedTextParser {
}
/// Fill in columns missing at the end of the tuple.
- /// len and last_column may contain the length and the pointer to the
+ /// 'len' and 'last_column' may contain the length and the pointer to the
/// last column on which the file ended without a delimiter.
/// Fills in the offsets and lengths in field_locations.
- /// If parsing stopped on a delimiter and there is no last column then len will be 0.
+ /// If parsing stopped on a delimiter and there is no last column then length will be 0.
/// Other columns beyond that are filled with 0 length fields.
- /// num_fields points to an initialized count of fields and will incremented
+ /// 'num_fields' points to an initialized count of fields and will incremented
/// by the number fields added.
- /// field_locations will be updated with the start and length of the fields.
+ /// 'field_locations' will be updated with the start and length of the fields.
+ /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
template <bool process_escapes>
- void FillColumns(int len, char** last_column,
- int* num_fields, impala::FieldLocation* field_locations);
+ Status FillColumns(int64_t len, char** last_column, int* num_fields,
+ impala::FieldLocation* field_locations);
/// Return true if we have not seen a tuple delimiter for the current tuple being
/// parsed (i.e., the last byte read was not a tuple delimiter).
@@ -128,24 +132,28 @@ class DelimitedTextParser {
/// Template parameter:
/// process_escapes -- if true the the column may have escape characters
/// and the negative of the len will be stored.
- /// len: lenght of the current column.
+ /// len: length of the current column. The length of a column must fit in a 32-bit
+ /// signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
+ /// it will be treated as an error.
/// Input/Output:
/// next_column_start: Start of the current column, moved to the start of the next.
/// num_fields: current number of fields processed, updated to next field.
/// Output:
/// field_locations: updated with start and length of current field.
+ /// Return an error status if 'len' exceeds the size limit specified above.
template <bool process_escapes>
- void AddColumn(int len, char** next_column_start, int* num_fields,
- FieldLocation* field_locations);
+ Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
+ FieldLocation* field_locations);
/// Helper routine to parse delimited text using SSE instructions.
/// Identical arguments as ParseFieldLocations.
/// If the template argument, 'process_escapes' is true, this function will handle
/// escapes, otherwise, it will assume the text is unescaped. By using templates,
/// we can special case the un-escaped path for better performance. The unescaped
- /// path is optimized away by the compiler.
+ /// path is optimized away by the compiler. Returns an error status if the length
+ /// of any column exceeds the size limit. See AddColumn() for details.
template <bool process_escapes>
- void ParseSse(int max_tuples, int64_t* remaining_len,
+ Status ParseSse(int max_tuples, int64_t* remaining_len,
char** byte_buffer_ptr, char** row_end_locations_,
FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index e96a763..28b5b4b 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -26,7 +26,7 @@ namespace impala {
/// If the character at n is an escape character, then delimiters(tuple/field/escape
/// characters) at n+1 don't count.
inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
- uint16_t* delim_mask) {
+ uint16_t* delim_mask) {
// Escape characters can escape escape characters.
bool first_char_is_escape = *last_char_is_escape;
bool escape_next = first_char_is_escape;
@@ -39,7 +39,7 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
// Remember last character for the next iteration
*last_char_is_escape = escape_mask &
- SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
+ SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
// Shift escape mask up one so they match at the same bit index as the tuple and
// field mask (instead of being the character before) and set the correct first bit
@@ -50,35 +50,41 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
}
template <bool process_escapes>
-inline void DelimitedTextParser::AddColumn(int len, char** next_column_start,
+inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
int* num_fields, FieldLocation* field_locations) {
+ if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
+ return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
+ }
if (ReturnCurrentColumn()) {
// Found a column that needs to be parsed, write the start/len to 'field_locations'
field_locations[*num_fields].start = *next_column_start;
+ int64_t field_len = len;
if (process_escapes && current_column_has_escape_) {
- field_locations[*num_fields].len = -len;
- } else {
- field_locations[*num_fields].len = len;
+ field_len = -len;
}
+ field_locations[*num_fields].len = static_cast<int32_t>(field_len);
++(*num_fields);
}
if (process_escapes) current_column_has_escape_ = false;
*next_column_start += len + 1;
++column_idx_;
+ return Status::OK();
}
template <bool process_escapes>
-void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
+inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
int* num_fields, FieldLocation* field_locations) {
// Fill in any columns missing from the end of the tuple.
char* dummy = NULL;
if (last_column == NULL) last_column = &dummy;
while (column_idx_ < num_cols_) {
- AddColumn<process_escapes>(len, last_column, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
+ num_fields, field_locations));
// The rest of the columns will be null.
last_column = &dummy;
len = 0;
}
+ return Status::OK();
}
/// SSE optimized raw text file parsing. SSE4_2 added an instruction (with 3 modes) for
@@ -95,10 +101,9 @@ void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
/// Haystack = 'asdfghjklhjbdwwc' (the raw string)
/// Result = '1010000000011001'
template <bool process_escapes>
-inline void DelimitedTextParser::ParseSse(int max_tuples,
+inline Status DelimitedTextParser::ParseSse(int max_tuples,
int64_t* remaining_len, char** byte_buffer_ptr,
- char** row_end_locations,
- FieldLocation* field_locations,
+ char** row_end_locations, FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start) {
DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
@@ -172,8 +177,8 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
char* delim_ptr = *byte_buffer_ptr + n;
if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
- AddColumn<process_escapes>(delim_ptr - *next_column_start,
- next_column_start, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations));
continue;
}
@@ -185,9 +190,10 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
last_row_delim_offset_ = -1;
continue;
}
- AddColumn<process_escapes>(delim_ptr - *next_column_start,
- next_column_start, num_fields, field_locations);
- FillColumns<false>(0, NULL, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
+ next_column_start, num_fields, field_locations));
+ Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+ DCHECK(status.ok());
column_idx_ = num_partition_keys_;
row_end_locations[*num_tuples] = delim_ptr;
++(*num_tuples);
@@ -200,7 +206,7 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
// If the last character we processed was \r then set the offset to 0
// so that we will use it at the beginning of the next batch.
if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
- return;
+ return Status::OK();
}
}
}
@@ -214,11 +220,12 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
*remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
*byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
}
+ return Status::OK();
}
/// Simplified version of ParseSSE which does not handle tuple delimiters.
template <bool process_escapes>
-inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
FieldLocation* field_locations, int* num_fields) {
char* next_column_start = buffer;
__m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
@@ -263,8 +270,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
// clear current bit
delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
- AddColumn<process_escapes>(buffer + n - next_column_start,
- &next_column_start, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
+ &next_column_start, num_fields, field_locations));
}
if (process_escapes) {
@@ -288,8 +295,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
if (!last_char_is_escape_ &&
(*buffer == field_delim_ || *buffer == collection_item_delim_)) {
- AddColumn<process_escapes>(buffer - next_column_start,
- &next_column_start, num_fields, field_locations);
+ RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
+ &next_column_start, num_fields, field_locations));
}
--remaining_len;
@@ -298,8 +305,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
// Last column does not have a delimiter after it. Add that column and also
// pad with empty cols if the input is ragged.
- FillColumns<process_escapes>(buffer - next_column_start,
- &next_column_start, num_fields, field_locations);
+ return FillColumns<process_escapes>(buffer - next_column_start,
+ &next_column_start, num_fields, field_locations);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 78d4994..275956c 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -186,7 +186,7 @@ Status HdfsScanner::CommitRows(int num_rows) {
DCHECK(batch_ != NULL);
DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
batch_->CommitRows(num_rows);
- tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows;
+ tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
// We need to pass the row batch to the scan node if there is too much memory attached,
// which can happen if the query is very selective. We need to release memory even
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 7f804f1..9069451 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -58,6 +58,9 @@ struct FieldLocation {
char* start;
/// Encodes the length and whether or not this fields needs to be unescaped.
/// If len < 0, then the field needs to be unescaped.
+ ///
+ /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue
+ /// and StringVal. All other types shouldn't be anywhere near this limit.
int len;
static const char* LLVM_CLASS_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 1460b1a..0cd000f 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -229,15 +229,15 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
for (int i = 0; i < num_to_process; ++i) {
int num_fields = 0;
if (delimited_text_parser_->escape_char() == '\0') {
- delimited_text_parser_->ParseSingleTuple<false>(
+ RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<false>(
record_locations_[i].len,
reinterpret_cast<char*>(record_locations_[i].record),
- &field_locations_[field_location_offset], &num_fields);
+ &field_locations_[field_location_offset], &num_fields));
} else {
- delimited_text_parser_->ParseSingleTuple<true>(
+ RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<true>(
record_locations_[i].len,
reinterpret_cast<char*>(record_locations_[i].record),
- &field_locations_[field_location_offset], &num_fields);
+ &field_locations_[field_location_offset], &num_fields));
}
DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
field_location_offset += num_fields;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6e501cc..6cc308d 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -260,8 +260,8 @@ Status HdfsTextScanner::FinishScanRange() {
// tuple.
DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
DCHECK(partial_tuple_empty_);
- DCHECK(boundary_column_.Empty());
- DCHECK(boundary_row_.Empty());
+ DCHECK(boundary_column_.IsEmpty());
+ DCHECK(boundary_row_.IsEmpty());
return Status::OK();
}
@@ -286,24 +286,24 @@ Status HdfsTextScanner::FinishScanRange() {
ss << "Read failed while trying to finish scan range: " << stream_->filename()
<< ":" << stream_->file_offset() << endl << status.GetDetail();
RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
- } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
- !boundary_row_.Empty() ||
+ } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
+ !boundary_row_.IsEmpty() ||
(delimited_text_parser_->HasUnfinishedTuple() &&
(!scan_node_->materialized_slots().empty() ||
scan_node_->num_materialized_partition_keys() > 0))) {
// Missing columns or row delimiter at end of the file is ok, fill the row in.
- char* col = boundary_column_.str().ptr;
+ char* col = boundary_column_.buffer();
int num_fields = 0;
- delimited_text_parser_->FillColumns<true>(boundary_column_.Size(),
- &col, &num_fields, &field_locations_[0]);
+ RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
+ &col, &num_fields, &field_locations_[0]));
MemPool* pool;
TupleRow* tuple_row_mem;
int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
DCHECK_GE(max_tuples, 1);
// Set variables for proper error outputting on boundary tuple
- batch_start_ptr_ = boundary_row_.str().ptr;
- row_end_locations_[0] = batch_start_ptr_ + boundary_row_.str().len;
+ batch_start_ptr_ = boundary_row_.buffer();
+ row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len();
int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
DCHECK_LE(num_tuples, 1);
DCHECK_GE(num_tuples, 0);
@@ -371,7 +371,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
(num_fields > 0 || *num_tuples > 0)) {
// There can be one partial tuple which returned no more fields from this buffer.
DCHECK_LE(*num_tuples, num_fields + 1);
- if (!boundary_column_.Empty()) {
+ if (!boundary_column_.IsEmpty()) {
RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
boundary_column_.Clear();
}
@@ -423,11 +423,11 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
Status status;
if (num_bytes > 0) {
stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
- &byte_buffer_read_size_, &status);
+ &byte_buffer_read_size_, &status);
} else {
DCHECK_EQ(num_bytes, 0);
status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
- &byte_buffer_read_size_);
+ &byte_buffer_read_size_);
}
RETURN_IF_ERROR(status);
*eosr = stream_->eosr();
@@ -555,7 +555,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
}
// Need to read the entire file.
- if (file_size < byte_buffer_read_size_) {
+ if (file_size > byte_buffer_read_size_) {
stringstream ss;
ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
<< "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
@@ -600,8 +600,8 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
delimited_text_parser_->ParserReset();
SCOPED_TIMER(parse_delimiter_timer_);
- int next_tuple_offset = 0;
- int bytes_left = byte_buffer_read_size_;
+ int64_t next_tuple_offset = 0;
+ int64_t bytes_left = byte_buffer_read_size_;
while (num_skipped_rows < num_rows_to_skip) {
next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
bytes_left);
@@ -610,7 +610,6 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
bytes_left -= next_tuple_offset;
++num_skipped_rows;
}
-
if (next_tuple_offset != -1) *tuple_found = true;
} while (!*tuple_found && !eosr);
@@ -681,7 +680,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// codegen'd using the IRBuilder for the specific tuple description. This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
- const vector<ExprContext*>& conjunct_ctxs) {
+ const vector<ExprContext*>& conjunct_ctxs) {
if (!node->runtime_state()->codegen_enabled()) return NULL;
LlvmCodeGen* codegen;
if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
@@ -717,9 +716,9 @@ void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) {
row_start = row_end_locations_[row_idx - 1] + 1;
}
- if (!boundary_row_.Empty()) {
- // Log the beginning of the line from the previous file buffer(s)
- *ss << boundary_row_.str();
+ if (!boundary_row_.IsEmpty()) {
+ // Log the beginning of the line from the previous file buffer(s).
+ *ss << string(boundary_row_.buffer(), boundary_row_.len());
}
// Log the erroneous line (or the suffix of a line if !boundary_line.empty()).
*ss << string(row_start, row_end - row_start);
@@ -792,7 +791,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
// Write complete tuples. The current field, if any, is at the start of a tuple.
if (num_tuples > 0) {
int max_added_tuples = (scan_node_->limit() == -1) ?
- num_tuples : scan_node_->limit() - scan_node_->rows_returned();
+ num_tuples : scan_node_->limit() - scan_node_->rows_returned();
int tuples_returned = 0;
// Call jitted function if possible
if (write_tuples_fn_ != NULL) {
@@ -836,31 +835,29 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
bool needs_escape = data->len < 0;
int copy_len = needs_escape ? -data->len : data->len;
- int total_len = copy_len + boundary_column_.Size();
+ int64_t total_len = copy_len + boundary_column_.len();
char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
if (UNLIKELY(str_data == NULL)) {
string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
"$0 bytes.", total_len);
return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
}
- memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
- memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
+ memcpy(str_data, boundary_column_.buffer(), boundary_column_.len());
+ memcpy(str_data + boundary_column_.len(), data->start, copy_len);
data->start = str_data;
data->len = needs_escape ? -total_len : total_len;
return Status::OK();
}
-int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
+void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
int num_fields, bool copy_strings) {
- int next_line_offset = 0;
for (int i = 0; i < num_fields; ++i) {
- int need_escape = false;
+ bool need_escape = false;
int len = fields[i].len;
if (len < 0) {
len = -len;
need_escape = true;
}
- next_line_offset += (len + 1);
const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
if (!text_converter_->WriteSlot(desc, partial_tuple_,
@@ -870,5 +867,4 @@ int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
}
++slot_idx_;
}
- return next_line_offset;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 997637d..dae104d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -156,9 +156,9 @@ class HdfsTextScanner : public HdfsScanner {
int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples);
/// Utility function to write out 'num_fields' to 'tuple_'. This is used to parse
- /// partial tuples. Returns bytes processed. If copy_strings is true, strings
- /// from fields will be copied into the boundary pool.
- int WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
+ /// partial tuples. If copy_strings is true, strings from fields will be copied into
+ /// the boundary pool.
+ void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
/// Appends the current file and line to the RuntimeState's error log.
/// row_idx is 0-based (in current batch) where the parse error occured.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 3c78d88..6a5081d 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -234,15 +234,9 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
boundary_buffer_->Clear();
}
}
- // Workaround IMPALA-1619. Fail the request if requested_len is more than 1GB.
- // StringBuffer can only handle 32-bit allocations and StringBuffer::Append()
- // will allocate twice the current buffer size, cause int overflow.
- // TODO: Revert once IMPALA-1619 is fixed.
- if (UNLIKELY(requested_len > StringValue::MAX_LENGTH)) {
- LOG(WARNING) << "Requested buffer size " << requested_len << "B > 1GB."
- << GetStackTrace();
- return Status(Substitute("Requested buffer size $0B > 1GB", requested_len));
- }
+
+ // Resize the buffer to the right size.
+ RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
// We need to fetch more bytes. Copy the end of the current buffer and fetch the next
@@ -273,8 +267,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
} else {
RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
boundary_buffer_bytes_left_ += num_bytes;
- boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
- boundary_buffer_->Size() - boundary_buffer_bytes_left_;
+ boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
+ boundary_buffer_->len() - boundary_buffer_bytes_left_;
io_buffer_bytes_left_ -= num_bytes;
io_buffer_pos_ += num_bytes;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 8265ea2..a9e95cd 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -303,22 +303,14 @@ bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers)
}
bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
- // Workaround IMPALA-1619. Return immediately if the allocation size will cause
- // an arithmetic overflow.
- if (UNLIKELY(size >= (1LL << 31))) {
- // IMPALA-3238: don't repeatedly log warning when bumping up against this limit for
- // large hash tables.
- if (!client->logged_large_allocation_warning_) {
- LOG(WARNING) << "Trying to allocate memory >=2GB (" << size << ")B."
- << GetStackTrace();
- client->logged_large_allocation_warning_ = true;
- }
+ int64_t buffers_needed = BitUtil::Ceil(size, max_block_size());
+ if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) {
+ VLOG_QUERY << "Trying to consume " << size << " which is out of range.";
return false;
}
- int buffers_needed = BitUtil::Ceil(size, max_block_size());
DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
- unique_lock<mutex> lock(lock_);
+ unique_lock<mutex> lock(lock_);
if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
// For small allocations (less than a block size), just let the allocation through.
client->tracker_->ConsumeLocal(size, client->query_tracker_);
@@ -593,7 +585,7 @@ int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
}
int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
- return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
+ return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
}
MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
@@ -1017,7 +1009,8 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
// 1. In the unpinned list. The buffer will not be in the free list.
// 2. in_write_ == true. The buffer will not be in the free list.
// 3. The buffer is free, but hasn't yet been reassigned to a different block.
- DCHECK_EQ(block->buffer_desc_->len, max_block_size()) << "Non-I/O blocks are always pinned";
+ DCHECK_EQ(block->buffer_desc_->len, max_block_size())
+ << "Non-I/O blocks are always pinned";
DCHECK(unpinned_blocks_.Contains(block) ||
block->in_write_ ||
free_io_buffers_.Contains(block->buffer_desc_));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index c57b546..4065b80 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -32,7 +32,7 @@ class CollectionValueBuilder {
CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc,
MemPool* pool, RuntimeState* state,
- int initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
+ int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
: coll_value_(coll_value),
tuple_desc_(tuple_desc),
pool_(pool),
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool-test.cc b/be/src/runtime/free-pool-test.cc
index 02d245a..c8ff93d 100644
--- a/be/src/runtime/free-pool-test.cc
+++ b/be/src/runtime/free-pool-test.cc
@@ -85,6 +85,22 @@ TEST(FreePoolTest, Basic) {
EXPECT_EQ(mem_pool.total_allocated_bytes(), 64);
mem_pool.FreeAll();
+
+ // Try making allocations larger than 1GB.
+ uint8_t* p5 = pool.Allocate(1LL << 32);
+ EXPECT_TRUE(p5 != NULL);
+ for (int64_t i = 0; i < (1LL << 32); i += (1 << 29)) {
+ *(p5 + i) = i;
+ }
+ EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
+
+ // Test zero-byte allocation.
+ p5 = pool.Allocate(0);
+ EXPECT_TRUE(p5 != NULL);
+ EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
+ pool.Free(p5);
+
+ mem_pool.FreeAll();
}
// In this test we make two allocations at increasing sizes and then we
@@ -96,13 +112,13 @@ TEST(FreePoolTest, Loop) {
MemPool mem_pool(&tracker);
FreePool pool(&mem_pool);
- map<int, pair<uint8_t*, uint8_t*> > primed_allocations;
- vector<int> allocation_sizes;
+ map<int64_t, pair<uint8_t*, uint8_t*> > primed_allocations;
+ vector<int64_t> allocation_sizes;
int64_t expected_pool_size = 0;
// Pick a non-power of 2 to exercise more code.
- for (int size = 3; size < 1024 * 1024 * 1024; size *= 3) {
+ for (int64_t size = 5; size < 6LL * 1024 * 1024 * 1024; size *= 5) {
uint8_t* p1 = pool.Allocate(size);
uint8_t* p2 = pool.Allocate(size);
EXPECT_TRUE(p1 != NULL);
@@ -163,6 +179,11 @@ TEST(FreePoolTest, ReAlloc) {
ptr = pool.Allocate(600);
EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8);
+ // Try allocation larger than 1GB.
+ uint8_t* ptr4 = pool.Reallocate(ptr3, 1LL << 32);
+ EXPECT_TRUE(ptr3 != ptr4);
+ EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8 + (1LL << 32) + 8);
+
mem_pool.FreeAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h
index dfabaf0..90df749 100644
--- a/be/src/runtime/free-pool.h
+++ b/be/src/runtime/free-pool.h
@@ -52,8 +52,9 @@ class FreePool {
memset(&lists_, 0, sizeof(lists_));
}
- /// Allocates a buffer of size.
- uint8_t* Allocate(int size) {
+ /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes.
+ uint8_t* Allocate(int64_t size) {
+ DCHECK_GE(size, 0);
#ifndef NDEBUG
static int32_t alloc_counts = 0;
if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -64,16 +65,15 @@ class FreePool {
++net_allocations_;
if (FLAGS_disable_mem_pools) return reinterpret_cast<uint8_t*>(malloc(size));
- /// This is the typical malloc behavior. NULL is reserved for failures.
- if (size == 0) return reinterpret_cast<uint8_t*>(0x1);
+ /// Return a non-NULL dummy pointer. NULL is reserved for failures.
+ if (UNLIKELY(size == 0)) return mem_pool_->EmptyAllocPtr();
int free_list_idx = Bits::Log2Ceiling64(size);
DCHECK_LT(free_list_idx, NUM_LISTS);
-
FreeListNode* allocation = lists_[free_list_idx].next;
if (allocation == NULL) {
// There wasn't an existing allocation of the right size, allocate a new one.
- size = 1 << free_list_idx;
+ size = 1LL << free_list_idx;
allocation = reinterpret_cast<FreeListNode*>(
mem_pool_->Allocate(size + sizeof(FreeListNode)));
if (UNLIKELY(allocation == NULL)) {
@@ -97,7 +97,7 @@ class FreePool {
free(ptr);
return;
}
- if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return;
+ if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return;
FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
FreeListNode* list = node->list;
#ifndef NDEBUG
@@ -114,7 +114,7 @@ class FreePool {
///
/// NULL will be returned on allocation failure. It's the caller's responsibility to
/// free the memory buffer pointed to by "ptr" in this case.
- uint8_t* Reallocate(uint8_t* ptr, int size) {
+ uint8_t* Reallocate(uint8_t* ptr, int64_t size) {
#ifndef NDEBUG
static int32_t alloc_counts = 0;
if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -125,13 +125,14 @@ class FreePool {
if (FLAGS_disable_mem_pools) {
return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size));
}
- if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return Allocate(size);
+ if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size);
FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
FreeListNode* list = node->list;
#ifndef NDEBUG
CheckValidAllocation(list, ptr);
#endif
int bucket_idx = (list - &lists_[0]);
+ DCHECK_LT(bucket_idx, NUM_LISTS);
// This is the actual size of ptr.
int allocation_size = 1 << bucket_idx;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index ab6bd5a..121098c 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -21,6 +21,9 @@
#include "common/names.h"
+// Large allocation size which exceeds the 32-bit range.
+#define LARGE_ALLOC_SIZE (1LL << 32)
+
namespace impala {
// Utility class to call private functions on MemPool.
@@ -125,6 +128,11 @@ TEST(MemPoolTest, Basic) {
p2.FreeAll();
p3.FreeAll();
}
+
+ // Test zero byte allocation.
+ uint8_t* ptr = p.Allocate(0);
+ EXPECT_TRUE(ptr != NULL);
+ EXPECT_EQ(0, p.GetTotalChunkSizes());
}
// Test that we can keep an allocated chunk and a free chunk.
@@ -192,6 +200,22 @@ TEST(MemPoolTest, ReturnPartial) {
EXPECT_EQ(2, ptr[i]);
}
+ // Try ReturnPartialAllocation() with 64-bit values.
+ uint8_t* ptr4 = p.Allocate(LARGE_ALLOC_SIZE + 512);
+ EXPECT_EQ(1024 + LARGE_ALLOC_SIZE + 512, p.total_allocated_bytes());
+ memset(ptr4, 3, 512 * 2);
+ p.ReturnPartialAllocation(LARGE_ALLOC_SIZE);
+ uint8_t* ptr5 = p.Allocate(512);
+ EXPECT_TRUE(ptr5 == ptr4 + 512);
+ memset(ptr5, 4, 512);
+
+ for (int i = 0; i < 512; ++i) {
+ EXPECT_EQ(3, ptr4[i]);
+ }
+ for (int i = 512; i < 512 * 2; ++i) {
+ EXPECT_EQ(4, ptr4[i]);
+ }
+
p.FreeAll();
}
@@ -252,51 +276,50 @@ TEST(MemPoolTest, Limits) {
ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
// Try To allocate 20 bytes, this should succeed. TryAllocate() should leave the
- // pool in a functional state..
+ // pool in a functional state.
result = p2->TryAllocate(20);
ASSERT_TRUE(result != NULL);
ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
-
p2->FreeAll();
delete p2;
}
TEST(MemPoolTest, MaxAllocation) {
- int64_t int_max_rounded = BitUtil::RoundUp(INT_MAX, 8);
+ int64_t int_max_rounded = BitUtil::RoundUp(LARGE_ALLOC_SIZE, 8);
- // Allocate a single INT_MAX chunk
+ // Allocate a single LARGE_ALLOC_SIZE chunk
MemTracker tracker;
MemPool p1(&tracker);
- uint8_t* ptr = p1.Allocate(INT_MAX);
+ uint8_t* ptr = p1.Allocate(LARGE_ALLOC_SIZE);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded, p1.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded, p1.total_allocated_bytes());
p1.FreeAll();
- // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an INT_MAX chunk
+ // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by a LARGE_ALLOC_SIZE chunk
MemPool p2(&tracker);
p2.Allocate(8);
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, p2.GetTotalChunkSizes());
EXPECT_EQ(8, p2.total_allocated_bytes());
- ptr = p2.Allocate(INT_MAX);
+ ptr = p2.Allocate(LARGE_ALLOC_SIZE);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE + int_max_rounded,
p2.GetTotalChunkSizes());
EXPECT_EQ(8LL + int_max_rounded, p2.total_allocated_bytes());
p2.FreeAll();
- // Allocate three INT_MAX chunks followed by a small chunk followed by another INT_MAX
- // chunk
+ // Allocate three LARGE_ALLOC_SIZE chunks followed by a small chunk
+ // followed by another LARGE_ALLOC_SIZE chunk.
MemPool p3(&tracker);
- p3.Allocate(INT_MAX);
+ p3.Allocate(LARGE_ALLOC_SIZE);
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(INT_MAX);
+ ptr = p3.Allocate(LARGE_ALLOC_SIZE);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 2, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 2, p3.total_allocated_bytes());
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(INT_MAX);
+ ptr = p3.Allocate(LARGE_ALLOC_SIZE);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 3, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 3, p3.total_allocated_bytes());
@@ -308,7 +331,7 @@ TEST(MemPoolTest, MaxAllocation) {
EXPECT_EQ(int_max_rounded * 3 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 3 + 8, p3.total_allocated_bytes());
// Allocates new int_max_rounded chunk
- ptr = p3.Allocate(INT_MAX);
+ ptr = p3.Allocate(LARGE_ALLOC_SIZE);
EXPECT_TRUE(ptr != NULL);
EXPECT_EQ(int_max_rounded * 4 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
EXPECT_EQ(int_max_rounded * 4 + 8, p3.total_allocated_bytes());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 15325c0..e1c4d2b 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -101,7 +101,7 @@ class MemPool {
/// Returns 'byte_size' to the current chunk back to the mem pool. This can
/// only be used to return either all or part of the previous allocation returned
/// by Allocate().
- void ReturnPartialAllocation(int byte_size) {
+ void ReturnPartialAllocation(int64_t byte_size) {
DCHECK_GE(byte_size, 0);
DCHECK(current_chunk_idx_ != -1);
ChunkInfo& info = chunks_[current_chunk_idx_];
@@ -110,6 +110,11 @@ class MemPool {
total_allocated_bytes_ -= byte_size;
}
+ /// Return a dummy pointer for zero-length allocations.
+ static uint8_t* EmptyAllocPtr() {
+ return reinterpret_cast<uint8_t*>(&zero_length_region_);
+ }
+
/// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
void Clear();
@@ -208,6 +213,7 @@ class MemPool {
template <bool CHECK_LIMIT_FIRST>
uint8_t* Allocate(int64_t size) noexcept {
+ DCHECK_GE(size, 0);
if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t *>(&zero_length_region_);
int64_t num_bytes = BitUtil::RoundUp(size, 8);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index 95d2e41..2387d60 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -24,10 +24,10 @@
namespace impala {
void ValidateString(const string& std_str, const StringBuffer& str) {
- EXPECT_EQ(std_str.empty(), str.Empty());
- EXPECT_EQ((int)std_str.size(), str.Size());
+ EXPECT_EQ(std_str.empty(), str.IsEmpty());
+ EXPECT_EQ(static_cast<int64_t>(std_str.size()), str.len());
if (std_str.size() > 0) {
- EXPECT_EQ(strncmp(std_str.c_str(), str.str().ptr, std_str.size()), 0);
+ EXPECT_EQ(strncmp(std_str.c_str(), str.buffer(), std_str.size()), 0);
}
}
@@ -55,11 +55,6 @@ TEST(StringBufferTest, Basic) {
str.Append("World", strlen("World"));
ValidateString(std_str, str);
- // Assign
- std_str.assign("foo");
- str.Assign("foo", strlen("foo"));
- ValidateString(std_str, str);
-
// Clear
std_str.clear();
str.Clear();
@@ -72,23 +67,22 @@ TEST(StringBufferTest, Basic) {
}
TEST(StringBufferTest, AppendBoundary) {
- // Test StringBuffer::Append() up to 1GB is ok
- // TODO: Once IMPALA-1619 is fixed, we should change the test to verify
- // append over 2GB string is supported.
+ // Test StringBuffer::Append() works beyond 1GB.
MemTracker tracker;
MemPool pool(&tracker);
StringBuffer str(&pool);
string std_str;
const int64_t chunk_size = 8 * 1024 * 1024;
+ const int64_t max_data_size = 1LL << 32;
std_str.resize(chunk_size, 'a');
int64_t data_size = 0;
- while (data_size + chunk_size <= StringValue::MAX_LENGTH) {
+ while (data_size + chunk_size <= max_data_size) {
str.Append(std_str.c_str(), chunk_size);
data_size += chunk_size;
}
EXPECT_EQ(str.buffer_size(), data_size);
- std_str.resize(StringValue::MAX_LENGTH, 'a');
+ std_str.resize(max_data_size, 'a');
ValidateString(std_str, str);
pool.FreeAll();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index ec59806..3725181 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -27,99 +27,87 @@ namespace impala {
/// Dynamic-sizable string (similar to std::string) but without as many
/// copies and allocations.
-/// StringBuffer wraps a StringValue object with a pool and memory buffer length.
-/// It supports a subset of the std::string functionality but will only allocate
-/// bigger string buffers as necessary. std::string tries to be immutable and will
-/// reallocate very often. std::string should be avoided in all hot paths.
+/// StringBuffer is a buffer of char allocated from 'pool'. Current usage and size of the
+/// buffer are tracked in 'len_' and 'buffer_size_' respectively. It supports a subset of
+/// the std::string functionality but will only allocate bigger string buffers as
+/// necessary. std::string tries to be immutable and will reallocate very often.
+/// std::string should be avoided in all hot paths.
class StringBuffer {
public:
/// C'tor for StringBuffer. Memory backing the string will be allocated from
/// the pool as necessary. Can optionally be initialized from a StringValue.
StringBuffer(MemPool* pool, StringValue* str = NULL)
- : pool_(pool), buffer_size_(0) {
+ : pool_(pool), buffer_(NULL), len_(0), buffer_size_(0) {
DCHECK(pool_ != NULL);
if (str != NULL) {
- string_value_ = *str;
- buffer_size_ = str->len;
+ buffer_ = str->ptr;
+ len_ = buffer_size_ = str->len;
}
}
/// Append 'str' to the current string, allocating a new buffer as necessary.
- /// Return error status if memory limit is exceeded.
- Status Append(const char* str, int len) {
- int new_len = len + string_value_.len;
+ Status Append(const char* str, int64_t str_len) {
+ int64_t new_len = len_ + str_len;
if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
- memcpy(string_value_.ptr + string_value_.len, str, len);
- string_value_.len = new_len;
+ memcpy(buffer_ + len_, str, str_len);
+ len_ += str_len;
return Status::OK();
}
- /// TODO: switch everything to uint8_t?
- Status Append(const uint8_t* str, int len) {
- return Append(reinterpret_cast<const char*>(str), len);
+ /// Wrapper around Append() for input type 'uint8_t'.
+ Status Append(const uint8_t* str, int64_t str_len) {
+ return Append(reinterpret_cast<const char*>(str), str_len);
}
- /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
- Status Assign(const char* str, int len) {
- Clear();
- return Append(str, len);
- }
-
- /// Clear the underlying StringValue. The allocated buffer can be reused.
- void Clear() {
- string_value_.len = 0;
- }
+ /// Clear the contents of the buffer. The allocated buffer can be reused.
+ void Clear() { len_ = 0; }
- /// Clears the underlying buffer and StringValue
+ /// Reset the usage and size of the buffer. Note that the allocated buffer is
+ /// retained but cannot be reused.
void Reset() {
- string_value_.len = 0;
+ len_ = 0;
buffer_size_ = 0;
+ buffer_ = NULL;
}
- /// Returns whether the current string is empty
- bool Empty() const {
- return string_value_.len == 0;
+ /// Returns true if no byte is consumed in the buffer.
+ bool IsEmpty() const { return len_ == 0; }
+
+ /// Grows the buffer to be at least 'new_size', copying over the previous data
+ /// into the new buffer. The old buffer is not freed. Return an error status if
+ /// growing the buffer will exceed memory limit.
+ Status GrowBuffer(int64_t new_size) {
+ if (LIKELY(new_size > buffer_size_)) {
+ int64_t old_size = buffer_size_;
+ buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);
+ char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+ if (UNLIKELY(new_buffer == NULL)) {
+ string details = Substitute("StringBuffer failed to grow buffer from $0 "
+ "to $1 bytes.", old_size, buffer_size_);
+ return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+ }
+ if (LIKELY(len_ > 0)) memcpy(new_buffer, buffer_, len_);
+ buffer_ = new_buffer;
+ }
+ return Status::OK();
}
- /// Returns the length of the current string
- int Size() const {
- return string_value_.len;
- }
+ /// Returns the number of bytes consumed in the buffer.
+ int64_t len() const { return len_; }
- /// Returns the underlying StringValue
- const StringValue& str() const {
- return string_value_;
- }
+ /// Returns the pointer to the buffer. Note that it's the caller's responsibility
+ /// to not retain the pointer to 'buffer_' across calls to Append() or GrowBuffer()
+ /// as 'buffer_' may be relocated by either of them.
+ char* buffer() const { return buffer_; }
- /// Returns the buffer size
- int buffer_size() const {
- return buffer_size_;
- }
+ /// Returns the size of the buffer.
+ int64_t buffer_size() const { return buffer_size_; }
private:
- /// Grows the buffer backing the string to be at least new_size, copying over the
- /// previous string data into the new buffer. Return error status if memory limit
- /// is exceeded.
- Status GrowBuffer(int new_len) {
- // TODO: Release/reuse old buffers somehow
- buffer_size_ = std::max(buffer_size_ * 2, new_len);
- DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
- char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
- if (UNLIKELY(new_buffer == NULL)) {
- string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.",
- buffer_size_);
- return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
- }
- if (LIKELY(string_value_.len > 0)) {
- memcpy(new_buffer, string_value_.ptr, string_value_.len);
- }
- string_value_.ptr = new_buffer;
- return Status::OK();
- }
-
MemPool* pool_;
- StringValue string_value_;
- int buffer_size_;
+ char* buffer_;
+ int64_t len_; // number of bytes consumed in the buffer.
+ int64_t buffer_size_; // size of the buffer.
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 599e42f..1996838 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -79,7 +79,7 @@ class FunctionContextImpl {
/// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear.
/// For UDFs, the allocations can be freed at the row level.
/// TODO: free them at the batch level and save some copies?
- uint8_t* AllocateLocal(int byte_size) noexcept;
+ uint8_t* AllocateLocal(int64_t byte_size) noexcept;
/// Frees all allocations returned by AllocateLocal().
void FreeLocalAllocations() noexcept;
@@ -121,7 +121,7 @@ class FunctionContextImpl {
/// if necessary.
///
/// Return false if 'buf' is null; returns true otherwise.
- bool CheckAllocResult(const char* fn_name, uint8_t* buf, int byte_size);
+ bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size);
/// Preallocated buffer for storing varargs (if the function has any). Allocated and
/// owned by this object, but populated by an Expr function.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 4cadb63..ff81c9c 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -267,7 +267,7 @@ const char* FunctionContext::error_msg() const {
}
inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
- uint8_t* buf, int byte_size) {
+ uint8_t* buf, int64_t byte_size) {
if (UNLIKELY(buf == NULL)) {
stringstream ss;
ss << string(fn_name) << "() failed to allocate " << byte_size << " bytes.";
@@ -416,7 +416,7 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) {
}
}
-uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) noexcept {
+uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept {
assert(!closed_);
if (byte_size == 0) return NULL;
uint8_t* buffer = pool_->Allocate(byte_size);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 1b1cdab..210c855 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -140,6 +140,7 @@ class FunctionContext {
/// The UDF/UDA is responsible for calling Free() on all buffers returned by Allocate().
/// If Allocate() fails or causes the memory limit to be exceeded, the error will be
/// set in this object causing the query to fail.
+ /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
uint8_t* Allocate(int byte_size) noexcept;
/// Wrapper around Allocate() to allocate a buffer of the given type "T".
@@ -155,6 +156,7 @@ class FunctionContext {
/// memory limit to be exceeded, the error will be set in this object.
///
/// This should be used for buffers that constantly get appended to.
+ /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
uint8_t* Reallocate(uint8_t* ptr, int byte_size) noexcept;
/// Frees a buffer returned from Allocate() or Reallocate()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index cae8212..14553ae 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -23,6 +23,7 @@
#endif
#include <boost/type_traits/make_unsigned.hpp>
+#include <limits>
#include "common/compiler-util.h"
#include "util/cpu-info.h"
@@ -203,31 +204,36 @@ class BitUtil {
static inline uint16_t FromBigEndian(uint16_t val) { return val; }
#endif
- // Logical right shift for signed integer types
- // This is needed because the C >> operator does arithmetic right shift
- // Negative shift amounts lead to undefined behavior
+ /// Returns true if 'value' is a non-negative 32-bit integer.
+ static inline bool IsNonNegative32Bit(int64_t value) {
+ return static_cast<uint64_t>(value) <= std::numeric_limits<int32_t>::max();
+ }
+
+ /// Logical right shift for signed integer types
+ /// This is needed because the C >> operator does arithmetic right shift
+ /// Negative shift amounts lead to undefined behavior
template<typename T>
static T ShiftRightLogical(T v, int shift) {
// Conversion to unsigned ensures most significant bits always filled with 0's
return static_cast<typename make_unsigned<T>::type>(v) >> shift;
}
- // Get an specific bit of a numeric type
+ /// Get a specific bit of a numeric type
template<typename T>
static inline int8_t GetBit(T v, int bitpos) {
T masked = v & (static_cast<T>(0x1) << bitpos);
return static_cast<int8_t>(ShiftRightLogical(masked, bitpos));
}
- // Set a specific bit to 1
- // Behavior when bitpos is negative is undefined
+ /// Set a specific bit to 1
+ /// Behavior when bitpos is negative is undefined
template<typename T>
static T SetBit(T v, int bitpos) {
return v | (static_cast<T>(0x1) << bitpos);
}
- // Set a specific bit to 0
- // Behavior when bitpos is negative is undefined
+ /// Set a specific bit to 0
+ /// Behavior when bitpos is negative is undefined
template<typename T>
static T UnsetBit(T v, int bitpos) {
return v & ~(static_cast<T>(0x1) << bitpos);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 6863d1f..2ee7415 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -182,15 +182,13 @@ Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
const uint8_t* input, int* output_length, uint8_t** output) {
int64_t input_len64 = input_length;
int64_t output_len64 = *output_length;
- RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
- output));
- // Check whether we are going to have an overflow if we are going to cast from int64_t
- // to int.
- // TODO: Is there a faster way to do this check?
- if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
+ RETURN_IF_ERROR(
+ ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
+ // Buffer size should be between [0, (2^31 - 1)] bytes.
+ if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
output_len64));;
}
- *output_length = static_cast<int32_t>(output_len64);
+ *output_length = static_cast<int>(output_len64);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index a337983..563a3b9 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -150,11 +150,6 @@ class Codec {
bool supports_streaming() const { return supports_streaming_; }
- /// Largest block we will compress/decompress: 2GB.
- /// We are dealing with compressed blocks that are never this big but we want to guard
- /// against a corrupt file that has the block length as some large number.
- static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;
-
protected:
/// Create a compression operator
/// Inputs:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 3a32756..7a94db0 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -121,10 +121,8 @@ class DecompressorTest : public ::testing::Test {
EXPECT_GE(max_compressed_length, 0);
uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
compressed_length = max_compressed_length;
-
-
EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
- &compressed));
+ &compressed));
}
output_len = decompressor->MaxOutputLen(compressed_length, compressed);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 8a2ea74..900f891 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -148,9 +148,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
if (!reuse_buffer_ || out_buffer_ == NULL) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
- if (buffer_length_ > MAX_BLOCK_SIZE) {
- return Status("Decompressor: block size is too big");
- }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -204,11 +201,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
// User didn't supply the buffer, double the buffer and try again.
temp_memory_pool_->Clear();
buffer_length_ *= 2;
- if (buffer_length_ > MAX_BLOCK_SIZE) {
- stringstream ss;
- ss << "GzipDecompressor: block size is too big: " << buffer_length_;
- return Status(ss.str());
- }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -273,9 +265,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
} else if (!reuse_buffer_ || out_buffer_ == NULL) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
- if (buffer_length_ > MAX_BLOCK_SIZE) {
- return Status("Decompressor: block size is too big");
- }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -295,9 +284,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
DCHECK(!output_preallocated);
temp_memory_pool_->Clear();
buffer_length_ = buffer_length_ * 2;
- if (buffer_length_ > MAX_BLOCK_SIZE) {
- return Status("Decompressor: block size is too big");
- }
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -415,17 +401,18 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
}
// Hadoop uses a block compression scheme on top of snappy. As per the hadoop docs
-// the input is split into blocks. Each block "contains the uncompressed length for
-// the block followed by one of more length-prefixed blocks of compressed data."
+// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split
+// into blocks. Each block "contains the uncompressed length for the block followed
+// by one or more length-prefixed blocks of compressed data."
// This is essentially blocks of blocks.
// The outer block consists of:
-// - 4 byte little endian uncompressed_size
+// - 4 byte big endian uncompressed_size
// < inner blocks >
// ... repeated until input_len is consumed ..
// The inner blocks have:
-// - 4-byte little endian compressed_size
+// - 4-byte big endian compressed_size
// < snappy compressed block >
-// - 4-byte little endian compressed_size
+// - 4-byte big endian compressed_size
// < snappy compressed block >
// ... repeated until uncompressed_size from outer block is consumed ...
@@ -443,15 +430,6 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
input += sizeof(uint32_t);
input_len -= sizeof(uint32_t);
- if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) {
- if (uncompressed_total_len == 0) {
- // TODO: is this check really robust?
- return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
- uncompressed_block_len);
- }
- break;
- }
-
if (!size_only) {
int64_t remaining_output_size = *output_len - uncompressed_total_len;
DCHECK_GE(remaining_output_size, uncompressed_block_len);
@@ -464,29 +442,23 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
input_len -= sizeof(uint32_t);
if (compressed_len == 0 || compressed_len > input_len) {
- if (uncompressed_total_len == 0) {
- return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
- }
- input_len = 0;
- break;
+ *output_len = 0;
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
}
// Read how big the output will be.
size_t uncompressed_len;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
- input_len, &uncompressed_len)) {
- if (uncompressed_total_len == 0) {
- return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
- }
- input_len = 0;
- break;
+ compressed_len, &uncompressed_len)) {
+ *output_len = 0;
+ return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
}
DCHECK_GT(uncompressed_len, 0);
if (!size_only) {
// Decompress this snappy block
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
- compressed_len, output)) {
+ compressed_len, output)) {
return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
}
output += uncompressed_len;
@@ -526,14 +498,6 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
*output = out_buffer_;
}
- if (*output_len > MAX_BLOCK_SIZE) {
- // TODO: is this check really robust?
- stringstream ss;
- ss << "Decompressor: block size is too big. Data is likely corrupt. "
- << "Size: " << *output_len;
- return Status(ss.str());
- }
-
char* out_ptr = reinterpret_cast<char*>(*output);
RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
return Status::OK();
@@ -547,7 +511,7 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
DCHECK(input != NULL);
size_t result;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
- input_len, &result)) {
+ input_len, &result)) {
return -1;
}
return result;
@@ -560,9 +524,6 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed");
if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
buffer_length_ = uncompressed_length;
- if (buffer_length_ > MAX_BLOCK_SIZE) {
- return Status("Decompressor: block size is too big");
- }
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == NULL)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
@@ -576,7 +537,7 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
}
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
- static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
+ static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
return Status("Snappy: RawUncompress failed");
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 7815233..26ea78d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -274,6 +274,9 @@ error_codes = (
("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
"data of type $1: $2"),
+
+ ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
+ "supported length of 2147483647 bytes.")
)
import sys
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 53cdefa..dd6f8ad 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -294,6 +294,11 @@ function load-custom-data {
hadoop fs -put -f ${IMPALA_HOME}/testdata/tinytable_seq_snap/tinytable_seq_snap_header_only \
/test-warehouse/tinytable_seq_snap
+ # IMPALA-1619: payload compressed with snappy used for constructing large snappy block
+ # compressed file
+ hadoop fs -put -f ${IMPALA_HOME}/testdata/compressed_formats/compressed_payload.snap \
+ /test-warehouse/compressed_payload.snap
+
beeline -n $USER -u "${JDBC_URL}" -f\
${IMPALA_HOME}/testdata/avro_schema_resolution/create_table.sql
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/README
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/README b/testdata/compressed_formats/README
new file mode 100644
index 0000000..892cb80
--- /dev/null
+++ b/testdata/compressed_formats/README
@@ -0,0 +1,4 @@
+This folder contains a file necessary to test Impala's support for snappy block compressed
+files larger than 1GB. In particular, compressed_payload.snap is a string of 50176 bytes
+compressed using snappy. It's the building block for constructing a large snappy block
+compressed file. Please see test_compressed_formats.py for details.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/compressed_payload.snap
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/compressed_payload.snap b/testdata/compressed_formats/compressed_payload.snap
new file mode 100644
index 0000000..20ac4ff
Binary files /dev/null and b/testdata/compressed_formats/compressed_payload.snap differ
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 15f799c..c55c9e7 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -3,6 +3,8 @@
import os
import pytest
import random
+import string
+import struct
import subprocess
from os.path import join
from subprocess import call
@@ -144,19 +146,23 @@ class TestTableWriters(ImpalaTestSuite):
@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
- """ Tests that we gracefully handle when a compressed file in HDFS is larger
- than 1GB.
- This test creates a testing data file that is over 1GB and loads it to a table.
- Then verifies Impala will gracefully fail the query.
- TODO: Once IMPALA-1619 is fixed, modify the test to test > 2GB file."""
-
+ """
+ Tests that Impala handles compressed files in HDFS larger than 1GB.
+ This test creates a 2GB test data file and loads it into a table.
+ """
TABLE_NAME = "large_compressed_file"
TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
- """ Name the file with ".snappy" extension to let scanner treat it
- as a snappy compressed file."""
+ """
+ Name the file with ".snappy" extension to let scanner treat it as
+ a snappy block compressed file.
+ """
FILE_NAME = "largefile.snappy"
- LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
- MAX_FILE_SIZE = 1024 * 1024 * 1024
+ # Maximum uncompressed size of an outer block in a snappy block compressed file.
+ CHUNK_SIZE = 1024 * 1024 * 1024
+ # Limit the max file size to 2GB or too much memory may be needed when
+ # uncompressing the buffer. 2GB is sufficient to show that we support
+ # size beyond maximum 32-bit signed value.
+ MAX_FILE_SIZE = 2 * CHUNK_SIZE
@classmethod
def get_workload(self):
@@ -170,48 +176,65 @@ class TestLargeCompressedFile(ImpalaTestSuite):
pytest.skip("skipping if it's not exhaustive test.")
cls.TestMatrix.add_constraint(lambda v:
(v.get_value('table_format').file_format =='text' and
- v.get_value('table_format').compression_codec == 'none'))
+ v.get_value('table_format').compression_codec == 'snap'))
def teardown_method(self, method):
self.__drop_test_table()
- def __gen_char_or_num(self):
- return random.choice(self.LETTERS)
-
def __generate_file(self, file_name, file_size):
"""Generate file with random data and a specified size."""
- s = ''
- for j in range(1024):
- s = s + self.__gen_char_or_num()
- put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
- stdin=subprocess.PIPE, bufsize=-1)
- remain = file_size % 1024
- for i in range(int(file_size / 1024)):
- put.stdin.write(s)
- put.stdin.write(s[0:remain])
- put.stdin.close()
- put.wait()
+
+ # Read the payload compressed using snappy. The compressed payload
+ # is generated from a string of 50176 bytes.
+ payload_size = 50176
+ hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat",
+ "/test-warehouse/compressed_payload.snap"], stdout=subprocess.PIPE)
+ compressed_payload = hdfs_cat.stdout.read()
+ compressed_size = len(compressed_payload)
+ hdfs_cat.stdout.close()
+ hdfs_cat.wait()
+
+ # The layout of a snappy-block compressed file is one or more
+ # of the following nested structure which is called "chunk" in
+ # the code below:
+ #
+ # - <big endian 32-bit value encoding the uncompressed size>
+ # - one or more blocks of the following structure:
+ # - <big endian 32-bit value encoding the compressed size>
+ # - <raw bits compressed by snappy algorithm>
+
+ # Number of nested structures described above.
+ num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
+ # Number of compressed snappy blocks per chunk.
+ num_blocks_per_chunk = self.CHUNK_SIZE / (compressed_size + 4)
+ # Total uncompressed size of a nested structure.
+ total_chunk_size = num_blocks_per_chunk * payload_size
+
+ hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+ stdin=subprocess.PIPE, bufsize=-1)
+ for i in range(num_chunks):
+ hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
+ for j in range(num_blocks_per_chunk):
+ hdfs_put.stdin.write(struct.pack('>i', compressed_size))
+ hdfs_put.stdin.write(compressed_payload)
+ hdfs_put.stdin.close()
+ hdfs_put.wait()
def test_query_large_file(self, vector):
self.__create_test_table();
dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
- file_size = self.MAX_FILE_SIZE + 1
+ file_size = self.MAX_FILE_SIZE
self.__generate_file(dst_path, file_size)
self.client.execute("refresh %s" % self.TABLE_NAME)
- # Query the table and check for expected error.
- expected_error = 'Requested buffer size %dB > 1GB' % file_size
- try:
- result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
- assert False, "Query was expected to fail"
- except Exception as e:
- error_msg = str(e)
- assert expected_error in error_msg
+ # Query the table
+ result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
def __create_test_table(self):
self.__drop_test_table()
- self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
- % (self.TABLE_NAME, self.TABLE_LOCATION))
+ self.client.execute("CREATE TABLE %s (col string) " \
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
+ % (self.TABLE_NAME, self.TABLE_LOCATION))
def __drop_test_table(self):
self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)