Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/08 22:42:15 UTC

incubator-impala git commit: IMPALA-1619: Support 64-bit allocations.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 667a778af -> ed5ec6772


IMPALA-1619: Support 64-bit allocations.

This change extends MemPool, FreePool and StringBuffer to support
64-bit allocations, fixes a bug in the decompressor, and updates
various other places in the code to handle 64-bit allocation sizes.
With this change, the text scanner can now decompress compressed
files larger than 1GB.
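
For illustration only, a minimal sketch of the kind of allocation this
enables, modeled on the MemPool test changes further below (MemTracker and
MemPool used as in be/src/runtime/mem-pool-test.cc; not part of the commit):

#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"

using namespace impala;

// Sketch only: with int64_t size parameters, a single allocation can now
// exceed 2^31 bytes without overflowing.
void LargeAllocSketch() {
  MemTracker tracker;
  MemPool pool(&tracker);
  uint8_t* buf = pool.Allocate(1LL << 32);  // 4GB request
  if (buf != NULL) {
    // ... fill and consume buf ...
  }
  pool.FreeAll();
}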

Note that the UDF interfaces FunctionContext::Allocate() and
FunctionContext::Reallocate() still take a 32-bit size argument
to avoid breaking compatibility.
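
For reference, a hedged sketch of the UDF-facing interface that keeps 32-bit
sizes. This is only the approximate shape of the declarations in
be/src/udf/udf.h; parameter names and surrounding details are assumptions:

#include <cstdint>

// Approximate, for illustration: allocation sizes stay 'int' (32-bit) on the
// UDF API so existing UDF binaries keep working.
class FunctionContext {
 public:
  uint8_t* Allocate(int byte_size);
  uint8_t* Reallocate(uint8_t* ptr, int byte_size);
  // ...
};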

In addition, the byte size of a tuple is still assumed to fit
within a 32-bit integer. If it needs to be widened to 64-bit, that
will be done in a separate change.

A new test has been added to cover decompression of a 2GB
snappy block-compressed text file.

Change-Id: Ic1af1564953ac02aca2728646973199381c86e5f
Reviewed-on: http://gerrit.cloudera.org:8080/3575
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed5ec677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed5ec677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed5ec677

Branch: refs/heads/master
Commit: ed5ec6772fd24ad28901bc68f120c59597439cf2
Parents: 667a778
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jun 30 15:15:27 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Jul 8 15:42:09 2016 -0700

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser.cc            |  37 +++---
 be/src/exec/delimited-text-parser.h             |  40 ++++---
 be/src/exec/delimited-text-parser.inline.h      |  55 +++++----
 be/src/exec/hdfs-scanner.cc                     |   2 +-
 be/src/exec/hdfs-scanner.h                      |   3 +
 be/src/exec/hdfs-sequence-scanner.cc            |   8 +-
 be/src/exec/hdfs-text-scanner.cc                |  54 ++++-----
 be/src/exec/hdfs-text-scanner.h                 |   6 +-
 be/src/exec/scanner-context.cc                  |  16 +--
 be/src/runtime/buffered-block-mgr.cc            |  21 ++--
 be/src/runtime/collection-value-builder.h       |   2 +-
 be/src/runtime/free-pool-test.cc                |  27 ++++-
 be/src/runtime/free-pool.h                      |  19 +--
 be/src/runtime/mem-pool-test.cc                 |  49 +++++---
 be/src/runtime/mem-pool.h                       |   8 +-
 be/src/runtime/string-buffer-test.cc            |  20 ++--
 be/src/runtime/string-buffer.h                  | 116 +++++++++----------
 be/src/udf/udf-internal.h                       |   4 +-
 be/src/udf/udf.cc                               |   4 +-
 be/src/udf/udf.h                                |   2 +
 be/src/util/bit-util.h                          |  22 ++--
 be/src/util/codec.cc                            |  12 +-
 be/src/util/codec.h                             |   5 -
 be/src/util/decompress-test.cc                  |   4 +-
 be/src/util/decompress.cc                       |  67 +++--------
 common/thrift/generate_error_codes.py           |   3 +
 testdata/bin/create-load-data.sh                |   5 +
 testdata/compressed_formats/README              |   4 +
 .../compressed_formats/compressed_payload.snap  | Bin 0 -> 37270 bytes
 tests/query_test/test_compressed_formats.py     |  95 +++++++++------
 30 files changed, 370 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 950eae4..1a785ac 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -116,11 +116,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
 
   if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
     if (process_escapes_) {
-      ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
-          field_locations, num_tuples, num_fields, next_column_start);
+      RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr,
+          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
     } else {
-      ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
-          field_locations, num_tuples, num_fields, next_column_start);
+      RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr,
+          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
     }
   }
 
@@ -155,9 +155,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
         // If the row ended in \r\n then move to the \n
         ++*next_column_start;
       } else {
-        AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations);
-        FillColumns<false>(0, NULL, num_fields, field_locations);
+        RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations));
+        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+        DCHECK(status.ok());
         column_idx_ = num_partition_keys_;
         row_end_locations[*num_tuples] = *byte_buffer_ptr;
         ++(*num_tuples);
@@ -171,8 +172,8 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
         return Status::OK();
       }
     } else if (new_col) {
-      AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-          next_column_start, num_fields, field_locations);
+      RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+          next_column_start, num_fields, field_locations));
     }
 
     --remaining_len;
@@ -183,9 +184,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
   // e.g. Sequence files.
   if (tuple_delim_ == '\0') {
     DCHECK_EQ(remaining_len, 0);
-    AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-        next_column_start, num_fields, field_locations);
-    FillColumns<false>(0, NULL, num_fields, field_locations);
+    RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+        next_column_start, num_fields, field_locations));
+    Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+    DCHECK(status.ok());
     column_idx_ = num_partition_keys_;
     ++(*num_tuples);
     unfinished_tuple_ = false;
@@ -193,11 +195,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
   return Status::OK();
 }
 
-// Find the first instance of the tuple delimiter.  This will
-// find the start of the first full tuple in buffer by looking for the end of
-// the previous tuple.
-int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) {
-  int tuple_start = 0;
+// Find the first instance of the tuple delimiter. This will find the start of the first
+// full tuple in buffer by looking for the end of the previous tuple.
+int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
+  int64_t tuple_start = 0;
   const char* buffer_start = buffer;
   bool found = false;
 
@@ -256,7 +257,7 @@ restart:
     // tuple break that are all escape characters, but that is
     // unlikely.
     int num_escape_chars = 0;
-    int before_tuple_end = tuple_start - 2;
+    int64_t before_tuple_end = tuple_start - 2;
     // TODO: If scan range is split between escape character and tuple delimiter,
     // before_tuple_end will be -1. Need to scan previous range for escape characters
     // in this case.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index f3e108c..d754c74 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -32,9 +32,9 @@ class DelimitedTextParser {
   ///   collection_item_delim: delimits collection items
   ///   escape_char: escape delimiters, make them part of the data.
   //
-  /// num_cols is the total number of columns including partition keys.
+  /// 'num_cols' is the total number of columns including partition keys.
   //
-  /// is_materialized_col should be initialized to an array of length 'num_cols', with
+  /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
   /// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
   /// Owned by caller.
   //
@@ -73,6 +73,8 @@ class DelimitedTextParser {
   ///   num_fields: Number of materialized fields parsed
   ///   next_column_start: pointer within file_buffer_ where the next field starts
   ///                      after the return from the call to ParseData
+  /// Returns an error status if any column exceeds the size limit.
+  /// See AddColumn() for details.
   Status ParseFieldLocations(int max_tuples, int64_t remaining_len,
       char** byte_buffer_ptr, char** row_end_locations,
       FieldLocation* field_locations,
@@ -84,9 +86,10 @@ class DelimitedTextParser {
   ///   col.
   /// - *num_fields returns the number of fields processed.
   /// This function is used to parse sequence file records which do not need to
-  /// parse for tuple delimiters.
+  /// parse for tuple delimiters. Returns an error status if any column exceeds the
+  /// size limit. See AddColumn() for details.
   template <bool process_escapes>
-  void ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
+  Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
       int* num_fields);
 
   /// FindFirstInstance returns the position after the first non-escaped tuple
@@ -94,7 +97,7 @@ class DelimitedTextParser {
   /// Used to find the start of a tuple if jumping into the middle of a text file.
   /// Also used to find the sync marker for Sequenced and RC files.
   /// If no tuple delimiter is found within the buffer, return -1;
-  int FindFirstInstance(const char* buffer, int len);
+  int64_t FindFirstInstance(const char* buffer, int64_t len);
 
   /// Will we return the current column to the query?
   /// Hive allows cols at the end of the table that are not in the schema.  We'll
@@ -104,17 +107,18 @@ class DelimitedTextParser {
   }
 
   /// Fill in columns missing at the end of the tuple.
-  /// len and last_column may contain the length and the pointer to the
+  /// 'len' and 'last_column' may contain the length and the pointer to the
   /// last column on which the file ended without a delimiter.
   /// Fills in the offsets and lengths in field_locations.
-  /// If parsing stopped on a delimiter and there is no last column then len will be  0.
+  /// If parsing stopped on a delimiter and there is no last column then length will be 0.
   /// Other columns beyond that are filled with 0 length fields.
-  /// num_fields points to an initialized count of fields and will incremented
+  /// 'num_fields' points to an initialized count of fields and will incremented
   /// by the number fields added.
-  /// field_locations will be updated with the start and length of the fields.
+  /// 'field_locations' will be updated with the start and length of the fields.
+  /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
   template <bool process_escapes>
-  void FillColumns(int len, char** last_column,
-                   int* num_fields, impala::FieldLocation* field_locations);
+  Status FillColumns(int64_t len, char** last_column, int* num_fields,
+      impala::FieldLocation* field_locations);
 
   /// Return true if we have not seen a tuple delimiter for the current tuple being
   /// parsed (i.e., the last byte read was not a tuple delimiter).
@@ -128,24 +132,28 @@ class DelimitedTextParser {
   /// Template parameter:
   ///   process_escapes -- if true the the column may have escape characters
   ///                      and the negative of the len will be stored.
-  ///   len: lenght of the current column.
+  ///   len: length of the current column. The length of a column must fit in a 32-bit
+  ///        signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
+  ///        it will be treated as an error.
   /// Input/Output:
   ///   next_column_start: Start of the current column, moved to the start of the next.
   ///   num_fields: current number of fields processed, updated to next field.
   /// Output:
   ///   field_locations: updated with start and length of current field.
+  /// Return an error status if 'len' exceeds the size limit specified above.
   template <bool process_escapes>
-  void AddColumn(int len, char** next_column_start, int* num_fields,
-                 FieldLocation* field_locations);
+  Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
+      FieldLocation* field_locations);
 
   /// Helper routine to parse delimited text using SSE instructions.
   /// Identical arguments as ParseFieldLocations.
   /// If the template argument, 'process_escapes' is true, this function will handle
   /// escapes, otherwise, it will assume the text is unescaped.  By using templates,
   /// we can special case the un-escaped path for better performance.  The unescaped
-  /// path is optimized away by the compiler.
+  /// path is optimized away by the compiler. Returns an error status if the length
+  /// of any column exceeds the size limit. See AddColumn() for details.
   template <bool process_escapes>
-  void ParseSse(int max_tuples, int64_t* remaining_len,
+  Status ParseSse(int max_tuples, int64_t* remaining_len,
       char** byte_buffer_ptr, char** row_end_locations_,
       FieldLocation* field_locations,
       int* num_tuples, int* num_fields, char** next_column_start);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index e96a763..28b5b4b 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -26,7 +26,7 @@ namespace impala {
 /// If the character at n is an escape character, then delimiters(tuple/field/escape
 /// characters) at n+1 don't count.
 inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
-                              uint16_t* delim_mask) {
+    uint16_t* delim_mask) {
   // Escape characters can escape escape characters.
   bool first_char_is_escape = *last_char_is_escape;
   bool escape_next = first_char_is_escape;
@@ -39,7 +39,7 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
 
   // Remember last character for the next iteration
   *last_char_is_escape = escape_mask &
-    SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
+      SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
 
   // Shift escape mask up one so they match at the same bit index as the tuple and
   // field mask (instead of being the character before) and set the correct first bit
@@ -50,35 +50,41 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
 }
 
 template <bool process_escapes>
-inline void DelimitedTextParser::AddColumn(int len, char** next_column_start,
+inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
     int* num_fields, FieldLocation* field_locations) {
+  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
+    return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
+  }
   if (ReturnCurrentColumn()) {
     // Found a column that needs to be parsed, write the start/len to 'field_locations'
     field_locations[*num_fields].start = *next_column_start;
+    int64_t field_len = len;
     if (process_escapes && current_column_has_escape_) {
-      field_locations[*num_fields].len = -len;
-    } else {
-      field_locations[*num_fields].len = len;
+      field_len = -len;
     }
+    field_locations[*num_fields].len = static_cast<int32_t>(field_len);
     ++(*num_fields);
   }
   if (process_escapes) current_column_has_escape_ = false;
   *next_column_start += len + 1;
   ++column_idx_;
+  return Status::OK();
 }
 
 template <bool process_escapes>
-void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
+inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
     int* num_fields, FieldLocation* field_locations) {
   // Fill in any columns missing from the end of the tuple.
   char* dummy = NULL;
   if (last_column == NULL) last_column = &dummy;
   while (column_idx_ < num_cols_) {
-    AddColumn<process_escapes>(len, last_column, num_fields, field_locations);
+    RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
+        num_fields, field_locations));
     // The rest of the columns will be null.
     last_column = &dummy;
     len = 0;
   }
+  return Status::OK();
 }
 
 /// SSE optimized raw text file parsing.  SSE4_2 added an instruction (with 3 modes) for
@@ -95,10 +101,9 @@ void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
 ///  Haystack = 'asdfghjklhjbdwwc' (the raw string)
 ///  Result   = '1010000000011001'
 template <bool process_escapes>
-inline void DelimitedTextParser::ParseSse(int max_tuples,
+inline Status DelimitedTextParser::ParseSse(int max_tuples,
     int64_t* remaining_len, char** byte_buffer_ptr,
-    char** row_end_locations,
-    FieldLocation* field_locations,
+    char** row_end_locations, FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
   DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
 
@@ -172,8 +177,8 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
       char* delim_ptr = *byte_buffer_ptr + n;
 
       if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
-        AddColumn<process_escapes>(delim_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations);
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations));
         continue;
       }
 
@@ -185,9 +190,10 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
           last_row_delim_offset_ = -1;
           continue;
         }
-        AddColumn<process_escapes>(delim_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations);
-        FillColumns<false>(0, NULL, num_fields, field_locations);
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations));
+        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
+        DCHECK(status.ok());
         column_idx_ = num_partition_keys_;
         row_end_locations[*num_tuples] = delim_ptr;
         ++(*num_tuples);
@@ -200,7 +206,7 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
           // If the last character we processed was \r then set the offset to 0
           // so that we will use it at the beginning of the next batch.
           if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
-          return;
+          return Status::OK();
         }
       }
     }
@@ -214,11 +220,12 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
     *remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
     *byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
   }
+  return Status::OK();
 }
 
 /// Simplified version of ParseSSE which does not handle tuple delimiters.
 template <bool process_escapes>
-inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
     FieldLocation* field_locations, int* num_fields) {
   char* next_column_start = buffer;
   __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
@@ -263,8 +270,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
         // clear current bit
         delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-        AddColumn<process_escapes>(buffer + n - next_column_start,
-            &next_column_start, num_fields, field_locations);
+        RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
+            &next_column_start, num_fields, field_locations));
       }
 
       if (process_escapes) {
@@ -288,8 +295,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
 
     if (!last_char_is_escape_ &&
           (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
-      AddColumn<process_escapes>(buffer - next_column_start,
-          &next_column_start, num_fields, field_locations);
+      RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
+          &next_column_start, num_fields, field_locations));
     }
 
     --remaining_len;
@@ -298,8 +305,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
 
   // Last column does not have a delimiter after it.  Add that column and also
   // pad with empty cols if the input is ragged.
-  FillColumns<process_escapes>(buffer - next_column_start,
-        &next_column_start, num_fields, field_locations);
+  return FillColumns<process_escapes>(buffer - next_column_start,
+      &next_column_start, num_fields, field_locations);
 }
 
 }
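
The BitUtil helper called in AddColumn() above comes from
be/src/util/bit-util.h (also changed by this commit). As an assumption rather
than the committed implementation, a plausible sketch of the check:

#include <cstdint>
#include <limits>

// Sketch only (the real helper is BitUtil::IsNonNegative32Bit): true iff
// 'value' is non-negative and fits in a signed 32-bit integer, i.e. it can be
// stored in FieldLocation::len.
static inline bool IsNonNegative32BitSketch(int64_t value) {
  return value >= 0 && value <= std::numeric_limits<int32_t>::max();
}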

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 78d4994..275956c 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -186,7 +186,7 @@ Status HdfsScanner::CommitRows(int num_rows) {
   DCHECK(batch_ != NULL);
   DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
   batch_->CommitRows(num_rows);
-  tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows;
+  tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
 
   // We need to pass the row batch to the scan node if there is too much memory attached,
   // which can happen if the query is very selective. We need to release memory even

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 7f804f1..9069451 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -58,6 +58,9 @@ struct FieldLocation {
   char* start;
   /// Encodes the length and whether or not this fields needs to be unescaped.
   /// If len < 0, then the field needs to be unescaped.
+  ///
+  /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue
+  /// and StringVal. All other types shouldn't be anywhere near this limit.
   int len;
 
   static const char* LLVM_CLASS_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 1460b1a..0cd000f 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -229,15 +229,15 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
   for (int i = 0; i < num_to_process; ++i) {
     int num_fields = 0;
     if (delimited_text_parser_->escape_char() == '\0') {
-      delimited_text_parser_->ParseSingleTuple<false>(
+      RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<false>(
           record_locations_[i].len,
           reinterpret_cast<char*>(record_locations_[i].record),
-          &field_locations_[field_location_offset], &num_fields);
+          &field_locations_[field_location_offset], &num_fields));
     } else {
-      delimited_text_parser_->ParseSingleTuple<true>(
+      RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<true>(
           record_locations_[i].len,
           reinterpret_cast<char*>(record_locations_[i].record),
-          &field_locations_[field_location_offset], &num_fields);
+          &field_locations_[field_location_offset], &num_fields));
     }
     DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
     field_location_offset += num_fields;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6e501cc..6cc308d 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -260,8 +260,8 @@ Status HdfsTextScanner::FinishScanRange() {
     // tuple.
     DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
     DCHECK(partial_tuple_empty_);
-    DCHECK(boundary_column_.Empty());
-    DCHECK(boundary_row_.Empty());
+    DCHECK(boundary_column_.IsEmpty());
+    DCHECK(boundary_row_.IsEmpty());
     return Status::OK();
   }
 
@@ -286,24 +286,24 @@ Status HdfsTextScanner::FinishScanRange() {
         ss << "Read failed while trying to finish scan range: " << stream_->filename()
            << ":" << stream_->file_offset() << endl << status.GetDetail();
         RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
-      } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
-          !boundary_row_.Empty() ||
+      } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
+          !boundary_row_.IsEmpty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&
               (!scan_node_->materialized_slots().empty() ||
                   scan_node_->num_materialized_partition_keys() > 0))) {
         // Missing columns or row delimiter at end of the file is ok, fill the row in.
-        char* col = boundary_column_.str().ptr;
+        char* col = boundary_column_.buffer();
         int num_fields = 0;
-        delimited_text_parser_->FillColumns<true>(boundary_column_.Size(),
-            &col, &num_fields, &field_locations_[0]);
+        RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
+            &col, &num_fields, &field_locations_[0]));
 
         MemPool* pool;
         TupleRow* tuple_row_mem;
         int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
         DCHECK_GE(max_tuples, 1);
         // Set variables for proper error outputting on boundary tuple
-        batch_start_ptr_ = boundary_row_.str().ptr;
-        row_end_locations_[0] = batch_start_ptr_ + boundary_row_.str().len;
+        batch_start_ptr_ = boundary_row_.buffer();
+        row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len();
         int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
         DCHECK_LE(num_tuples, 1);
         DCHECK_GE(num_tuples, 0);
@@ -371,7 +371,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
         (num_fields > 0 || *num_tuples > 0)) {
       // There can be one partial tuple which returned no more fields from this buffer.
       DCHECK_LE(*num_tuples, num_fields + 1);
-      if (!boundary_column_.Empty()) {
+      if (!boundary_column_.IsEmpty()) {
         RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
         boundary_column_.Clear();
       }
@@ -423,11 +423,11 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
     Status status;
     if (num_bytes > 0) {
       stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-                        &byte_buffer_read_size_, &status);
+          &byte_buffer_read_size_, &status);
     } else {
       DCHECK_EQ(num_bytes, 0);
       status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-                                  &byte_buffer_read_size_);
+          &byte_buffer_read_size_);
     }
     RETURN_IF_ERROR(status);
     *eosr = stream_->eosr();
@@ -555,7 +555,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   }
 
   // Need to read the entire file.
-  if (file_size < byte_buffer_read_size_) {
+  if (file_size > byte_buffer_read_size_) {
     stringstream ss;
     ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
        << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
@@ -600,8 +600,8 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
-      int next_tuple_offset = 0;
-      int bytes_left = byte_buffer_read_size_;
+      int64_t next_tuple_offset = 0;
+      int64_t bytes_left = byte_buffer_read_size_;
       while (num_skipped_rows < num_rows_to_skip) {
         next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
           bytes_left);
@@ -610,7 +610,6 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
         bytes_left -= next_tuple_offset;
         ++num_skipped_rows;
       }
-
       if (next_tuple_offset != -1) *tuple_found = true;
     } while (!*tuple_found && !eosr);
 
@@ -681,7 +680,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
 Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
-                                   const vector<ExprContext*>& conjunct_ctxs) {
+    const vector<ExprContext*>& conjunct_ctxs) {
   if (!node->runtime_state()->codegen_enabled()) return NULL;
   LlvmCodeGen* codegen;
   if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
@@ -717,9 +716,9 @@ void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) {
     row_start = row_end_locations_[row_idx - 1] + 1;
   }
 
-  if (!boundary_row_.Empty()) {
-    // Log the beginning of the line from the previous file buffer(s)
-    *ss << boundary_row_.str();
+  if (!boundary_row_.IsEmpty()) {
+    // Log the beginning of the line from the previous file buffer(s).
+    *ss << string(boundary_row_.buffer(), boundary_row_.len());
   }
   // Log the erroneous line (or the suffix of a line if !boundary_line.empty()).
   *ss << string(row_start, row_end - row_start);
@@ -792,7 +791,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
   // Write complete tuples.  The current field, if any, is at the start of a tuple.
   if (num_tuples > 0) {
     int max_added_tuples = (scan_node_->limit() == -1) ?
-          num_tuples : scan_node_->limit() - scan_node_->rows_returned();
+        num_tuples : scan_node_->limit() - scan_node_->rows_returned();
     int tuples_returned = 0;
     // Call jitted function if possible
     if (write_tuples_fn_ != NULL) {
@@ -836,31 +835,29 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
 Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
   bool needs_escape = data->len < 0;
   int copy_len = needs_escape ? -data->len : data->len;
-  int total_len = copy_len + boundary_column_.Size();
+  int64_t total_len = copy_len + boundary_column_.len();
   char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
   if (UNLIKELY(str_data == NULL)) {
     string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
         "$0 bytes.", total_len);
     return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
   }
-  memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
-  memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
+  memcpy(str_data, boundary_column_.buffer(), boundary_column_.len());
+  memcpy(str_data + boundary_column_.len(), data->start, copy_len);
   data->start = str_data;
   data->len = needs_escape ? -total_len : total_len;
   return Status::OK();
 }
 
-int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
+void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
     int num_fields, bool copy_strings) {
-  int next_line_offset = 0;
   for (int i = 0; i < num_fields; ++i) {
-    int need_escape = false;
+    bool need_escape = false;
     int len = fields[i].len;
     if (len < 0) {
       len = -len;
       need_escape = true;
     }
-    next_line_offset += (len + 1);
 
     const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
     if (!text_converter_->WriteSlot(desc, partial_tuple_,
@@ -870,5 +867,4 @@ int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
     }
     ++slot_idx_;
   }
-  return next_line_offset;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 997637d..dae104d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -156,9 +156,9 @@ class HdfsTextScanner : public HdfsScanner {
   int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples);
 
   /// Utility function to write out 'num_fields' to 'tuple_'.  This is used to parse
-  /// partial tuples.  Returns bytes processed.  If copy_strings is true, strings
-  /// from fields will be copied into the boundary pool.
-  int WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
+  /// partial tuples. If copy_strings is true, strings from fields will be copied into
+  /// the boundary pool.
+  void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
 
   /// Appends the current file and line to the RuntimeState's error log.
   /// row_idx is 0-based (in current batch) where the parse error occured.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 3c78d88..6a5081d 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -234,15 +234,9 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
       boundary_buffer_->Clear();
     }
   }
-  // Workaround IMPALA-1619. Fail the request if requested_len is more than 1GB.
-  // StringBuffer can only handle 32-bit allocations and StringBuffer::Append()
-  // will allocate twice the current buffer size, cause int overflow.
-  // TODO: Revert once IMPALA-1619 is fixed.
-  if (UNLIKELY(requested_len > StringValue::MAX_LENGTH)) {
-    LOG(WARNING) << "Requested buffer size " << requested_len << "B > 1GB."
-        << GetStackTrace();
-    return Status(Substitute("Requested buffer size $0B > 1GB", requested_len));
-  }
+
+  // Resize the buffer to the right size.
+  RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
 
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
     // We need to fetch more bytes. Copy the end of the current buffer and fetch the next
@@ -273,8 +267,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
   } else {
     RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
     boundary_buffer_bytes_left_ += num_bytes;
-    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
-                           boundary_buffer_->Size() - boundary_buffer_bytes_left_;
+    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
+        boundary_buffer_->len() - boundary_buffer_bytes_left_;
     io_buffer_bytes_left_ -= num_bytes;
     io_buffer_pos_ += num_bytes;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 8265ea2..a9e95cd 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -303,22 +303,14 @@ bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers)
 }
 
 bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
-  // Workaround IMPALA-1619. Return immediately if the allocation size will cause
-  // an arithmetic overflow.
-  if (UNLIKELY(size >= (1LL << 31))) {
-    // IMPALA-3238: don't repeatedly log warning when bumping up against this limit for
-    // large hash tables.
-    if (!client->logged_large_allocation_warning_) {
-      LOG(WARNING) << "Trying to allocate memory >=2GB (" << size << ")B."
-                   << GetStackTrace();
-      client->logged_large_allocation_warning_ = true;
-    }
+  int64_t buffers_needed = BitUtil::Ceil(size, max_block_size());
+  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) {
+    VLOG_QUERY << "Trying to consume " << size << " which is out of range.";
     return false;
   }
-  int buffers_needed = BitUtil::Ceil(size, max_block_size());
   DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
-  unique_lock<mutex> lock(lock_);
 
+  unique_lock<mutex> lock(lock_);
   if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
     // For small allocations (less than a block size), just let the allocation through.
     client->tracker_->ConsumeLocal(size, client->query_tracker_);
@@ -593,7 +585,7 @@ int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
 }
 
 int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
-  return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
+  return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
 }
 
 MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
@@ -1017,7 +1009,8 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
     //  1. In the unpinned list. The buffer will not be in the free list.
     //  2. in_write_ == true. The buffer will not be in the free list.
     //  3. The buffer is free, but hasn't yet been reassigned to a different block.
-    DCHECK_EQ(block->buffer_desc_->len, max_block_size()) << "Non-I/O blocks are always pinned";
+    DCHECK_EQ(block->buffer_desc_->len, max_block_size())
+        << "Non-I/O blocks are always pinned";
     DCHECK(unpinned_blocks_.Contains(block) ||
            block->in_write_ ||
            free_io_buffers_.Contains(block->buffer_desc_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index c57b546..4065b80 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -32,7 +32,7 @@ class CollectionValueBuilder {
 
   CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc,
       MemPool* pool, RuntimeState* state,
-      int initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
+      int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
     : coll_value_(coll_value),
       tuple_desc_(tuple_desc),
       pool_(pool),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool-test.cc b/be/src/runtime/free-pool-test.cc
index 02d245a..c8ff93d 100644
--- a/be/src/runtime/free-pool-test.cc
+++ b/be/src/runtime/free-pool-test.cc
@@ -85,6 +85,22 @@ TEST(FreePoolTest, Basic) {
   EXPECT_EQ(mem_pool.total_allocated_bytes(), 64);
 
   mem_pool.FreeAll();
+
+  // Try making allocations larger than 1GB.
+  uint8_t* p5 = pool.Allocate(1LL << 32);
+  EXPECT_TRUE(p5 != NULL);
+  for (int64_t i = 0; i < (1LL << 32); i += (1 << 29)) {
+    *(p5 + i) = i;
+  }
+  EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
+
+  // Test zero-byte allocation.
+  p5 = pool.Allocate(0);
+  EXPECT_TRUE(p5 != NULL);
+  EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
+  pool.Free(p5);
+
+  mem_pool.FreeAll();
 }
 
 // In this test we make two allocations at increasing sizes and then we
@@ -96,13 +112,13 @@ TEST(FreePoolTest, Loop) {
   MemPool mem_pool(&tracker);
   FreePool pool(&mem_pool);
 
-  map<int, pair<uint8_t*, uint8_t*> > primed_allocations;
-  vector<int> allocation_sizes;
+  map<int64_t, pair<uint8_t*, uint8_t*> > primed_allocations;
+  vector<int64_t> allocation_sizes;
 
   int64_t expected_pool_size = 0;
 
   // Pick a non-power of 2 to exercise more code.
-  for (int size = 3; size < 1024 * 1024 * 1024; size *= 3) {
+  for (int64_t size = 5; size < 6LL * 1024 * 1024 * 1024; size *= 5) {
     uint8_t* p1 = pool.Allocate(size);
     uint8_t* p2 = pool.Allocate(size);
     EXPECT_TRUE(p1 != NULL);
@@ -163,6 +179,11 @@ TEST(FreePoolTest, ReAlloc) {
   ptr = pool.Allocate(600);
   EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8);
 
+  // Try allocation larger than 1GB.
+  uint8_t* ptr4 = pool.Reallocate(ptr3, 1LL << 32);
+  EXPECT_TRUE(ptr3 != ptr4);
+  EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8 + (1LL << 32) + 8);
+
   mem_pool.FreeAll();
 }
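
As an aside, these 64-bit sizes interact with FreePool's power-of-two
bucketing (visible in the free-pool.h diff below): each request is rounded up
to the next power of two, so the shift computing the bucket size must be
64-bit. A minimal usage sketch, assuming the same MemTracker/MemPool/FreePool
setup as the test above:

#include "runtime/free-pool.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"

using namespace impala;

// Sketch only: a 5GB request lands in the 8GB bucket (1LL << 33); with the
// old 32-bit shift this bucket-size computation overflowed.
void FreePoolSketch() {
  MemTracker tracker;
  MemPool mem_pool(&tracker);
  FreePool pool(&mem_pool);
  uint8_t* p = pool.Allocate(5LL * 1024 * 1024 * 1024);
  if (p != NULL) pool.Free(p);
  mem_pool.FreeAll();
}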
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h
index dfabaf0..90df749 100644
--- a/be/src/runtime/free-pool.h
+++ b/be/src/runtime/free-pool.h
@@ -52,8 +52,9 @@ class FreePool {
     memset(&lists_, 0, sizeof(lists_));
   }
 
-  /// Allocates a buffer of size.
-  uint8_t* Allocate(int size) {
+  /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes.
+  uint8_t* Allocate(int64_t size) {
+    DCHECK_GE(size, 0);
 #ifndef NDEBUG
     static int32_t alloc_counts = 0;
     if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -64,16 +65,15 @@ class FreePool {
     ++net_allocations_;
     if (FLAGS_disable_mem_pools) return reinterpret_cast<uint8_t*>(malloc(size));
 
-    /// This is the typical malloc behavior. NULL is reserved for failures.
-    if (size == 0) return reinterpret_cast<uint8_t*>(0x1);
+    /// Return a non-NULL dummy pointer. NULL is reserved for failures.
+    if (UNLIKELY(size == 0)) return mem_pool_->EmptyAllocPtr();
 
     int free_list_idx = Bits::Log2Ceiling64(size);
     DCHECK_LT(free_list_idx, NUM_LISTS);
-
     FreeListNode* allocation = lists_[free_list_idx].next;
     if (allocation == NULL) {
       // There wasn't an existing allocation of the right size, allocate a new one.
-      size = 1 << free_list_idx;
+      size = 1LL << free_list_idx;
       allocation = reinterpret_cast<FreeListNode*>(
           mem_pool_->Allocate(size + sizeof(FreeListNode)));
       if (UNLIKELY(allocation == NULL)) {
@@ -97,7 +97,7 @@ class FreePool {
       free(ptr);
       return;
     }
-    if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return;
+    if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return;
     FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
     FreeListNode* list = node->list;
 #ifndef NDEBUG
@@ -114,7 +114,7 @@ class FreePool {
   ///
   /// NULL will be returned on allocation failure. It's the caller's responsibility to
   /// free the memory buffer pointed to by "ptr" in this case.
-  uint8_t* Reallocate(uint8_t* ptr, int size) {
+  uint8_t* Reallocate(uint8_t* ptr, int64_t size) {
 #ifndef NDEBUG
     static int32_t alloc_counts = 0;
     if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -125,13 +125,14 @@ class FreePool {
     if (FLAGS_disable_mem_pools) {
       return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size));
     }
-    if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return Allocate(size);
+    if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size);
     FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
     FreeListNode* list = node->list;
 #ifndef NDEBUG
     CheckValidAllocation(list, ptr);
 #endif
     int bucket_idx = (list - &lists_[0]);
+    DCHECK_LT(bucket_idx, NUM_LISTS);
     // This is the actual size of ptr.
     int allocation_size = 1 << bucket_idx;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index ab6bd5a..121098c 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -21,6 +21,9 @@
 
 #include "common/names.h"
 
+// Maximum allocation size which exceeds 32-bit.
+#define LARGE_ALLOC_SIZE (1LL << 32)
+
 namespace impala {
 
 // Utility class to call private functions on MemPool.
@@ -125,6 +128,11 @@ TEST(MemPoolTest, Basic) {
     p2.FreeAll();
     p3.FreeAll();
   }
+
+  // Test zero byte allocation.
+  uint8_t* ptr = p.Allocate(0);
+  EXPECT_TRUE(ptr != NULL);
+  EXPECT_EQ(0, p.GetTotalChunkSizes());
 }
 
 // Test that we can keep an allocated chunk and a free chunk.
@@ -192,6 +200,22 @@ TEST(MemPoolTest, ReturnPartial) {
     EXPECT_EQ(2, ptr[i]);
   }
 
+  // Try ReturnPartialAllocations() with 64-bit values.
+  uint8_t* ptr4 = p.Allocate(LARGE_ALLOC_SIZE + 512);
+  EXPECT_EQ(1024 + LARGE_ALLOC_SIZE + 512, p.total_allocated_bytes());
+  memset(ptr4, 3, 512 * 2);
+  p.ReturnPartialAllocation(LARGE_ALLOC_SIZE);
+  uint8_t* ptr5 = p.Allocate(512);
+  EXPECT_TRUE(ptr5 == ptr4 + 512);
+  memset(ptr5, 4, 512);
+
+  for (int i = 0; i < 512; ++i) {
+    EXPECT_EQ(3, ptr4[i]);
+  }
+  for (int i = 512; i < 512 * 2; ++i) {
+    EXPECT_EQ(4, ptr4[i]);
+  }
+
   p.FreeAll();
 }
 
@@ -252,51 +276,50 @@ TEST(MemPoolTest, Limits) {
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
 
   // Try To allocate 20 bytes, this should succeed. TryAllocate() should leave the
-  // pool in a functional state..
+  // pool in a functional state.
   result = p2->TryAllocate(20);
   ASSERT_TRUE(result != NULL);
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
 
-
   p2->FreeAll();
   delete p2;
 }
 
 TEST(MemPoolTest, MaxAllocation) {
-  int64_t int_max_rounded = BitUtil::RoundUp(INT_MAX, 8);
+  int64_t int_max_rounded = BitUtil::RoundUp(LARGE_ALLOC_SIZE, 8);
 
-  // Allocate a single INT_MAX chunk
+  // Allocate a single LARGE_ALLOC_SIZE chunk
   MemTracker tracker;
   MemPool p1(&tracker);
-  uint8_t* ptr = p1.Allocate(INT_MAX);
+  uint8_t* ptr = p1.Allocate(LARGE_ALLOC_SIZE);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded, p1.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded, p1.total_allocated_bytes());
   p1.FreeAll();
 
-  // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an INT_MAX chunk
+  // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an LARGE_ALLOC_SIZE chunk
   MemPool p2(&tracker);
   p2.Allocate(8);
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, p2.GetTotalChunkSizes());
   EXPECT_EQ(8, p2.total_allocated_bytes());
-  ptr = p2.Allocate(INT_MAX);
+  ptr = p2.Allocate(LARGE_ALLOC_SIZE);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE + int_max_rounded,
       p2.GetTotalChunkSizes());
   EXPECT_EQ(8LL + int_max_rounded, p2.total_allocated_bytes());
   p2.FreeAll();
 
-  // Allocate three INT_MAX chunks followed by a small chunk followed by another INT_MAX
-  // chunk
+  // Allocate three LARGE_ALLOC_SIZE chunks followed by a small chunk
+  // followed by another LARGE_ALLOC_SIZE chunk.
   MemPool p3(&tracker);
-  p3.Allocate(INT_MAX);
+  p3.Allocate(LARGE_ALLOC_SIZE);
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(INT_MAX);
+  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 2, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 2, p3.total_allocated_bytes());
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(INT_MAX);
+  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 3, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 3, p3.total_allocated_bytes());
@@ -308,7 +331,7 @@ TEST(MemPoolTest, MaxAllocation) {
   EXPECT_EQ(int_max_rounded * 3 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 3 + 8, p3.total_allocated_bytes());
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(INT_MAX);
+  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 4 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 4 + 8, p3.total_allocated_bytes());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 15325c0..e1c4d2b 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -101,7 +101,7 @@ class MemPool {
   /// Returns 'byte_size' to the current chunk back to the mem pool. This can
   /// only be used to return either all or part of the previous allocation returned
   /// by Allocate().
-  void ReturnPartialAllocation(int byte_size) {
+  void ReturnPartialAllocation(int64_t byte_size) {
     DCHECK_GE(byte_size, 0);
     DCHECK(current_chunk_idx_ != -1);
     ChunkInfo& info = chunks_[current_chunk_idx_];
@@ -110,6 +110,11 @@ class MemPool {
     total_allocated_bytes_ -= byte_size;
   }
 
+  /// Return a dummy pointer for zero-length allocations.
+  static uint8_t* EmptyAllocPtr() {
+    return reinterpret_cast<uint8_t*>(&zero_length_region_);
+  }
+
   /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
   void Clear();
 
@@ -208,6 +213,7 @@ class MemPool {
 
   template <bool CHECK_LIMIT_FIRST>
   uint8_t* Allocate(int64_t size) noexcept {
+    DCHECK_GE(size, 0);
     if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t *>(&zero_length_region_);
 
     int64_t num_bytes = BitUtil::RoundUp(size, 8);
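
To illustrate the zero-length allocation convention introduced above
(EmptyAllocPtr()), a short sketch based on the MemPool and FreePool changes;
the assertions mirror what the mem-pool test already checks:

#include "common/logging.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"

using namespace impala;

void ZeroLengthAllocSketch() {
  MemTracker tracker;
  MemPool pool(&tracker);
  // Zero-byte allocations return a shared, non-NULL dummy pointer that must
  // never be dereferenced and consumes no chunk memory; FreePool::Free()
  // treats it as a no-op.
  uint8_t* p = pool.Allocate(0);
  DCHECK(p == MemPool::EmptyAllocPtr());
  DCHECK_EQ(pool.GetTotalChunkSizes(), 0);
  pool.FreeAll();
}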

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index 95d2e41..2387d60 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -24,10 +24,10 @@
 namespace impala {
 
 void ValidateString(const string& std_str, const StringBuffer& str) {
-  EXPECT_EQ(std_str.empty(), str.Empty());
-  EXPECT_EQ((int)std_str.size(), str.Size());
+  EXPECT_EQ(std_str.empty(), str.IsEmpty());
+  EXPECT_EQ(static_cast<int64_t>(std_str.size()), str.len());
   if (std_str.size() > 0) {
-    EXPECT_EQ(strncmp(std_str.c_str(), str.str().ptr, std_str.size()), 0);
+    EXPECT_EQ(strncmp(std_str.c_str(), str.buffer(), std_str.size()), 0);
   }
 }
 
@@ -55,11 +55,6 @@ TEST(StringBufferTest, Basic) {
   str.Append("World", strlen("World"));
   ValidateString(std_str, str);
 
-  // Assign
-  std_str.assign("foo");
-  str.Assign("foo", strlen("foo"));
-  ValidateString(std_str, str);
-
   // Clear
   std_str.clear();
   str.Clear();
@@ -72,23 +67,22 @@ TEST(StringBufferTest, Basic) {
 }
 
 TEST(StringBufferTest, AppendBoundary) {
-  // Test StringBuffer::Append() up to 1GB is ok
-  // TODO: Once IMPALA-1619 is fixed, we should change the test to verify
-  // append over 2GB string is supported.
+  // Test StringBuffer::Append() works beyond 1GB.
   MemTracker tracker;
   MemPool pool(&tracker);
   StringBuffer str(&pool);
   string std_str;
 
   const int64_t chunk_size = 8 * 1024 * 1024;
+  const int64_t max_data_size = 1LL << 32;
   std_str.resize(chunk_size, 'a');
   int64_t data_size = 0;
-  while (data_size + chunk_size <= StringValue::MAX_LENGTH) {
+  while (data_size + chunk_size <= max_data_size) {
     str.Append(std_str.c_str(), chunk_size);
     data_size += chunk_size;
   }
   EXPECT_EQ(str.buffer_size(), data_size);
-  std_str.resize(StringValue::MAX_LENGTH, 'a');
+  std_str.resize(max_data_size, 'a');
   ValidateString(std_str, str);
 
   pool.FreeAll();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index ec59806..3725181 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -27,99 +27,87 @@ namespace impala {
 
 /// Dynamic-sizable string (similar to std::string) but without as many
 /// copies and allocations.
-/// StringBuffer wraps a StringValue object with a pool and memory buffer length.
-/// It supports a subset of the std::string functionality but will only allocate
-/// bigger string buffers as necessary.  std::string tries to be immutable and will
-/// reallocate very often.  std::string should be avoided in all hot paths.
+/// StringBuffer is a buffer of char allocated from 'pool'. Current usage and size of the
+/// buffer are tracked in 'len_' and 'buffer_size_' respectively. It supports a subset of
+/// the std::string functionality but will only allocate bigger string buffers as
+/// necessary. std::string tries to be immutable and will reallocate very often.
+/// std::string should be avoided in all hot paths.
 class StringBuffer {
  public:
   /// C'tor for StringBuffer.  Memory backing the string will be allocated from
   /// the pool as necessary. Can optionally be initialized from a StringValue.
   StringBuffer(MemPool* pool, StringValue* str = NULL)
-      : pool_(pool), buffer_size_(0) {
+      : pool_(pool), buffer_(NULL), len_(0), buffer_size_(0) {
     DCHECK(pool_ != NULL);
     if (str != NULL) {
-      string_value_ = *str;
-      buffer_size_ = str->len;
+      buffer_ = str->ptr;
+      len_ = buffer_size_ = str->len;
     }
   }
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
-  /// Return error status if memory limit is exceeded.
-  Status Append(const char* str, int len) {
-    int new_len = len + string_value_.len;
+  Status Append(const char* str, int64_t str_len) {
+    int64_t new_len = len_ + str_len;
     if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
-    memcpy(string_value_.ptr + string_value_.len, str, len);
-    string_value_.len = new_len;
+    memcpy(buffer_ + len_, str, str_len);
+    len_ += str_len;
     return Status::OK();
   }
 
-  /// TODO: switch everything to uint8_t?
-  Status Append(const uint8_t* str, int len) {
-    return Append(reinterpret_cast<const char*>(str), len);
+  /// Wrapper around append() for input type 'uint8_t'.
+  Status Append(const uint8_t* str, int64_t str_len) {
+    return Append(reinterpret_cast<const char*>(str), str_len);
   }
 
-  /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
-  Status Assign(const char* str, int len) {
-    Clear();
-    return Append(str, len);
-  }
-
-  /// Clear the underlying StringValue.  The allocated buffer can be reused.
-  void Clear() {
-    string_value_.len = 0;
-  }
+  /// Clear the underlying StringValue. The allocated buffer can be reused.
+  void Clear() { len_ = 0; }
 
-  /// Clears the underlying buffer and StringValue
+  /// Reset the usage and size of the buffer. Note that the previously allocated
+  /// buffer stays in the pool but is no longer referenced by this StringBuffer.
   void Reset() {
-    string_value_.len = 0;
+    len_ = 0;
     buffer_size_ = 0;
+    buffer_ = NULL;
   }
 
-  /// Returns whether the current string is empty
-  bool Empty() const {
-    return string_value_.len == 0;
+  /// Returns true if no bytes have been consumed in the buffer.
+  bool IsEmpty() const { return len_ == 0; }
+
+  /// Grows the buffer to be at least 'new_size', copying the previous data into
+  /// the new buffer. The old buffer is not freed. Returns an error status if
+  /// growing the buffer would exceed the memory limit.
+  Status GrowBuffer(int64_t new_size) {
+    if (LIKELY(new_size > buffer_size_)) {
+      int64_t old_size = buffer_size_;
+      buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);
+      char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+      if (UNLIKELY(new_buffer == NULL)) {
+        string details = Substitute("StringBuffer failed to grow buffer from $0 "
+            "to $1 bytes.", old_size, buffer_size_);
+        return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+      }
+      if (LIKELY(len_ > 0)) memcpy(new_buffer, buffer_, len_);
+      buffer_ = new_buffer;
+    }
+    return Status::OK();
   }
 
-  /// Returns the length of the current string
-  int Size() const {
-    return string_value_.len;
-  }
+  /// Returns the number of bytes consumed in the buffer.
+  int64_t len() const { return len_; }
 
-  /// Returns the underlying StringValue
-  const StringValue& str() const {
-    return string_value_;
-  }
+  /// Returns the pointer to the buffer. Note that it's the caller's responsibility
+  /// not to retain the pointer to 'buffer_' across calls to Append() as the buffer
+  /// may be relocated by Append().
+  char* buffer() const { return buffer_; }
 
-  /// Returns the buffer size
-  int buffer_size() const {
-    return buffer_size_;
-  }
+  /// Returns the size of the buffer.
+  int64_t buffer_size() const { return buffer_size_; }
 
  private:
-  /// Grows the buffer backing the string to be at least new_size, copying over the
-  /// previous string data into the new buffer. Return error status if memory limit
-  /// is exceeded.
-  Status GrowBuffer(int new_len) {
-    // TODO: Release/reuse old buffers somehow
-    buffer_size_ = std::max(buffer_size_ * 2, new_len);
-    DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
-    char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
-    if (UNLIKELY(new_buffer == NULL)) {
-      string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.",
-          buffer_size_);
-      return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
-    }
-    if (LIKELY(string_value_.len > 0)) {
-      memcpy(new_buffer, string_value_.ptr, string_value_.len);
-    }
-    string_value_.ptr = new_buffer;
-    return Status::OK();
-  }
-
   MemPool* pool_;
-  StringValue string_value_;
-  int buffer_size_;
+  char* buffer_;
+  int64_t len_;         // number of bytes consumed in the buffer.
+  int64_t buffer_size_; // size of the buffer.
 };
 
 }
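
For readers unfamiliar with the class, a minimal usage sketch of the new StringBuffer
API follows. It is illustrative only and not part of the patch; the MemTracker and
MemPool constructor signatures are assumed from the surrounding runtime tests.

    #include "common/status.h"
    #include "runtime/mem-pool.h"
    #include "runtime/mem-tracker.h"
    #include "runtime/string-buffer.h"

    using namespace impala;

    Status BuildGreeting(MemPool* pool) {
      StringBuffer buf(pool);
      RETURN_IF_ERROR(buf.Append("hello", 5));    // may allocate or grow the buffer
      RETURN_IF_ERROR(buf.Append(", world", 7));  // appends in place if it fits
      // buf.len() is now 12; buf.buffer() points at the bytes, but the pointer is
      // only valid until the next Append() because the buffer may be relocated.
      buf.Clear();  // len() back to 0, allocation kept for reuse
      buf.Reset();  // drops the reference to the allocation entirely
      return Status::OK();
    }

A caller would typically pass a MemPool backed by a MemTracker (e.g. MemTracker tracker;
MemPool pool(&tracker);) and release everything with pool.FreeAll() when done, as the
updated string-buffer-test.cc above does.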

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 599e42f..1996838 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -79,7 +79,7 @@ class FunctionContextImpl {
   /// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear.
   /// For UDFs, the allocations can be freed at the row level.
   /// TODO: free them at the batch level and save some copies?
-  uint8_t* AllocateLocal(int byte_size) noexcept;
+  uint8_t* AllocateLocal(int64_t byte_size) noexcept;
 
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations() noexcept;
@@ -121,7 +121,7 @@ class FunctionContextImpl {
   /// if necessary.
   ///
   /// Return false if 'buf' is null; returns true otherwise.
-  bool CheckAllocResult(const char* fn_name, uint8_t* buf, int byte_size);
+  bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size);
 
   /// Preallocated buffer for storing varargs (if the function has any). Allocated and
   /// owned by this object, but populated by an Expr function.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 4cadb63..ff81c9c 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -267,7 +267,7 @@ const char* FunctionContext::error_msg() const {
 }
 
 inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
-    uint8_t* buf, int byte_size) {
+    uint8_t* buf, int64_t byte_size) {
   if (UNLIKELY(buf == NULL)) {
     stringstream ss;
     ss << string(fn_name) << "() failed to allocate " << byte_size << " bytes.";
@@ -416,7 +416,7 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) {
   }
 }
 
-uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) noexcept {
+uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept {
   assert(!closed_);
   if (byte_size == 0) return NULL;
   uint8_t* buffer = pool_->Allocate(byte_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 1b1cdab..210c855 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -140,6 +140,7 @@ class FunctionContext {
   /// The UDF/UDA is responsible for calling Free() on all buffers returned by Allocate().
   /// If Allocate() fails or causes the memory limit to be exceeded, the error will be
   /// set in this object causing the query to fail.
+  /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
   uint8_t* Allocate(int byte_size) noexcept;
 
   /// Wrapper around Allocate() to allocate a buffer of the given type "T".
@@ -155,6 +156,7 @@ class FunctionContext {
   /// memory limit to be exceeded, the error will be set in this object.
   ///
   /// This should be used for buffers that constantly get appended to.
+  /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
   uint8_t* Reallocate(uint8_t* ptr, int byte_size) noexcept;
 
   /// Frees a buffer returned from Allocate() or Reallocate()
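
Until IMPALA-2756 widens these parameters, callers that compute allocation sizes as
int64_t have to narrow them explicitly. A hedged sketch of such a guard is shown
below; the helper name and error text are invented for illustration, and only
FunctionContext::Allocate() and FunctionContext::SetError() come from udf.h.

    #include <stdint.h>
    #include <limits>

    #include "udf/udf.h"

    using impala_udf::FunctionContext;

    // Hypothetical helper (not part of this patch): reject 64-bit sizes that do
    // not fit the 32-bit Allocate() parameter instead of silently truncating.
    static uint8_t* AllocateChecked(FunctionContext* ctx, int64_t byte_size) {
      if (byte_size < 0 || byte_size > std::numeric_limits<int32_t>::max()) {
        ctx->SetError("requested allocation does not fit in a 32-bit size");
        return NULL;
      }
      return ctx->Allocate(static_cast<int>(byte_size));
    }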

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index cae8212..14553ae 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -23,6 +23,7 @@
 #endif
 
 #include <boost/type_traits/make_unsigned.hpp>
+#include <limits>
 
 #include "common/compiler-util.h"
 #include "util/cpu-info.h"
@@ -203,31 +204,36 @@ class BitUtil {
   static inline uint16_t FromBigEndian(uint16_t val) { return val; }
 #endif
 
-  // Logical right shift for signed integer types
-  // This is needed because the C >> operator does arithmetic right shift
-  // Negative shift amounts lead to undefined behavior
+  /// Returns true if 'value' is non-negative and fits in a signed 32-bit integer.
+  static inline bool IsNonNegative32Bit(int64_t value) {
+    return static_cast<uint64_t>(value) <= std::numeric_limits<int32_t>::max();
+  }
+
+  /// Logical right shift for signed integer types
+  /// This is needed because the C >> operator does arithmetic right shift
+  /// Negative shift amounts lead to undefined behavior
   template<typename T>
   static T ShiftRightLogical(T v, int shift) {
     // Conversion to unsigned ensures most significant bits always filled with 0's
     return static_cast<typename make_unsigned<T>::type>(v) >> shift;
   }
 
-  // Get an specific bit of a numeric type
+  /// Get a specific bit of a numeric type
   template<typename T>
   static inline int8_t GetBit(T v, int bitpos) {
     T masked = v & (static_cast<T>(0x1) << bitpos);
     return static_cast<int8_t>(ShiftRightLogical(masked, bitpos));
   }
 
-  // Set a specific bit to 1
-  // Behavior when bitpos is negative is undefined
+  /// Set a specific bit to 1
+  /// Behavior when bitpos is negative is undefined
   template<typename T>
   static T SetBit(T v, int bitpos) {
     return v | (static_cast<T>(0x1) << bitpos);
   }
 
-  // Set a specific bit to 0
-  // Behavior when bitpos is negative is undefined
+  /// Set a specific bit to 0
+  /// Behavior when bitpos is negative is undefined
   template<typename T>
   static T UnsetBit(T v, int bitpos) {
     return v & ~(static_cast<T>(0x1) << bitpos);
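
The IsNonNegative32Bit() helper added in this hunk relies on the cast to uint64_t:
any negative input wraps to a value larger than INT32_MAX and therefore fails the
comparison as well. A small standalone sketch (plain C++, no Impala headers) showing
the expected results:

    #include <stdint.h>
    #include <iostream>
    #include <limits>

    // Same expression as the new BitUtil helper.
    static bool IsNonNegative32Bit(int64_t value) {
      return static_cast<uint64_t>(value) <= std::numeric_limits<int32_t>::max();
    }

    int main() {
      std::cout << IsNonNegative32Bit(0) << "\n";             // 1: fits
      std::cout << IsNonNegative32Bit(2147483647LL) << "\n";  // 1: INT32_MAX fits
      std::cout << IsNonNegative32Bit(2147483648LL) << "\n";  // 0: too large for int32
      std::cout << IsNonNegative32Bit(-1) << "\n";            // 0: negative
      return 0;
    }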

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 6863d1f..2ee7415 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -182,15 +182,13 @@ Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
     const uint8_t* input, int* output_length, uint8_t** output) {
   int64_t input_len64 = input_length;
   int64_t output_len64 = *output_length;
-  RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
-                               output));
-  // Check whether we are going to have an overflow if we are going to cast from int64_t
-  // to int.
-  // TODO: Is there a faster way to do this check?
-  if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
+  RETURN_IF_ERROR(
+      ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
+  // Buffer size should be in the range [0, 2^31 - 1] bytes.
+  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
     return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
         output_len64));;
   }
-  *output_length = static_cast<int32_t>(output_len64);
+  *output_length = static_cast<int>(output_len64);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index a337983..563a3b9 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -150,11 +150,6 @@ class Codec {
 
   bool supports_streaming() const { return supports_streaming_; }
 
-  /// Largest block we will compress/decompress: 2GB.
-  /// We are dealing with compressed blocks that are never this big but we want to guard
-  /// against a corrupt file that has the block length as some large number.
-  static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;
-
  protected:
   /// Create a compression operator
   /// Inputs:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 3a32756..7a94db0 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -121,10 +121,8 @@ class DecompressorTest : public ::testing::Test {
       EXPECT_GE(max_compressed_length, 0);
       uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
       compressed_length = max_compressed_length;
-
-
       EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
-                                           &compressed));
+          &compressed));
     }
 
     output_len = decompressor->MaxOutputLen(compressed_length, compressed);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 8a2ea74..900f891 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -148,9 +148,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     if (!reuse_buffer_ || out_buffer_ == NULL) {
       // guess that we will need 2x the input length.
       buffer_length_ = input_length * 2;
-      if (buffer_length_ > MAX_BLOCK_SIZE) {
-        return Status("Decompressor: block size is too big");
-      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -204,11 +201,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     // User didn't supply the buffer, double the buffer and try again.
     temp_memory_pool_->Clear();
     buffer_length_ *= 2;
-    if (buffer_length_ > MAX_BLOCK_SIZE) {
-      stringstream ss;
-      ss << "GzipDecompressor: block size is too big: " << buffer_length_;
-      return Status(ss.str());
-    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -273,9 +265,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   } else if (!reuse_buffer_ || out_buffer_ == NULL) {
     // guess that we will need 2x the input length.
     buffer_length_ = input_length * 2;
-    if (buffer_length_ > MAX_BLOCK_SIZE) {
-      return Status("Decompressor: block size is too big");
-    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -295,9 +284,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
       DCHECK(!output_preallocated);
       temp_memory_pool_->Clear();
       buffer_length_ = buffer_length_ * 2;
-      if (buffer_length_ > MAX_BLOCK_SIZE) {
-        return Status("Decompressor: block size is too big");
-      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -415,17 +401,18 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
 }
 
 // Hadoop uses a block compression scheme on top of snappy.  As per the hadoop docs
-// the input is split into blocks.  Each block "contains the uncompressed length for
-// the block followed by one of more length-prefixed blocks of compressed data."
+// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split
+// into blocks.  Each block "contains the uncompressed length for the block followed
+// by one or more length-prefixed blocks of compressed data."
 // This is essentially blocks of blocks.
 // The outer block consists of:
-//   - 4 byte little endian uncompressed_size
+//   - 4 byte big endian uncompressed_size
 //   < inner blocks >
 //   ... repeated until input_len is consumed ..
 // The inner blocks have:
-//   - 4-byte little endian compressed_size
+//   - 4-byte big endian compressed_size
 //   < snappy compressed block >
-//   - 4-byte little endian compressed_size
+//   - 4-byte big endian compressed_size
 //   < snappy compressed block >
 //   ... repeated until uncompressed_size from outer block is consumed ...
 
@@ -443,15 +430,6 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     input += sizeof(uint32_t);
     input_len -= sizeof(uint32_t);
 
-    if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) {
-      if (uncompressed_total_len == 0) {
-        // TODO: is this check really robust?
-        return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
-            uncompressed_block_len);
-      }
-      break;
-    }
-
     if (!size_only) {
       int64_t remaining_output_size = *output_len - uncompressed_total_len;
       DCHECK_GE(remaining_output_size, uncompressed_block_len);
@@ -464,29 +442,23 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       input_len -= sizeof(uint32_t);
 
       if (compressed_len == 0 || compressed_len > input_len) {
-        if (uncompressed_total_len == 0) {
-          return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
-        }
-        input_len = 0;
-        break;
+        *output_len = 0;
+        return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
       }
 
       // Read how big the output will be.
       size_t uncompressed_len;
       if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
-            input_len, &uncompressed_len)) {
-        if (uncompressed_total_len == 0) {
-          return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
-        }
-        input_len = 0;
-        break;
+              compressed_len, &uncompressed_len)) {
+        *output_len = 0;
+        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
       }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {
         // Decompress this snappy block
         if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-              compressed_len, output)) {
+                compressed_len, output)) {
           return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
         }
         output += uncompressed_len;
@@ -526,14 +498,6 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
     *output = out_buffer_;
   }
 
-  if (*output_len > MAX_BLOCK_SIZE) {
-    // TODO: is this check really robust?
-    stringstream ss;
-    ss << "Decompressor: block size is too big.  Data is likely corrupt. "
-       << "Size: " << *output_len;
-    return Status(ss.str());
-  }
-
   char* out_ptr = reinterpret_cast<char*>(*output);
   RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
   return Status::OK();
@@ -547,7 +511,7 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
   DCHECK(input != NULL);
   size_t result;
   if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
-           input_len, &result)) {
+          input_len, &result)) {
     return -1;
   }
   return result;
@@ -560,9 +524,6 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
     if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed");
     if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
       buffer_length_ = uncompressed_length;
-      if (buffer_length_ > MAX_BLOCK_SIZE) {
-        return Status("Decompressor: block size is too big");
-      }
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
@@ -576,7 +537,7 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
   }
 
   if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-           static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
+          static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
     return Status("Snappy: RawUncompress failed");
   }
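
To make the framing described in the comments above concrete, here is a hedged,
standalone sketch of the size-only pass over a hadoop snappy block compressed
stream. It is not the Impala implementation: it only needs libsnappy for
GetUncompressedLength(), and it collapses every error case to returning -1.

    #include <stdint.h>
    #include <snappy.h>

    // Decode a big-endian 32-bit length prefix.
    static uint32_t ReadBigEndian32(const uint8_t* p) {
      return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
             (uint32_t(p[2]) << 8) | uint32_t(p[3]);
    }

    // Walk the outer/inner framing and return the total uncompressed size,
    // or -1 if the framing looks corrupt.
    static int64_t SnappyBlockUncompressedSize(const uint8_t* input, int64_t input_len) {
      int64_t total = 0;
      while (input_len > 0) {
        if (input_len < 4) return -1;
        int64_t remaining = ReadBigEndian32(input);  // outer uncompressed size
        input += 4; input_len -= 4;
        while (remaining > 0) {
          if (input_len < 4) return -1;
          uint32_t compressed_len = ReadBigEndian32(input);  // inner length prefix
          input += 4; input_len -= 4;
          if (compressed_len == 0 || compressed_len > input_len) return -1;
          size_t uncompressed_len;
          if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
                                             compressed_len, &uncompressed_len)) {
            return -1;
          }
          total += uncompressed_len;
          remaining -= uncompressed_len;
          input += compressed_len; input_len -= compressed_len;
        }
      }
      return total;
    }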
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 7815233..26ea78d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -274,6 +274,9 @@ error_codes = (
 
   ("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
    "data of type $1: $2"),
+
+  ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
+   "supported length of 2147483647 bytes.")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 53cdefa..dd6f8ad 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -294,6 +294,11 @@ function load-custom-data {
   hadoop fs -put -f ${IMPALA_HOME}/testdata/tinytable_seq_snap/tinytable_seq_snap_header_only \
                     /test-warehouse/tinytable_seq_snap
 
+  # IMPALA-1619: payload compressed with snappy used for constructing large snappy block
+  # compressed file
+  hadoop fs -put -f ${IMPALA_HOME}/testdata/compressed_formats/compressed_payload.snap \
+                    /test-warehouse/compressed_payload.snap
+
   beeline -n $USER -u "${JDBC_URL}" -f\
     ${IMPALA_HOME}/testdata/avro_schema_resolution/create_table.sql
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/README
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/README b/testdata/compressed_formats/README
new file mode 100644
index 0000000..892cb80
--- /dev/null
+++ b/testdata/compressed_formats/README
@@ -0,0 +1,4 @@
+This folder contains a file necessary to test Impala's support for snappy block compressed
+files larger than 1GB. In particular, compressed_payload.snap is a string of 50176 bytes
+compressed using snappy. It's the building block for constructing a large snappy block
+compressed file. Please see test_compressed_formats.py for details.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/compressed_payload.snap
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/compressed_payload.snap b/testdata/compressed_formats/compressed_payload.snap
new file mode 100644
index 0000000..20ac4ff
Binary files /dev/null and b/testdata/compressed_formats/compressed_payload.snap differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 15f799c..c55c9e7 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -3,6 +3,8 @@
 import os
 import pytest
 import random
+import string
+import struct
 import subprocess
 from os.path import join
 from subprocess import call
@@ -144,19 +146,23 @@ class TestTableWriters(ImpalaTestSuite):
 
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
-  """ Tests that we gracefully handle when a compressed file in HDFS is larger
-  than 1GB.
-  This test creates a testing data file that is over 1GB and loads it to a table.
-  Then verifies Impala will gracefully fail the query.
-  TODO: Once IMPALA-1619 is fixed, modify the test to test > 2GB file."""
-
+  """
+  Tests that Impala handles compressed files in HDFS larger than 1GB.
+  This test creates a 2GB test data file and loads it into a table.
+  """
   TABLE_NAME = "large_compressed_file"
   TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
-  """ Name the file with ".snappy" extension to let scanner treat it
-  as a snappy compressed file."""
+  """
+  Name the file with the ".snappy" extension so the scanner treats it as
+  a snappy block compressed file.
+  """
   FILE_NAME = "largefile.snappy"
-  LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
-  MAX_FILE_SIZE = 1024 * 1024 * 1024
+  # Maximum uncompressed size of an outer block in a snappy block compressed file.
+  CHUNK_SIZE = 1024 * 1024 * 1024
+  # Limit the max file size to 2GB, or too much memory may be needed when
+  # uncompressing the buffer. 2GB is sufficient to show that we support
+  # sizes beyond the maximum 32-bit signed value.
+  MAX_FILE_SIZE = 2 * CHUNK_SIZE
 
   @classmethod
   def get_workload(self):
@@ -170,48 +176,65 @@ class TestLargeCompressedFile(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.TestMatrix.add_constraint(lambda v:
         (v.get_value('table_format').file_format =='text' and
-        v.get_value('table_format').compression_codec == 'none'))
+        v.get_value('table_format').compression_codec == 'snap'))
 
   def teardown_method(self, method):
     self.__drop_test_table()
 
-  def __gen_char_or_num(self):
-    return random.choice(self.LETTERS)
-
   def __generate_file(self, file_name, file_size):
     """Generate file with random data and a specified size."""
-    s = ''
-    for j in range(1024):
-      s = s + self.__gen_char_or_num()
-    put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
-                           stdin=subprocess.PIPE, bufsize=-1)
-    remain = file_size % 1024
-    for i in range(int(file_size / 1024)):
-      put.stdin.write(s)
-    put.stdin.write(s[0:remain])
-    put.stdin.close()
-    put.wait()
+
+    # Read the payload compressed using snappy. The compressed payload
+    # is generated from a string of 50176 bytes.
+    payload_size = 50176
+    hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat",
+        "/test-warehouse/compressed_payload.snap"], stdout=subprocess.PIPE)
+    compressed_payload = hdfs_cat.stdout.read()
+    compressed_size = len(compressed_payload)
+    hdfs_cat.stdout.close()
+    hdfs_cat.wait()
+
+    # The layout of a snappy-block compressed file is one or more
+    # of the following nested structures, each of which is called a "chunk" in
+    # the code below:
+    #
+    # - <big endian 32-bit value encoding the uncompressed size>
+    # - one or more blocks of the following structure:
+    #   - <big endian 32-bit value encoding the compressed size>
+    #   - <raw bits compressed by snappy algorithm>
+
+    # Number of nested structures described above.
+    num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
+    # Number of compressed snappy blocks per chunk.
+    num_blocks_per_chunk = self.CHUNK_SIZE / (compressed_size + 4)
+    # Total uncompressed size of a nested structure.
+    total_chunk_size = num_blocks_per_chunk * payload_size
+
+    hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+        stdin=subprocess.PIPE, bufsize=-1)
+    for i in range(num_chunks):
+      hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
+      for j in range(num_blocks_per_chunk):
+        hdfs_put.stdin.write(struct.pack('>i', compressed_size))
+        hdfs_put.stdin.write(compressed_payload)
+    hdfs_put.stdin.close()
+    hdfs_put.wait()
 
   def test_query_large_file(self, vector):
     self.__create_test_table();
     dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
-    file_size = self.MAX_FILE_SIZE + 1
+    file_size = self.MAX_FILE_SIZE
     self.__generate_file(dst_path, file_size)
     self.client.execute("refresh %s" % self.TABLE_NAME)
 
-    # Query the table and check for expected error.
-    expected_error = 'Requested buffer size %dB > 1GB' % file_size
-    try:
-      result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
-      assert False, "Query was expected to fail"
-    except Exception as e:
-      error_msg = str(e)
-      assert expected_error in error_msg
+    # Query the table; the query should now succeed on a file of this size.
+    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
 
   def __create_test_table(self):
     self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
-      % (self.TABLE_NAME, self.TABLE_LOCATION))
+    self.client.execute("CREATE TABLE %s (col string) " \
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
+        % (self.TABLE_NAME, self.TABLE_LOCATION))
 
   def __drop_test_table(self):
     self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)