Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/05 20:37:39 UTC

[7/8] incubator-impala git commit: Revert "IMPALA-1619: Support 64-bit allocations."

Revert "IMPALA-1619: Support 64-bit allocations."

This reverts commit 1ffb2bd5a2a2faaa759ebdbaf49bf00aa8f86b5e.

Unbreak the packaging builds for now.

Change-Id: Id079acb83d35b51ba4dfe1c8042e1c5ec891d807
Reviewed-on: http://gerrit.cloudera.org:8080/3543
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a07fc367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a07fc367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a07fc367

Branch: refs/heads/master
Commit: a07fc367eee92f6f51365b60883ba3fc2fdabd90
Parents: d197516
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jun 30 12:27:38 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 5 13:37:26 2016 -0700

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser.cc        |  37 ++++----
 be/src/exec/delimited-text-parser.h         |  40 ++++----
 be/src/exec/delimited-text-parser.inline.h  |  55 +++++------
 be/src/exec/hdfs-scanner.cc                 |   2 +-
 be/src/exec/hdfs-scanner.h                  |   3 -
 be/src/exec/hdfs-sequence-scanner.cc        |   8 +-
 be/src/exec/hdfs-text-scanner.cc            |  54 ++++++-----
 be/src/exec/hdfs-text-scanner.h             |   6 +-
 be/src/exec/scanner-context.cc              |  16 +++-
 be/src/runtime/buffered-block-mgr.cc        |  21 ++--
 be/src/runtime/collection-value-builder.h   |   2 +-
 be/src/runtime/free-pool-test.cc            |  27 +-----
 be/src/runtime/free-pool.h                  |  19 ++--
 be/src/runtime/mem-pool-test.cc             |  49 +++-------
 be/src/runtime/mem-pool.h                   |   8 +-
 be/src/runtime/string-buffer-test.cc        |  20 ++--
 be/src/runtime/string-buffer.h              | 116 +++++++++++++----------
 be/src/udf/udf-internal.h                   |   4 +-
 be/src/udf/udf.cc                           |   4 +-
 be/src/udf/udf.h                            |   2 -
 be/src/util/bit-util.h                      |  22 ++---
 be/src/util/codec.cc                        |  12 ++-
 be/src/util/codec.h                         |   5 +
 be/src/util/decompress-test.cc              |   4 +-
 be/src/util/decompress.cc                   |  67 ++++++++++---
 common/thrift/generate_error_codes.py       |   3 -
 infra/python/bootstrap_virtualenv.py        |  10 +-
 infra/python/deps/requirements.txt          |   1 -
 tests/query_test/test_compressed_formats.py |  84 +++++++---------
 29 files changed, 338 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 1a785ac..950eae4 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -116,11 +116,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
 
   if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
     if (process_escapes_) {
-      RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr,
-          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
+      ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
+          field_locations, num_tuples, num_fields, next_column_start);
     } else {
-      RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr,
-          row_end_locations, field_locations, num_tuples, num_fields, next_column_start));
+      ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
+          field_locations, num_tuples, num_fields, next_column_start);
     }
   }
 
@@ -155,10 +155,9 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
         // If the row ended in \r\n then move to the \n
         ++*next_column_start;
       } else {
-        RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations));
-        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
-        DCHECK(status.ok());
+        AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations);
+        FillColumns<false>(0, NULL, num_fields, field_locations);
         column_idx_ = num_partition_keys_;
         row_end_locations[*num_tuples] = *byte_buffer_ptr;
         ++(*num_tuples);
@@ -172,8 +171,8 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
         return Status::OK();
       }
     } else if (new_col) {
-      RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-          next_column_start, num_fields, field_locations));
+      AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+          next_column_start, num_fields, field_locations);
     }
 
     --remaining_len;
@@ -184,10 +183,9 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
   // e.g. Sequence files.
   if (tuple_delim_ == '\0') {
     DCHECK_EQ(remaining_len, 0);
-    RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
-        next_column_start, num_fields, field_locations));
-    Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
-    DCHECK(status.ok());
+    AddColumn<true>(*byte_buffer_ptr - *next_column_start,
+        next_column_start, num_fields, field_locations);
+    FillColumns<false>(0, NULL, num_fields, field_locations);
     column_idx_ = num_partition_keys_;
     ++(*num_tuples);
     unfinished_tuple_ = false;
@@ -195,10 +193,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
   return Status::OK();
 }
 
-// Find the first instance of the tuple delimiter. This will find the start of the first
-// full tuple in buffer by looking for the end of the previous tuple.
-int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
-  int64_t tuple_start = 0;
+// Find the first instance of the tuple delimiter.  This will
+// find the start of the first full tuple in buffer by looking for the end of
+// the previous tuple.
+int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) {
+  int tuple_start = 0;
   const char* buffer_start = buffer;
   bool found = false;
 
@@ -257,7 +256,7 @@ restart:
     // tuple break that are all escape characters, but that is
     // unlikely.
     int num_escape_chars = 0;
-    int64_t before_tuple_end = tuple_start - 2;
+    int before_tuple_end = tuple_start - 2;
     // TODO: If scan range is split between escape character and tuple delimiter,
     // before_tuple_end will be -1. Need to scan previous range for escape characters
     // in this case.

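The hunks above turn Status-returning parser calls back into void ones, dropping the RETURN_IF_ERROR early-exit pattern that IMPALA-1619 had threaded through the parse path. A minimal, self-contained sketch of that idiom follows; Impala's real macro (in be/src/common/status.h) adds diagnostics, but the control flow is the same.

    #include <cstdio>

    struct Status {
      bool ok_;
      static Status OK() { return Status{true}; }
      bool ok() const { return ok_; }
    };

    // Sketch of the early-return idiom removed by this revert.
    #define RETURN_IF_ERROR(stmt)    \
      do {                           \
        Status _s = (stmt);          \
        if (!_s.ok()) return _s;     \
      } while (false)

    Status MightFail(bool fail) { return Status{!fail}; }

    Status Caller() {
      RETURN_IF_ERROR(MightFail(false));  // ok, execution continues
      RETURN_IF_ERROR(MightFail(true));   // not ok, error propagates to caller
      return Status::OK();
    }
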
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index d754c74..f3e108c 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -32,9 +32,9 @@ class DelimitedTextParser {
   ///   collection_item_delim: delimits collection items
   ///   escape_char: escape delimiters, make them part of the data.
   //
-  /// 'num_cols' is the total number of columns including partition keys.
+  /// num_cols is the total number of columns including partition keys.
   //
-  /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
+  /// is_materialized_col should be initialized to an array of length 'num_cols', with
   /// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
   /// Owned by caller.
   //
@@ -73,8 +73,6 @@ class DelimitedTextParser {
   ///   num_fields: Number of materialized fields parsed
   ///   next_column_start: pointer within file_buffer_ where the next field starts
   ///                      after the return from the call to ParseData
-  /// Returns an error status if any column exceeds the size limit.
-  /// See AddColumn() for details.
   Status ParseFieldLocations(int max_tuples, int64_t remaining_len,
       char** byte_buffer_ptr, char** row_end_locations,
       FieldLocation* field_locations,
@@ -86,10 +84,9 @@ class DelimitedTextParser {
   ///   col.
   /// - *num_fields returns the number of fields processed.
   /// This function is used to parse sequence file records which do not need to
-  /// parse for tuple delimiters. Returns an error status if any column exceeds the
-  /// size limit. See AddColumn() for details.
+  /// parse for tuple delimiters.
   template <bool process_escapes>
-  Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
+  void ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
       int* num_fields);
 
   /// FindFirstInstance returns the position after the first non-escaped tuple
@@ -97,7 +94,7 @@ class DelimitedTextParser {
   /// Used to find the start of a tuple if jumping into the middle of a text file.
   /// Also used to find the sync marker for Sequenced and RC files.
   /// If no tuple delimiter is found within the buffer, return -1;
-  int64_t FindFirstInstance(const char* buffer, int64_t len);
+  int FindFirstInstance(const char* buffer, int len);
 
   /// Will we return the current column to the query?
   /// Hive allows cols at the end of the table that are not in the schema.  We'll
@@ -107,18 +104,17 @@ class DelimitedTextParser {
   }
 
   /// Fill in columns missing at the end of the tuple.
-  /// 'len' and 'last_column' may contain the length and the pointer to the
+  /// len and last_column may contain the length and the pointer to the
   /// last column on which the file ended without a delimiter.
   /// Fills in the offsets and lengths in field_locations.
-  /// If parsing stopped on a delimiter and there is no last column then length will be 0.
+  /// If parsing stopped on a delimiter and there is no last column then len will be 0.
   /// Other columns beyond that are filled with 0 length fields.
-  /// 'num_fields' points to an initialized count of fields and will incremented
+  /// num_fields points to an initialized count of fields and will incremented
   /// by the number fields added.
-  /// 'field_locations' will be updated with the start and length of the fields.
-  /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
+  /// field_locations will be updated with the start and length of the fields.
   template <bool process_escapes>
-  Status FillColumns(int64_t len, char** last_column, int* num_fields,
-      impala::FieldLocation* field_locations);
+  void FillColumns(int len, char** last_column,
+                   int* num_fields, impala::FieldLocation* field_locations);
 
   /// Return true if we have not seen a tuple delimiter for the current tuple being
   /// parsed (i.e., the last byte read was not a tuple delimiter).
@@ -132,28 +128,24 @@ class DelimitedTextParser {
   /// Template parameter:
   ///   process_escapes -- if true the the column may have escape characters
   ///                      and the negative of the len will be stored.
-  ///   len: length of the current column. The length of a column must fit in a 32-bit
-  ///        signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
-  ///        it will be treated as an error.
+  ///   len: length of the current column.
   /// Input/Output:
   ///   next_column_start: Start of the current column, moved to the start of the next.
   ///   num_fields: current number of fields processed, updated to next field.
   /// Output:
   ///   field_locations: updated with start and length of current field.
-  /// Return an error status if 'len' exceeds the size limit specified above.
   template <bool process_escapes>
-  Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
-      FieldLocation* field_locations);
+  void AddColumn(int len, char** next_column_start, int* num_fields,
+                 FieldLocation* field_locations);
 
   /// Helper routine to parse delimited text using SSE instructions.
   /// Identical arguments as ParseFieldLocations.
   /// If the template argument, 'process_escapes' is true, this function will handle
   /// escapes, otherwise, it will assume the text is unescaped.  By using templates,
   /// we can special case the un-escaped path for better performance.  The unescaped
-  /// path is optimized away by the compiler. Returns an error status if the length
-  /// of any column exceeds the size limit. See AddColumn() for details.
+  /// path is optimized away by the compiler.
   template <bool process_escapes>
-  Status ParseSse(int max_tuples, int64_t* remaining_len,
+  void ParseSse(int max_tuples, int64_t* remaining_len,
       char** byte_buffer_ptr, char** row_end_locations_,
       FieldLocation* field_locations,
       int* num_tuples, int* num_fields, char** next_column_start);

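The length-limit documentation removed above referred to a 32-bit range check (BitUtil::IsNonNegative32Bit in the reverted change). A plausible implementation with those semantics, shown as an assumption about what the helper tests rather than the actual Impala code:

    #include <climits>
    #include <cstdint>

    // Assumed semantics: true iff v fits in [0, INT32_MAX], i.e. the value
    // survives narrowing to a non-negative 32-bit int.
    inline bool IsNonNegative32Bit(int64_t v) {
      return static_cast<uint64_t>(v) <= INT32_MAX;
    }
    // IsNonNegative32Bit(2147483647) is true; (1LL << 31) and -1 both fail.
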
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 28b5b4b..e96a763 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -26,7 +26,7 @@ namespace impala {
 /// If the character at n is an escape character, then delimiters(tuple/field/escape
 /// characters) at n+1 don't count.
 inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
-    uint16_t* delim_mask) {
+                              uint16_t* delim_mask) {
   // Escape characters can escape escape characters.
   bool first_char_is_escape = *last_char_is_escape;
   bool escape_next = first_char_is_escape;
@@ -39,7 +39,7 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
 
   // Remember last character for the next iteration
   *last_char_is_escape = escape_mask &
-      SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
+    SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
 
   // Shift escape mask up one so they match at the same bit index as the tuple and
   // field mask (instead of being the character before) and set the correct first bit
@@ -50,41 +50,35 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
 }
 
 template <bool process_escapes>
-inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
+inline void DelimitedTextParser::AddColumn(int len, char** next_column_start,
     int* num_fields, FieldLocation* field_locations) {
-  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
-    return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
-  }
   if (ReturnCurrentColumn()) {
     // Found a column that needs to be parsed, write the start/len to 'field_locations'
     field_locations[*num_fields].start = *next_column_start;
-    int64_t field_len = len;
     if (process_escapes && current_column_has_escape_) {
-      field_len = -len;
+      field_locations[*num_fields].len = -len;
+    } else {
+      field_locations[*num_fields].len = len;
     }
-    field_locations[*num_fields].len = static_cast<int32_t>(field_len);
     ++(*num_fields);
   }
   if (process_escapes) current_column_has_escape_ = false;
   *next_column_start += len + 1;
   ++column_idx_;
-  return Status::OK();
 }
 
 template <bool process_escapes>
-inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
+inline void DelimitedTextParser::FillColumns(int len, char** last_column,
     int* num_fields, FieldLocation* field_locations) {
   // Fill in any columns missing from the end of the tuple.
   char* dummy = NULL;
   if (last_column == NULL) last_column = &dummy;
   while (column_idx_ < num_cols_) {
-    RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
-        num_fields, field_locations));
+    AddColumn<process_escapes>(len, last_column, num_fields, field_locations);
     // The rest of the columns will be null.
     last_column = &dummy;
     len = 0;
   }
-  return Status::OK();
 }
 
 /// SSE optimized raw text file parsing.  SSE4_2 added an instruction (with 3 modes) for
@@ -101,9 +95,10 @@ inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
 ///  Haystack = 'asdfghjklhjbdwwc' (the raw string)
 ///  Result   = '1010000000011001'
 template <bool process_escapes>
-inline Status DelimitedTextParser::ParseSse(int max_tuples,
+inline void DelimitedTextParser::ParseSse(int max_tuples,
     int64_t* remaining_len, char** byte_buffer_ptr,
-    char** row_end_locations, FieldLocation* field_locations,
+    char** row_end_locations,
+    FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
   DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
 
@@ -177,8 +172,8 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
       char* delim_ptr = *byte_buffer_ptr + n;
 
       if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
-        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations));
+        AddColumn<process_escapes>(delim_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations);
         continue;
       }
 
@@ -190,10 +185,9 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
           last_row_delim_offset_ = -1;
           continue;
         }
-        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
-            next_column_start, num_fields, field_locations));
-        Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
-        DCHECK(status.ok());
+        AddColumn<process_escapes>(delim_ptr - *next_column_start,
+            next_column_start, num_fields, field_locations);
+        FillColumns<false>(0, NULL, num_fields, field_locations);
         column_idx_ = num_partition_keys_;
         row_end_locations[*num_tuples] = delim_ptr;
         ++(*num_tuples);
@@ -206,7 +200,7 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
           // If the last character we processed was \r then set the offset to 0
           // so that we will use it at the beginning of the next batch.
           if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
-          return Status::OK();
+          return;
         }
       }
     }
@@ -220,12 +214,11 @@ inline Status DelimitedTextParser::ParseSse(int max_tuples,
     *remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
     *byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
   }
-  return Status::OK();
 }
 
 /// Simplified version of ParseSSE which does not handle tuple delimiters.
 template <bool process_escapes>
-inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
     FieldLocation* field_locations, int* num_fields) {
   char* next_column_start = buffer;
   __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
@@ -270,8 +263,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
         // clear current bit
         delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-        RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
-            &next_column_start, num_fields, field_locations));
+        AddColumn<process_escapes>(buffer + n - next_column_start,
+            &next_column_start, num_fields, field_locations);
       }
 
       if (process_escapes) {
@@ -295,8 +288,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
 
     if (!last_char_is_escape_ &&
           (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
-      RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
-          &next_column_start, num_fields, field_locations));
+      AddColumn<process_escapes>(buffer - next_column_start,
+          &next_column_start, num_fields, field_locations);
     }
 
     --remaining_len;
@@ -305,8 +298,8 @@ inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char*
 
   // Last column does not have a delimiter after it.  Add that column and also
   // pad with empty cols if the input is ragged.
-  return FillColumns<process_escapes>(buffer - next_column_start,
-      &next_column_start, num_fields, field_locations);
+  FillColumns<process_escapes>(buffer - next_column_start,
+        &next_column_start, num_fields, field_locations);
 }
 
 }

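The Needle/Haystack comment in ParseSse() above maps directly onto the SSE4.2 string instructions (PCMPESTRM and friends). The following standalone demo of the mask-building step is an illustration of the instruction, not Impala code; compile with -msse4.2:

    #include <nmmintrin.h>  // SSE4.2 intrinsics
    #include <cstdint>
    #include <cstdio>

    int main() {
      // One 16-byte chunk of raw text; ',' is the field delimiter and '\n'
      // the tuple delimiter.
      const char haystack[16] = {'a','s','d',',','g','h','j',',',
                                 'l','h','j','b','d','w','w','\n'};
      const char needles[16] = {',', '\n'};
      __m128i hs = _mm_loadu_si128(reinterpret_cast<const __m128i*>(haystack));
      __m128i nd = _mm_loadu_si128(reinterpret_cast<const __m128i*>(needles));
      // "Equal any" mode: bit i of the result is set iff haystack[i] matches
      // any needle character.
      __m128i m = _mm_cmpestrm(nd, 2, hs, 16,
          _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_BIT_MASK);
      uint16_t delim_mask = static_cast<uint16_t>(_mm_extract_epi16(m, 0));
      printf("delim_mask = 0x%04x\n", delim_mask);  // 0x8088: bits 3, 7, 15
      return 0;
    }
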
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 275956c..78d4994 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -186,7 +186,7 @@ Status HdfsScanner::CommitRows(int num_rows) {
   DCHECK(batch_ != NULL);
   DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
   batch_->CommitRows(num_rows);
-  tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
+  tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows;
 
   // We need to pass the row batch to the scan node if there is too much memory attached,
   // which can happen if the query is very selective. We need to release memory even

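The one-line hunk above drops a static_cast<int64_t> on the left operand of the multiply. The cast matters because both factors are 32-bit ints, so without it the product is computed in 32 bits before being widened for the +=. Illustrative values only (signed overflow is undefined behavior; this just demonstrates the hazard):

    #include <cstdint>
    #include <cstdio>

    int main() {
      int byte_size = 1 << 20;  // 1MB per tuple
      int num_rows = 4096;
      int64_t bad = byte_size * num_rows;  // 32-bit multiply: 2^32 does not
                                           // fit in int (UB, typically wraps)
      int64_t good = static_cast<int64_t>(byte_size) * num_rows;  // 64-bit
      printf("%lld vs %lld\n", (long long)bad, (long long)good);
      return 0;
    }
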
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 9069451..7f804f1 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -58,9 +58,6 @@ struct FieldLocation {
   char* start;
   /// Encodes the length and whether or not this fields needs to be unescaped.
   /// If len < 0, then the field needs to be unescaped.
-  ///
-  /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue
-  /// and StringVal. All other types shouldn't be anywhere near this limit.
   int len;
 
   static const char* LLVM_CLASS_NAME;

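FieldLocation's sign convention above ("If len < 0, then the field needs to be unescaped") recurs throughout this commit, e.g. in CopyBoundaryField() in hdfs-text-scanner.cc below. A self-contained sketch of the encoding:

    struct FieldLocation {
      char* start;
      int len;  // negative length flags a field that still contains escapes
    };

    int main() {
      FieldLocation field{nullptr, -5};  // a 5-byte field that needs unescaping
      bool needs_escape = field.len < 0;
      int copy_len = needs_escape ? -field.len : field.len;  // 5
      // After merging with boundary data, the flag is re-applied the same way:
      int total_len = copy_len + 3;  // e.g. 3 boundary bytes prepended
      field.len = needs_escape ? -total_len : total_len;  // -8
      return 0;
    }
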
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 0cd000f..1460b1a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -229,15 +229,15 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
   for (int i = 0; i < num_to_process; ++i) {
     int num_fields = 0;
     if (delimited_text_parser_->escape_char() == '\0') {
-      RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<false>(
+      delimited_text_parser_->ParseSingleTuple<false>(
           record_locations_[i].len,
           reinterpret_cast<char*>(record_locations_[i].record),
-          &field_locations_[field_location_offset], &num_fields));
+          &field_locations_[field_location_offset], &num_fields);
     } else {
-      RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<true>(
+      delimited_text_parser_->ParseSingleTuple<true>(
           record_locations_[i].len,
           reinterpret_cast<char*>(record_locations_[i].record),
-          &field_locations_[field_location_offset], &num_fields));
+          &field_locations_[field_location_offset], &num_fields);
     }
     DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
     field_location_offset += num_fields;

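The escape_char() == '\0' branch above is the runtime half of a pattern these parsers use throughout: hoist a boolean into a template parameter so the hot loop is compiled twice, with escape handling statically removed from the common case. A generic sketch:

    // Two specializations are generated; when process_escapes is false the
    // escape branch is dead code and the compiler drops it.
    template <bool process_escapes>
    int CountFields(const char* buf, int len) {
      int fields = 0;
      for (int i = 0; i < len; ++i) {
        if (process_escapes && buf[i] == '\\') { ++i; continue; }
        if (buf[i] == ',') ++fields;
      }
      return fields;
    }

    // A single runtime branch selects the specialized instantiation.
    int CountFieldsDispatch(const char* buf, int len, bool escapes) {
      return escapes ? CountFields<true>(buf, len)
                     : CountFields<false>(buf, len);
    }
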
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6cc308d..6e501cc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -260,8 +260,8 @@ Status HdfsTextScanner::FinishScanRange() {
     // tuple.
     DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
     DCHECK(partial_tuple_empty_);
-    DCHECK(boundary_column_.IsEmpty());
-    DCHECK(boundary_row_.IsEmpty());
+    DCHECK(boundary_column_.Empty());
+    DCHECK(boundary_row_.Empty());
     return Status::OK();
   }
 
@@ -286,24 +286,24 @@ Status HdfsTextScanner::FinishScanRange() {
         ss << "Read failed while trying to finish scan range: " << stream_->filename()
            << ":" << stream_->file_offset() << endl << status.GetDetail();
         RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
-      } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
-          !boundary_row_.IsEmpty() ||
+      } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
+          !boundary_row_.Empty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&
               (!scan_node_->materialized_slots().empty() ||
                   scan_node_->num_materialized_partition_keys() > 0))) {
         // Missing columns or row delimiter at end of the file is ok, fill the row in.
-        char* col = boundary_column_.buffer();
+        char* col = boundary_column_.str().ptr;
         int num_fields = 0;
-        RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
-            &col, &num_fields, &field_locations_[0]));
+        delimited_text_parser_->FillColumns<true>(boundary_column_.Size(),
+            &col, &num_fields, &field_locations_[0]);
 
         MemPool* pool;
         TupleRow* tuple_row_mem;
         int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
         DCHECK_GE(max_tuples, 1);
         // Set variables for proper error outputting on boundary tuple
-        batch_start_ptr_ = boundary_row_.buffer();
-        row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len();
+        batch_start_ptr_ = boundary_row_.str().ptr;
+        row_end_locations_[0] = batch_start_ptr_ + boundary_row_.str().len;
         int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
         DCHECK_LE(num_tuples, 1);
         DCHECK_GE(num_tuples, 0);
@@ -371,7 +371,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
         (num_fields > 0 || *num_tuples > 0)) {
       // There can be one partial tuple which returned no more fields from this buffer.
       DCHECK_LE(*num_tuples, num_fields + 1);
-      if (!boundary_column_.IsEmpty()) {
+      if (!boundary_column_.Empty()) {
         RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
         boundary_column_.Clear();
       }
@@ -423,11 +423,11 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
     Status status;
     if (num_bytes > 0) {
       stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-          &byte_buffer_read_size_, &status);
+                        &byte_buffer_read_size_, &status);
     } else {
       DCHECK_EQ(num_bytes, 0);
       status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-          &byte_buffer_read_size_);
+                                  &byte_buffer_read_size_);
     }
     RETURN_IF_ERROR(status);
     *eosr = stream_->eosr();
@@ -555,7 +555,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   }
 
   // Need to read the entire file.
-  if (file_size > byte_buffer_read_size_) {
+  if (file_size < byte_buffer_read_size_) {
     stringstream ss;
     ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
        << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
@@ -600,8 +600,8 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
-      int64_t next_tuple_offset = 0;
-      int64_t bytes_left = byte_buffer_read_size_;
+      int next_tuple_offset = 0;
+      int bytes_left = byte_buffer_read_size_;
       while (num_skipped_rows < num_rows_to_skip) {
         next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
           bytes_left);
@@ -610,6 +610,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
         bytes_left -= next_tuple_offset;
         ++num_skipped_rows;
       }
+
       if (next_tuple_offset != -1) *tuple_found = true;
     } while (!*tuple_found && !eosr);
 
@@ -680,7 +681,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
 Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
-    const vector<ExprContext*>& conjunct_ctxs) {
+                                   const vector<ExprContext*>& conjunct_ctxs) {
   if (!node->runtime_state()->codegen_enabled()) return NULL;
   LlvmCodeGen* codegen;
   if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
@@ -716,9 +717,9 @@ void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) {
     row_start = row_end_locations_[row_idx - 1] + 1;
   }
 
-  if (!boundary_row_.IsEmpty()) {
-    // Log the beginning of the line from the previous file buffer(s).
-    *ss << string(boundary_row_.buffer(), boundary_row_.len());
+  if (!boundary_row_.Empty()) {
+    // Log the beginning of the line from the previous file buffer(s)
+    *ss << boundary_row_.str();
   }
   // Log the erroneous line (or the suffix of a line if !boundary_line.empty()).
   *ss << string(row_start, row_end - row_start);
@@ -791,7 +792,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
   // Write complete tuples.  The current field, if any, is at the start of a tuple.
   if (num_tuples > 0) {
     int max_added_tuples = (scan_node_->limit() == -1) ?
-        num_tuples : scan_node_->limit() - scan_node_->rows_returned();
+          num_tuples : scan_node_->limit() - scan_node_->rows_returned();
     int tuples_returned = 0;
     // Call jitted function if possible
     if (write_tuples_fn_ != NULL) {
@@ -835,29 +836,31 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
 Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
   bool needs_escape = data->len < 0;
   int copy_len = needs_escape ? -data->len : data->len;
-  int64_t total_len = copy_len + boundary_column_.len();
+  int total_len = copy_len + boundary_column_.Size();
   char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
   if (UNLIKELY(str_data == NULL)) {
     string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
         "$0 bytes.", total_len);
     return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
   }
-  memcpy(str_data, boundary_column_.buffer(), boundary_column_.len());
-  memcpy(str_data + boundary_column_.len(), data->start, copy_len);
+  memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
+  memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
   data->start = str_data;
   data->len = needs_escape ? -total_len : total_len;
   return Status::OK();
 }
 
-void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
+int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
     int num_fields, bool copy_strings) {
+  int next_line_offset = 0;
   for (int i = 0; i < num_fields; ++i) {
-    bool need_escape = false;
+    int need_escape = false;
     int len = fields[i].len;
     if (len < 0) {
       len = -len;
       need_escape = true;
     }
+    next_line_offset += (len + 1);
 
     const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
     if (!text_converter_->WriteSlot(desc, partial_tuple_,
@@ -867,4 +870,5 @@ void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
     }
     ++slot_idx_;
   }
+  return next_line_offset;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index dae104d..997637d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -156,9 +156,9 @@ class HdfsTextScanner : public HdfsScanner {
   int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples);
 
   /// Utility function to write out 'num_fields' to 'tuple_'.  This is used to parse
-  /// partial tuples. If copy_strings is true, strings from fields will be copied into
-  /// the boundary pool.
-  void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
+  /// partial tuples.  Returns bytes processed.  If copy_strings is true, strings
+  /// from fields will be copied into the boundary pool.
+  int WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
 
   /// Appends the current file and line to the RuntimeState's error log.
   /// row_idx is 0-based (in current batch) where the parse error occured.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 6a5081d..3c78d88 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -234,9 +234,15 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
       boundary_buffer_->Clear();
     }
   }
-
-  // Resize the buffer to the right size.
-  RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
+  // Work around IMPALA-1619. Fail the request if requested_len is more than 1GB.
+  // StringBuffer can only handle 32-bit allocations, and StringBuffer::Append()
+  // will allocate twice the current buffer size, causing an int overflow.
+  // TODO: Revert once IMPALA-1619 is fixed.
+  if (UNLIKELY(requested_len > StringValue::MAX_LENGTH)) {
+    LOG(WARNING) << "Requested buffer size " << requested_len << "B > 1GB."
+        << GetStackTrace();
+    return Status(Substitute("Requested buffer size $0B > 1GB", requested_len));
+  }
 
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
     // We need to fetch more bytes. Copy the end of the current buffer and fetch the next
@@ -267,8 +273,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
   } else {
     RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
     boundary_buffer_bytes_left_ += num_bytes;
-    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
-        boundary_buffer_->len() - boundary_buffer_bytes_left_;
+    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
+                           boundary_buffer_->Size() - boundary_buffer_bytes_left_;
     io_buffer_bytes_left_ -= num_bytes;
     io_buffer_pos_ += num_bytes;
 

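The workaround above exists because StringBuffer (see the string-buffer.h hunks later in this commit) grows by doubling an int buffer size, so once the buffer passes 1GB the next doubling exceeds INT_MAX. A compact illustration, computed in 64 bits to avoid actually overflowing:

    #include <climits>
    #include <cstdio>

    int main() {
      int buffer_size = (1 << 30) + 1;         // just past 1GB
      long long doubled = 2LL * buffer_size;   // 2147483650
      printf("doubled = %lld, INT_MAX = %d\n", doubled, INT_MAX);
      // buffer_size * 2 in int arithmetic would overflow here, hence the
      // hard cap at StringValue::MAX_LENGTH in GetBytesInternal() above.
      return 0;
    }
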
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index a9e95cd..8265ea2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -303,14 +303,22 @@ bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers)
 }
 
 bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
-  int64_t buffers_needed = BitUtil::Ceil(size, max_block_size());
-  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) {
-    VLOG_QUERY << "Trying to consume " << size << " which is out of range.";
+  // Work around IMPALA-1619. Return immediately if the allocation size will cause
+  // an arithmetic overflow.
+  if (UNLIKELY(size >= (1LL << 31))) {
+    // IMPALA-3238: don't repeatedly log warning when bumping up against this limit for
+    // large hash tables.
+    if (!client->logged_large_allocation_warning_) {
+      LOG(WARNING) << "Trying to allocate memory >=2GB (" << size << ")B."
+                   << GetStackTrace();
+      client->logged_large_allocation_warning_ = true;
+    }
     return false;
   }
+  int buffers_needed = BitUtil::Ceil(size, max_block_size());
   DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
-
   unique_lock<mutex> lock(lock_);
+
   if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
     // For small allocations (less than a block size), just let the allocation through.
     client->tracker_->ConsumeLocal(size, client->query_tracker_);
@@ -585,7 +593,7 @@ int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
 }
 
 int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
-  return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
+  return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
 }
 
 MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
@@ -1009,8 +1017,7 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
     //  1. In the unpinned list. The buffer will not be in the free list.
     //  2. in_write_ == true. The buffer will not be in the free list.
     //  3. The buffer is free, but hasn't yet been reassigned to a different block.
-    DCHECK_EQ(block->buffer_desc_->len, max_block_size())
-        << "Non-I/O blocks are always pinned";
+    DCHECK_EQ(block->buffer_desc_->len, max_block_size()) << "Non-I/O blocks are always pinned";
     DCHECK(unpinned_blocks_.Contains(block) ||
            block->in_write_ ||
            free_io_buffers_.Contains(block->buffer_desc_));

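buffers_needed above is a ceiling division of the requested size by the block size. Assuming BitUtil::Ceil has the conventional (a + b - 1) / b shape (and using an 8MB block size purely as an example value):

    #include <cstdint>
    #include <cstdio>

    int64_t Ceil(int64_t value, int64_t divisor) {
      return (value + divisor - 1) / divisor;
    }

    int main() {
      int64_t max_block_size = 8LL * 1024 * 1024;   // example block size
      int64_t size = 20LL * 1024 * 1024;            // 20MB request
      printf("%lld buffers\n", (long long)Ceil(size, max_block_size));  // 3
      return 0;
    }
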
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index 4065b80..c57b546 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -32,7 +32,7 @@ class CollectionValueBuilder {
 
   CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc,
       MemPool* pool, RuntimeState* state,
-      int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
+      int initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
     : coll_value_(coll_value),
       tuple_desc_(tuple_desc),
       pool_(pool),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/free-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool-test.cc b/be/src/runtime/free-pool-test.cc
index c8ff93d..02d245a 100644
--- a/be/src/runtime/free-pool-test.cc
+++ b/be/src/runtime/free-pool-test.cc
@@ -85,22 +85,6 @@ TEST(FreePoolTest, Basic) {
   EXPECT_EQ(mem_pool.total_allocated_bytes(), 64);
 
   mem_pool.FreeAll();
-
-  // Try making allocations larger than 1GB.
-  uint8_t* p5 = pool.Allocate(1LL << 32);
-  EXPECT_TRUE(p5 != NULL);
-  for (int64_t i = 0; i < (1LL << 32); i += (1 << 29)) {
-    *(p5 + i) = i;
-  }
-  EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
-
-  // Test zero-byte allocation.
-  p5 = pool.Allocate(0);
-  EXPECT_TRUE(p5 != NULL);
-  EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8);
-  pool.Free(p5);
-
-  mem_pool.FreeAll();
 }
 
 // In this test we make two allocations at increasing sizes and then we
@@ -112,13 +96,13 @@ TEST(FreePoolTest, Loop) {
   MemPool mem_pool(&tracker);
   FreePool pool(&mem_pool);
 
-  map<int64_t, pair<uint8_t*, uint8_t*> > primed_allocations;
-  vector<int64_t> allocation_sizes;
+  map<int, pair<uint8_t*, uint8_t*> > primed_allocations;
+  vector<int> allocation_sizes;
 
   int64_t expected_pool_size = 0;
 
   // Pick a non-power of 2 to exercise more code.
-  for (int64_t size = 5; size < 6LL * 1024 * 1024 * 1024; size *= 5) {
+  for (int size = 3; size < 1024 * 1024 * 1024; size *= 3) {
     uint8_t* p1 = pool.Allocate(size);
     uint8_t* p2 = pool.Allocate(size);
     EXPECT_TRUE(p1 != NULL);
@@ -179,11 +163,6 @@ TEST(FreePoolTest, ReAlloc) {
   ptr = pool.Allocate(600);
   EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8);
 
-  // Try allocation larger than 1GB.
-  uint8_t* ptr4 = pool.Reallocate(ptr3, 1LL << 32);
-  EXPECT_TRUE(ptr3 != ptr4);
-  EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8 + (1LL << 32) + 8);
-
   mem_pool.FreeAll();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/free-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h
index 90df749..dfabaf0 100644
--- a/be/src/runtime/free-pool.h
+++ b/be/src/runtime/free-pool.h
@@ -52,9 +52,8 @@ class FreePool {
     memset(&lists_, 0, sizeof(lists_));
   }
 
-  /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes.
-  uint8_t* Allocate(int64_t size) {
-    DCHECK_GE(size, 0);
+  /// Allocates a buffer of size.
+  uint8_t* Allocate(int size) {
 #ifndef NDEBUG
     static int32_t alloc_counts = 0;
     if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -65,15 +64,16 @@ class FreePool {
     ++net_allocations_;
     if (FLAGS_disable_mem_pools) return reinterpret_cast<uint8_t*>(malloc(size));
 
-    /// Return a non-NULL dummy pointer. NULL is reserved for failures.
-    if (UNLIKELY(size == 0)) return mem_pool_->EmptyAllocPtr();
+    /// This is the typical malloc behavior. NULL is reserved for failures.
+    if (size == 0) return reinterpret_cast<uint8_t*>(0x1);
 
     int free_list_idx = Bits::Log2Ceiling64(size);
     DCHECK_LT(free_list_idx, NUM_LISTS);
+
     FreeListNode* allocation = lists_[free_list_idx].next;
     if (allocation == NULL) {
       // There wasn't an existing allocation of the right size, allocate a new one.
-      size = 1LL << free_list_idx;
+      size = 1 << free_list_idx;
       allocation = reinterpret_cast<FreeListNode*>(
           mem_pool_->Allocate(size + sizeof(FreeListNode)));
       if (UNLIKELY(allocation == NULL)) {
@@ -97,7 +97,7 @@ class FreePool {
       free(ptr);
       return;
     }
-    if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return;
+    if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return;
     FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
     FreeListNode* list = node->list;
 #ifndef NDEBUG
@@ -114,7 +114,7 @@ class FreePool {
   ///
   /// NULL will be returned on allocation failure. It's the caller's responsibility to
   /// free the memory buffer pointed to by "ptr" in this case.
-  uint8_t* Reallocate(uint8_t* ptr, int64_t size) {
+  uint8_t* Reallocate(uint8_t* ptr, int size) {
 #ifndef NDEBUG
     static int32_t alloc_counts = 0;
     if (FLAGS_stress_free_pool_alloc > 0 &&
@@ -125,14 +125,13 @@ class FreePool {
     if (FLAGS_disable_mem_pools) {
       return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size));
     }
-    if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size);
+    if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return Allocate(size);
     FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode));
     FreeListNode* list = node->list;
 #ifndef NDEBUG
     CheckValidAllocation(list, ptr);
 #endif
     int bucket_idx = (list - &lists_[0]);
-    DCHECK_LT(bucket_idx, NUM_LISTS);
     // This is the actual size of ptr.
     int allocation_size = 1 << bucket_idx;
 

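FreePool's lists above are bucketed by powers of two: Allocate(size) picks bucket Log2Ceiling(size) and hands back 1 << idx bytes plus a hidden FreeListNode header, which is why the FreePoolTest.ReAlloc hunk earlier sees 1024 + 8 for a 600-byte allocation. A sketch with an assumed Log2Ceiling64:

    #include <cstdint>

    int Log2Ceiling64(uint64_t v) {  // assumed semantics of Bits::Log2Ceiling64
      int log2 = 0;
      while ((1ULL << log2) < v) ++log2;
      return log2;
    }

    int main() {
      int size = 600;
      int idx = Log2Ceiling64(size);    // 10
      int actual = (1 << idx) + 8;      // 1024-byte bucket + 8-byte header
      return actual == 1032 ? 0 : 1;
    }
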
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 121098c..ab6bd5a 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -21,9 +21,6 @@
 
 #include "common/names.h"
 
-// Maximum allocation size which exceeds 32-bit.
-#define LARGE_ALLOC_SIZE (1LL << 32)
-
 namespace impala {
 
 // Utility class to call private functions on MemPool.
@@ -128,11 +125,6 @@ TEST(MemPoolTest, Basic) {
     p2.FreeAll();
     p3.FreeAll();
   }
-
-  // Test zero byte allocation.
-  uint8_t* ptr = p.Allocate(0);
-  EXPECT_TRUE(ptr != NULL);
-  EXPECT_EQ(0, p.GetTotalChunkSizes());
 }
 
 // Test that we can keep an allocated chunk and a free chunk.
@@ -200,22 +192,6 @@ TEST(MemPoolTest, ReturnPartial) {
     EXPECT_EQ(2, ptr[i]);
   }
 
-  // Try ReturnPartialAllocations() with 64-bit values.
-  uint8_t* ptr4 = p.Allocate(LARGE_ALLOC_SIZE + 512);
-  EXPECT_EQ(1024 + LARGE_ALLOC_SIZE + 512, p.total_allocated_bytes());
-  memset(ptr4, 3, 512 * 2);
-  p.ReturnPartialAllocation(LARGE_ALLOC_SIZE);
-  uint8_t* ptr5 = p.Allocate(512);
-  EXPECT_TRUE(ptr5 == ptr4 + 512);
-  memset(ptr5, 4, 512);
-
-  for (int i = 0; i < 512; ++i) {
-    EXPECT_EQ(3, ptr4[i]);
-  }
-  for (int i = 512; i < 512 * 2; ++i) {
-    EXPECT_EQ(4, ptr4[i]);
-  }
-
   p.FreeAll();
 }
 
@@ -276,50 +252,51 @@ TEST(MemPoolTest, Limits) {
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
 
   // Try To allocate 20 bytes, this should succeed. TryAllocate() should leave the
-  // pool in a functional state.
+  // pool in a functional state..
   result = p2->TryAllocate(20);
   ASSERT_TRUE(result != NULL);
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
 
+
   p2->FreeAll();
   delete p2;
 }
 
 TEST(MemPoolTest, MaxAllocation) {
-  int64_t int_max_rounded = BitUtil::RoundUp(LARGE_ALLOC_SIZE, 8);
+  int64_t int_max_rounded = BitUtil::RoundUp(INT_MAX, 8);
 
-  // Allocate a single LARGE_ALLOC_SIZE chunk
+  // Allocate a single INT_MAX chunk
   MemTracker tracker;
   MemPool p1(&tracker);
-  uint8_t* ptr = p1.Allocate(LARGE_ALLOC_SIZE);
+  uint8_t* ptr = p1.Allocate(INT_MAX);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded, p1.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded, p1.total_allocated_bytes());
   p1.FreeAll();
 
-  // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an LARGE_ALLOC_SIZE chunk
+  // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an INT_MAX chunk
   MemPool p2(&tracker);
   p2.Allocate(8);
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, p2.GetTotalChunkSizes());
   EXPECT_EQ(8, p2.total_allocated_bytes());
-  ptr = p2.Allocate(LARGE_ALLOC_SIZE);
+  ptr = p2.Allocate(INT_MAX);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE + int_max_rounded,
       p2.GetTotalChunkSizes());
   EXPECT_EQ(8LL + int_max_rounded, p2.total_allocated_bytes());
   p2.FreeAll();
 
-  // Allocate three LARGE_ALLOC_SIZE chunks followed by a small chunk
-  // followed by another LARGE_ALLOC_SIZE chunk.
+  // Allocate three INT_MAX chunks followed by a small chunk followed by another INT_MAX
+  // chunk
   MemPool p3(&tracker);
-  p3.Allocate(LARGE_ALLOC_SIZE);
+  p3.Allocate(INT_MAX);
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+  ptr = p3.Allocate(INT_MAX);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 2, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 2, p3.total_allocated_bytes());
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+  ptr = p3.Allocate(INT_MAX);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 3, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 3, p3.total_allocated_bytes());
@@ -331,7 +308,7 @@ TEST(MemPoolTest, MaxAllocation) {
   EXPECT_EQ(int_max_rounded * 3 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 3 + 8, p3.total_allocated_bytes());
   // Allocates new int_max_rounded chunk
-  ptr = p3.Allocate(LARGE_ALLOC_SIZE);
+  ptr = p3.Allocate(INT_MAX);
   EXPECT_TRUE(ptr != NULL);
   EXPECT_EQ(int_max_rounded * 4 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes());
   EXPECT_EQ(int_max_rounded * 4 + 8, p3.total_allocated_bytes());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index e1c4d2b..15325c0 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -101,7 +101,7 @@ class MemPool {
   /// Returns 'byte_size' to the current chunk back to the mem pool. This can
   /// only be used to return either all or part of the previous allocation returned
   /// by Allocate().
-  void ReturnPartialAllocation(int64_t byte_size) {
+  void ReturnPartialAllocation(int byte_size) {
     DCHECK_GE(byte_size, 0);
     DCHECK(current_chunk_idx_ != -1);
     ChunkInfo& info = chunks_[current_chunk_idx_];
@@ -110,11 +110,6 @@ class MemPool {
     total_allocated_bytes_ -= byte_size;
   }
 
-  /// Return a dummy pointer for zero-length allocations.
-  static uint8_t* EmptyAllocPtr() {
-    return reinterpret_cast<uint8_t*>(&zero_length_region_);
-  }
-
   /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
   void Clear();
 
@@ -213,7 +208,6 @@ class MemPool {
 
   template <bool CHECK_LIMIT_FIRST>
   uint8_t* Allocate(int64_t size) noexcept {
-    DCHECK_GE(size, 0);
     if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t *>(&zero_length_region_);
 
     int64_t num_bytes = BitUtil::RoundUp(size, 8);

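MemPool rounds every allocation up to a multiple of 8 (the BitUtil::RoundUp call in Allocate() above), which is where the int_max_rounded value in mem-pool-test.cc comes from. Assuming the conventional round-to-multiple helper:

    #include <climits>
    #include <cstdint>
    #include <cstdio>

    int64_t RoundUp(int64_t value, int64_t factor) {  // assumed BitUtil::RoundUp
      return (value + factor - 1) / factor * factor;
    }

    int main() {
      printf("%lld\n", (long long)RoundUp(INT_MAX, 8));  // 2147483648
      return 0;
    }
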
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index 2387d60..95d2e41 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -24,10 +24,10 @@
 namespace impala {
 
 void ValidateString(const string& std_str, const StringBuffer& str) {
-  EXPECT_EQ(std_str.empty(), str.IsEmpty());
-  EXPECT_EQ(static_cast<int64_t>(std_str.size()), str.len());
+  EXPECT_EQ(std_str.empty(), str.Empty());
+  EXPECT_EQ((int)std_str.size(), str.Size());
   if (std_str.size() > 0) {
-    EXPECT_EQ(strncmp(std_str.c_str(), str.buffer(), std_str.size()), 0);
+    EXPECT_EQ(strncmp(std_str.c_str(), str.str().ptr, std_str.size()), 0);
   }
 }
 
@@ -55,6 +55,11 @@ TEST(StringBufferTest, Basic) {
   str.Append("World", strlen("World"));
   ValidateString(std_str, str);
 
+  // Assign
+  std_str.assign("foo");
+  str.Assign("foo", strlen("foo"));
+  ValidateString(std_str, str);
+
   // Clear
   std_str.clear();
   str.Clear();
@@ -67,22 +72,23 @@ TEST(StringBufferTest, Basic) {
 }
 
 TEST(StringBufferTest, AppendBoundary) {
-  // Test StringBuffer::Append() works beyond 1GB.
+  // Test that StringBuffer::Append() works up to 1GB.
+  // TODO: Once IMPALA-1619 is fixed, change the test to verify that
+  // appending a string over 2GB is supported.
   MemTracker tracker;
   MemPool pool(&tracker);
   StringBuffer str(&pool);
   string std_str;
 
   const int64_t chunk_size = 8 * 1024 * 1024;
-  const int64_t max_data_size = 1LL << 32;
   std_str.resize(chunk_size, 'a');
   int64_t data_size = 0;
-  while (data_size + chunk_size <= max_data_size) {
+  while (data_size + chunk_size <= StringValue::MAX_LENGTH) {
     str.Append(std_str.c_str(), chunk_size);
     data_size += chunk_size;
   }
   EXPECT_EQ(str.buffer_size(), data_size);
-  std_str.resize(max_data_size, 'a');
+  std_str.resize(StringValue::MAX_LENGTH, 'a');
   ValidateString(std_str, str);
 
   pool.FreeAll();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 3725181..ec59806 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -27,87 +27,99 @@ namespace impala {
 
 /// Dynamic-sizable string (similar to std::string) but without as many
 /// copies and allocations.
-/// StringBuffer is a buffer of char allocated from 'pool'. Current usage and size of the
-/// buffer are tracked in 'len_' and 'buffer_size_' respectively. It supports a subset of
-/// the std::string functionality but will only allocate bigger string buffers as
-/// necessary. std::string tries to be immutable and will reallocate very often.
-/// std::string should be avoided in all hot paths.
+/// StringBuffer wraps a StringValue object with a pool and memory buffer length.
+/// It supports a subset of the std::string functionality but will only allocate
+/// bigger string buffers as necessary.  std::string tries to be immutable and will
+/// reallocate very often.  std::string should be avoided in all hot paths.
 class StringBuffer {
  public:
   /// C'tor for StringBuffer.  Memory backing the string will be allocated from
   /// the pool as necessary. Can optionally be initialized from a StringValue.
   StringBuffer(MemPool* pool, StringValue* str = NULL)
-      : pool_(pool), buffer_(NULL), len_(0), buffer_size_(0) {
+      : pool_(pool), buffer_size_(0) {
     DCHECK(pool_ != NULL);
     if (str != NULL) {
-      buffer_ = str->ptr;
-      len_ = buffer_size_ = str->len;
+      string_value_ = *str;
+      buffer_size_ = str->len;
     }
   }
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
-  Status Append(const char* str, int64_t str_len) {
-    int64_t new_len = len_ + str_len;
+  /// Return error status if memory limit is exceeded.
+  Status Append(const char* str, int len) {
+    int new_len = len + string_value_.len;
     if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
-    memcpy(buffer_ + len_, str, str_len);
-    len_ += str_len;
+    memcpy(string_value_.ptr + string_value_.len, str, len);
+    string_value_.len = new_len;
     return Status::OK();
   }
 
-  /// Wrapper around append() for input type 'uint8_t'.
-  Status Append(const uint8_t* str, int64_t str_len) {
-    return Append(reinterpret_cast<const char*>(str), str_len);
+  /// TODO: switch everything to uint8_t?
+  Status Append(const uint8_t* str, int len) {
+    return Append(reinterpret_cast<const char*>(str), len);
   }
 
-  /// Clear the underlying StringValue. The allocated buffer can be reused.
-  void Clear() { len_ = 0; }
+  /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
+  Status Assign(const char* str, int len) {
+    Clear();
+    return Append(str, len);
+  }
+
+  /// Clear the underlying StringValue.  The allocated buffer can be reused.
+  void Clear() {
+    string_value_.len = 0;
+  }
 
-  /// Reset the usage and size of the buffer. Note that the allocated buffer is
-  /// retained but cannot be reused.
+  /// Clears the underlying buffer and StringValue
   void Reset() {
-    len_ = 0;
+    string_value_.len = 0;
     buffer_size_ = 0;
-    buffer_ = NULL;
   }
 
-  /// Returns true if no byte is consumed in the buffer.
-  bool IsEmpty() const { return len_ == 0; }
-
-  /// Grows the buffer to be at least 'new_size', copying over the previous data
-  /// into the new buffer. The old buffer is not freed. Return an error status if
-  /// growing the buffer will exceed memory limit.
-  Status GrowBuffer(int64_t new_size) {
-    if (LIKELY(new_size > buffer_size_)) {
-      int64_t old_size = buffer_size_;
-      buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);
-      char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
-      if (UNLIKELY(new_buffer == NULL)) {
-        string details = Substitute("StringBuffer failed to grow buffer from $0 "
-            "to $1 bytes.", old_size, buffer_size_);
-        return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
-      }
-      if (LIKELY(len_ > 0)) memcpy(new_buffer, buffer_, len_);
-      buffer_ = new_buffer;
-    }
-    return Status::OK();
+  /// Returns whether the current string is empty
+  bool Empty() const {
+    return string_value_.len == 0;
   }
 
-  /// Returns the number of bytes consumed in the buffer.
-  int64_t len() const { return len_; }
+  /// Returns the length of the current string
+  int Size() const {
+    return string_value_.len;
+  }
 
-  /// Returns the pointer to the buffer. Note that it's the caller's responsibility
-  /// to not retain the pointer to 'buffer_' across call to Append() as the buffer_
-  /// may be relocated in Append().
-  char* buffer() const { return buffer_; }
+  /// Returns the underlying StringValue
+  const StringValue& str() const {
+    return string_value_;
+  }
 
-  /// Returns the size of the buffer.
-  int64_t buffer_size() const { return buffer_size_; }
+  /// Returns the buffer size
+  int buffer_size() const {
+    return buffer_size_;
+  }
 
  private:
+  /// Grows the buffer backing the string to be at least new_len, copying over the
+  /// previous string data into the new buffer. Return error status if memory limit
+  /// is exceeded.
+  Status GrowBuffer(int new_len) {
+    // TODO: Release/reuse old buffers somehow
+    buffer_size_ = std::max(buffer_size_ * 2, new_len);
+    DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
+    char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+    if (UNLIKELY(new_buffer == NULL)) {
+      string details = Substitute("StringBuffer failed to grow buffer to $0 bytes.",
+          buffer_size_);
+      return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+    }
+    if (LIKELY(string_value_.len > 0)) {
+      memcpy(new_buffer, string_value_.ptr, string_value_.len);
+    }
+    string_value_.ptr = new_buffer;
+    return Status::OK();
+  }
+
   MemPool* pool_;
-  char* buffer_;
-  int64_t len_;         // number of bytes consumed in the buffer.
-  int64_t buffer_size_; // size of the buffer.
+  StringValue string_value_;
+  int buffer_size_;
 };
 
 }
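
The growth policy restored above is the classic capacity-doubling append. Below is a
minimal standalone sketch of the same behavior, using plain malloc in place of Impala's
MemPool/MemTracker machinery; the names here are illustrative, not the real API, and
integer-overflow checks are omitted.

  #include <algorithm>
  #include <cstdlib>
  #include <cstring>

  // Sketch of the reverted StringBuffer growth policy. Impala allocates from
  // a MemPool (old buffers stay alive until the pool is freed) and reports
  // failure through Status/MemTracker; this toy version just uses malloc.
  struct ToyStringBuffer {
    char* ptr = nullptr;
    int len = 0;
    int buffer_size = 0;

    bool Append(const char* str, int str_len) {
      int new_len = len + str_len;
      if (new_len > buffer_size) {
        // Double the capacity, or jump straight to new_len if that is larger.
        buffer_size = std::max(buffer_size * 2, new_len);
        char* new_buf = static_cast<char*>(malloc(buffer_size));
        if (new_buf == nullptr) return false;
        if (len > 0) memcpy(new_buf, ptr, len);
        ptr = new_buf;  // The old buffer is intentionally not freed here.
      }
      memcpy(ptr + len, str, str_len);
      len = new_len;
      return true;
    }
  };

Doubling keeps Append() amortized O(1), while the max() with new_len handles a single
append larger than twice the current capacity.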

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 1996838..599e42f 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -79,7 +79,7 @@ class FunctionContextImpl {
   /// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear.
   /// For UDFs, the allocations can be freed at the row level.
   /// TODO: free them at the batch level and save some copies?
-  uint8_t* AllocateLocal(int64_t byte_size) noexcept;
+  uint8_t* AllocateLocal(int byte_size) noexcept;
 
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations() noexcept;
@@ -121,7 +121,7 @@ class FunctionContextImpl {
   /// if necessary.
   ///
   /// Return false if 'buf' is null; returns true otherwise.
-  bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size);
+  bool CheckAllocResult(const char* fn_name, uint8_t* buf, int byte_size);
 
   /// Preallocated buffer for storing varargs (if the function has any). Allocated and
   /// owned by this object, but populated by an Expr function.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index ff81c9c..4cadb63 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -267,7 +267,7 @@ const char* FunctionContext::error_msg() const {
 }
 
 inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
-    uint8_t* buf, int64_t byte_size) {
+    uint8_t* buf, int byte_size) {
   if (UNLIKELY(buf == NULL)) {
     stringstream ss;
     ss << string(fn_name) << "() failed to allocate " << byte_size << " bytes.";
@@ -416,7 +416,7 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) {
   }
 }
 
-uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept {
+uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) noexcept {
   assert(!closed_);
   if (byte_size == 0) return NULL;
   uint8_t* buffer = pool_->Allocate(byte_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 210c855..1b1cdab 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -140,7 +140,6 @@ class FunctionContext {
   /// The UDF/UDA is responsible for calling Free() on all buffers returned by Allocate().
   /// If Allocate() fails or causes the memory limit to be exceeded, the error will be
   /// set in this object causing the query to fail.
-  /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
   uint8_t* Allocate(int byte_size) noexcept;
 
   /// Wrapper around Allocate() to allocate a buffer of the given type "T".
@@ -156,7 +155,6 @@ class FunctionContext {
   /// memory limit to be exceeded, the error will be set in this object.
   ///
   /// This should be used for buffers that constantly get appended to.
-  /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756.
   uint8_t* Reallocate(uint8_t* ptr, int byte_size) noexcept;
 
   /// Frees a buffer returned from Allocate() or Reallocate()
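
Since the revert narrows the allocation size back to int, a caller that computes sizes
in 64 bits has to reject anything above 2^31 - 1 itself. A hedged caller-side sketch
follows; SafeAllocate is a hypothetical helper, not part of the UDF API.

  #include <cstdint>
  #include <limits>
  #include "udf/udf.h"

  // Hypothetical guard: with an int byte_size, any request over INT_MAX bytes
  // must be rejected before it reaches FunctionContext::Allocate().
  uint8_t* SafeAllocate(impala_udf::FunctionContext* ctx, int64_t wanted) {
    if (wanted < 0 || wanted > std::numeric_limits<int>::max()) return nullptr;
    return ctx->Allocate(static_cast<int>(wanted));
  }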

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 47592b8..cae8212 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -23,7 +23,6 @@
 #endif
 
 #include <boost/type_traits/make_unsigned.hpp>
-#include <limits>
 
 #include "common/compiler-util.h"
 #include "util/cpu-info.h"
@@ -204,36 +203,31 @@ class BitUtil {
   static inline uint16_t FromBigEndian(uint16_t val) { return val; }
 #endif
 
-  /// Returns true if 'value' is a non-negative 32-bit integer.
-  static inline bool IsNonNegative32Bit(int64_t value) {
-    return (uint64_t)value <= std::numeric_limits<int32_t>::max();
-  }
-
-  /// Logical right shift for signed integer types
-  /// This is needed because the C >> operator does arithmetic right shift
-  /// Negative shift amounts lead to undefined behavior
+  // Logical right shift for signed integer types
+  // This is needed because the C >> operator does arithmetic right shift
+  // Negative shift amounts lead to undefined behavior
   template<typename T>
   static T ShiftRightLogical(T v, int shift) {
     // Conversion to unsigned ensures most significant bits always filled with 0's
     return static_cast<typename make_unsigned<T>::type>(v) >> shift;
   }
 
-  /// Get a specific bit of a numeric type
+  // Get a specific bit of a numeric type
   template<typename T>
   static inline int8_t GetBit(T v, int bitpos) {
     T masked = v & (static_cast<T>(0x1) << bitpos);
     return static_cast<int8_t>(ShiftRightLogical(masked, bitpos));
   }
 
-  /// Set a specific bit to 1
-  /// Behavior when bitpos is negative is undefined
+  // Set a specific bit to 1
+  // Behavior when bitpos is negative is undefined
   template<typename T>
   static T SetBit(T v, int bitpos) {
     return v | (static_cast<T>(0x1) << bitpos);
   }
 
-  /// Set a specific bit to 0
-  /// Behavior when bitpos is negative is undefined
+  // Set a specific bit to 0
+  // Behavior when bitpos is negative is undefined
   template<typename T>
   static T UnsetBit(T v, int bitpos) {
     return v & ~(static_cast<T>(0x1) << bitpos);
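
As a quick self-contained demonstration of why ShiftRightLogical exists, the snippet
below contrasts it with the built-in >> on a negative value. It uses std::make_unsigned
rather than the boost equivalent included by the header; the values are illustrative.

  #include <cassert>
  #include <cstdint>
  #include <type_traits>

  template <typename T>
  T ShiftRightLogical(T v, int shift) {
    // Conversion to unsigned ensures the most significant bits fill with 0s.
    return static_cast<typename std::make_unsigned<T>::type>(v) >> shift;
  }

  int main() {
    int8_t v = -128;                        // bit pattern 10000000
    assert((v >> 1) == -64);                // arithmetic shift: sign bit copied
                                            // (implementation-defined pre-C++20,
                                            // arithmetic on mainstream compilers)
    assert(ShiftRightLogical(v, 1) == 64);  // logical shift: zero-filled
    return 0;
  }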

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 2ee7415..6863d1f 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -182,13 +182,15 @@ Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
     const uint8_t* input, int* output_length, uint8_t** output) {
   int64_t input_len64 = input_length;
   int64_t output_len64 = *output_length;
-  RETURN_IF_ERROR(
-      ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
-  // Buffer size should be between [0, (2^31 - 1)] bytes.
-  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
+  RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
+                               output));
+  // Check whether casting the output length from int64_t down to int would overflow.
+  // TODO: Is there a faster way to do this check?
+  if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
     return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
        output_len64));
   }
-  *output_length = static_cast<int>(output_len64);
+  *output_length = static_cast<int32_t>(output_len64);
   return Status::OK();
 }
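
ProcessBlock32 now guards the 64-to-32-bit narrowing with a plain upper-bound compare.
Below is a standalone sketch of the full check a reviewer might expect; FitsInInt32 is
illustrative, and note the reverted code checks only the upper bound, relying on output
lengths being non-negative.

  #include <cassert>
  #include <cstdint>
  #include <limits>

  // True iff len64 is representable as a non-negative 32-bit int.
  bool FitsInInt32(int64_t len64) {
    return len64 >= 0 && len64 <= std::numeric_limits<int32_t>::max();
  }

  int main() {
    assert(FitsInInt32(std::numeric_limits<int32_t>::max()));
    assert(!FitsInInt32(int64_t{std::numeric_limits<int32_t>::max()} + 1));
    assert(!FitsInInt32(-1));  // a negative length would also wrap on cast
    return 0;
  }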

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 563a3b9..a337983 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -150,6 +150,11 @@ class Codec {
 
   bool supports_streaming() const { return supports_streaming_; }
 
+  /// Largest block we will compress/decompress: 2GB.
+  /// We are dealing with compressed blocks that are never this big but we want to guard
+  /// against a corrupt file that has the block length as some large number.
+  static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;
+
  protected:
   /// Create a compression operator
   /// Inputs:

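For reference, the restored constant is exactly INT_MAX, so any block that passes this
bound also survives the int casts elsewhere in the codec path. A compile-time
confirmation, assuming a 64-bit long as on Impala's supported platforms:

  #include <climits>
  static_assert((2L * 1024 * 1024 * 1024) - 1 == INT_MAX,
                "MAX_BLOCK_SIZE is 2^31 - 1");
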
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 7a94db0..3a32756 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -121,8 +121,10 @@ class DecompressorTest : public ::testing::Test {
       EXPECT_GE(max_compressed_length, 0);
       uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
       compressed_length = max_compressed_length;
+
+
       EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
-          &compressed));
+                                           &compressed));
     }
 
     output_len = decompressor->MaxOutputLen(compressed_length, compressed);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 900f891..8a2ea74 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -148,6 +148,9 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     if (!reuse_buffer_ || out_buffer_ == NULL) {
       // guess that we will need 2x the input length.
       buffer_length_ = input_length * 2;
+      if (buffer_length_ > MAX_BLOCK_SIZE) {
+        return Status("Decompressor: block size is too big");
+      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -201,6 +204,11 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     // User didn't supply the buffer, double the buffer and try again.
     temp_memory_pool_->Clear();
     buffer_length_ *= 2;
+    if (buffer_length_ > MAX_BLOCK_SIZE) {
+      stringstream ss;
+      ss << "GzipDecompressor: block size is too big: " << buffer_length_;
+      return Status(ss.str());
+    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -265,6 +273,9 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   } else if (!reuse_buffer_ || out_buffer_ == NULL) {
     // guess that we will need 2x the input length.
     buffer_length_ = input_length * 2;
+    if (buffer_length_ > MAX_BLOCK_SIZE) {
+      return Status("Decompressor: block size is too big");
+    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -284,6 +295,9 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
       DCHECK(!output_preallocated);
       temp_memory_pool_->Clear();
       buffer_length_ = buffer_length_ * 2;
+      if (buffer_length_ > MAX_BLOCK_SIZE) {
+        return Status("Decompressor: block size is too big");
+      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -401,18 +415,17 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
 }
 
 // Hadoop uses a block compression scheme on top of snappy.  As per the hadoop docs
-// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split
-// into blocks.  Each block "contains the uncompressed length for the block followed
-// by one or more length-prefixed blocks of compressed data."
+// the input is split into blocks.  Each block "contains the uncompressed length for
+// the block followed by one or more length-prefixed blocks of compressed data."
 // This is essentially blocks of blocks.
 // The outer block consists of:
-//   - 4 byte big endian uncompressed_size
+//   - 4 byte little endian uncompressed_size
 //   < inner blocks >
 //   ... repeated until input_len is consumed ..
 // The inner blocks have:
-//   - 4-byte big endian compressed_size
+//   - 4-byte little endian compressed_size
 //   < snappy compressed block >
-//   - 4-byte big endian compressed_size
+//   - 4-byte little endian compressed_size
 //   < snappy compressed block >
 //   ... repeated until uncompressed_size from outer block is consumed ...
 
@@ -430,6 +443,15 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     input += sizeof(uint32_t);
     input_len -= sizeof(uint32_t);
 
+    if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) {
+      if (uncompressed_total_len == 0) {
+        // TODO: is this check really robust?
+        return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
+            uncompressed_block_len);
+      }
+      break;
+    }
+
     if (!size_only) {
       int64_t remaining_output_size = *output_len - uncompressed_total_len;
       DCHECK_GE(remaining_output_size, uncompressed_block_len);
@@ -442,23 +464,29 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       input_len -= sizeof(uint32_t);
 
       if (compressed_len == 0 || compressed_len > input_len) {
-        *output_len = 0;
-        return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
+        if (uncompressed_total_len == 0) {
+          return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
+        }
+        input_len = 0;
+        break;
       }
 
       // Read how big the output will be.
       size_t uncompressed_len;
       if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
-              compressed_len, &uncompressed_len)) {
-        *output_len = 0;
-        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+            input_len, &uncompressed_len)) {
+        if (uncompressed_total_len == 0) {
+          return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+        }
+        input_len = 0;
+        break;
       }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {
         // Decompress this snappy block
         if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-                compressed_len, output)) {
+              compressed_len, output)) {
           return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
         }
         output += uncompressed_len;
@@ -498,6 +526,14 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
     *output = out_buffer_;
   }
 
+  if (*output_len > MAX_BLOCK_SIZE) {
+    // TODO: is this check really robust?
+    stringstream ss;
+    ss << "Decompressor: block size is too big.  Data is likely corrupt. "
+       << "Size: " << *output_len;
+    return Status(ss.str());
+  }
+
   char* out_ptr = reinterpret_cast<char*>(*output);
   RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
   return Status::OK();
@@ -511,7 +547,7 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
   DCHECK(input != NULL);
   size_t result;
   if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
-          input_len, &result)) {
+           input_len, &result)) {
     return -1;
   }
   return result;
@@ -524,6 +560,9 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
     if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed");
     if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
       buffer_length_ = uncompressed_length;
+      if (buffer_length_ > MAX_BLOCK_SIZE) {
+        return Status("Decompressor: block size is too big");
+      }
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
@@ -537,7 +576,7 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
   }
 
   if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-          static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
+           static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
     return Status("Snappy: RawUncompress failed");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 26ea78d..7815233 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -274,9 +274,6 @@ error_codes = (
 
   ("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
    "data of type $1: $2"),
-
-  ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
-   "supported length of 2147483647 bytes.")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/infra/python/bootstrap_virtualenv.py
----------------------------------------------------------------------
diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py
index eb53ba2..a9c6264 100644
--- a/infra/python/bootstrap_virtualenv.py
+++ b/infra/python/bootstrap_virtualenv.py
@@ -123,16 +123,8 @@ def detect_python_cmd():
 
 
 def install_deps():
-  toolchain_dir = os.environ.get("IMPALA_TOOLCHAIN", "")
-  snappy_version = os.environ.get("IMPALA_SNAPPY_VERSION", "")
-  snappy_dir = toolchain_dir + "/snappy-" + snappy_version
-  lib_path = snappy_dir + "/lib:" + os.environ.get("LD_LIBRARY_PATH", "")
-  include_dir = snappy_dir + "/include"
-  args = ["--global-option", "build_ext", "--global-option", "-L"+ lib_path,
-    "--global-option", "-I" + include_dir, "-r", REQS_PATH]
-
   LOG.info("Installing packages into the virtualenv")
-  exec_pip_install(args)
+  exec_pip_install(["-r", REQS_PATH])
   shutil.copyfile(REQS_PATH, INSTALLED_REQS_PATH)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index a7dffd2..b3ebfb1 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -44,7 +44,6 @@ prettytable == 0.7.2
 psutil == 0.7.1
 pyelftools == 0.23
 pyparsing == 2.0.3
-python-snappy == 0.5.0
 pytest == 2.7.2
   py == 1.4.30
 pytest-random == 0.02

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a07fc367/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index fde7032..15f799c 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -3,9 +3,6 @@
 import os
 import pytest
 import random
-import snappy
-import string
-import struct
 import subprocess
 from os.path import join
 from subprocess import call
@@ -147,23 +144,19 @@ class TestTableWriters(ImpalaTestSuite):
 
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
-  """
-  Tests that Impala handles compressed files in HDFS larger than 1GB.
-  This test creates a 2GB test data file and loads it into a table.
-  """
+  """ Tests that we gracefully handle when a compressed file in HDFS is larger
+  than 1GB.
+  This test creates a testing data file that is over 1GB and loads it to a table.
+  Then verifies Impala will gracefully fail the query.
+  TODO: Once IMPALA-1619 is fixed, modify the test to test > 2GB file."""
+
   TABLE_NAME = "large_compressed_file"
   TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
-  """
-  Name the file with ".snappy" extension to let scanner treat it as
-  a snappy block compressed file.
-  """
+  """ Name the file with ".snappy" extension to let scanner treat it
+  as a snappy compressed file."""
   FILE_NAME = "largefile.snappy"
-  # Maximum uncompressed size of an outer block in a snappy block compressed file.
-  CHUNK_SIZE = 1024 * 1024 * 1024
-  # Limit the max file size to 2GB or too much memory may be needed when
-  # uncompressing the buffer. 2GB is sufficient to show that we support
-  # size beyond maximum 32-bit signed value.
-  MAX_FILE_SIZE = 2 * CHUNK_SIZE
+  LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+  MAX_FILE_SIZE = 1024 * 1024 * 1024
 
   @classmethod
   def get_workload(self):
@@ -177,59 +170,48 @@ class TestLargeCompressedFile(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.TestMatrix.add_constraint(lambda v:
         (v.get_value('table_format').file_format =='text' and
-        v.get_value('table_format').compression_codec == 'snap'))
+        v.get_value('table_format').compression_codec == 'none'))
 
   def teardown_method(self, method):
     self.__drop_test_table()
 
+  def __gen_char_or_num(self):
+    return random.choice(self.LETTERS)
+
   def __generate_file(self, file_name, file_size):
     """Generate file with random data and a specified size."""
-    content = "Lots of content here there here there here there"
-    # Permutate the input buffer.
-    payload = ''
-    for i in range(1024):
-      payload += ''.join(random.sample(content, len(content))) + ','
-
-    compressed_payload = snappy.compress(payload)
-    compressed_size = len(compressed_payload)
-
-    num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
-    num_iterations = self.CHUNK_SIZE / (compressed_size + 4)
-    total_size = num_iterations * len(payload)
-
+    s = ''
+    for j in range(1024):
+      s = s + self.__gen_char_or_num()
     put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
-        stdin=subprocess.PIPE, bufsize=-1)
-    """
-    The layout of a snappy-block compressed file is one or more blocks
-    of the following nested structure:
-    - <big endian 32-bit value encoding the uncompressed size>
-    - one or more blocks of the following structure:
-      - <big endian 32-bit value encoding the compressed size>
-      - <raw bits compressed by snappy algorithm>
-    """
-    for i in range(num_chunks):
-      put.stdin.write(struct.pack('>i', total_size))
-      for j in range(num_iterations):
-        put.stdin.write(struct.pack('>i', compressed_size))
-        put.stdin.write(compressed_payload)
+                           stdin=subprocess.PIPE, bufsize=-1)
+    remain = file_size % 1024
+    for i in range(int(file_size / 1024)):
+      put.stdin.write(s)
+    put.stdin.write(s[0:remain])
     put.stdin.close()
     put.wait()
 
   def test_query_large_file(self, vector):
     self.__create_test_table();
     dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
-    file_size = self.MAX_FILE_SIZE
+    file_size = self.MAX_FILE_SIZE + 1
     self.__generate_file(dst_path, file_size)
     self.client.execute("refresh %s" % self.TABLE_NAME)
 
-    # Query the table
-    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+    # Query the table and check for expected error.
+    expected_error = 'Requested buffer size %dB > 1GB' % file_size
+    try:
+      result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+      assert False, "Query was expected to fail"
+    except Exception as e:
+      error_msg = str(e)
+      assert expected_error in error_msg
 
   def __create_test_table(self):
     self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) " \
-        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
-        % (self.TABLE_NAME, self.TABLE_LOCATION))
+    self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
+      % (self.TABLE_NAME, self.TABLE_LOCATION))
 
   def __drop_test_table(self):
     self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)