Posted to commits@impala.apache.org by ta...@apache.org on 2016/04/12 23:19:07 UTC

[33/50] incubator-impala git commit: IMPALA-2399: Memory limit checks for all scanners.

IMPALA-2399: Memory limit checks for all scanners.

This change replaces all instances of MemPool::Allocate() in
the Avro, text, and HBase scanners with MemPool::TryAllocate().

HdfsAvroScanner::MaterializeTuple() has been converted to return
a boolean to indicate memory allocation failure. The codegen'd
version of MaterializeTuple() also returns a boolean. In the
future, we should consider returning Status directly, but that
will be more involved and is best left as a separate change.
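
The conversion follows one pattern at every call site. Below is a minimal
sketch of that pattern, using simplified stand-ins for Impala's MemPool,
MemTracker, and Status classes (the real ones live under be/src/runtime/);
it illustrates the shape of the change, not the actual implementation:

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <string>

// Simplified stand-ins for illustration only, not Impala's real classes.
struct Status {
  std::string msg;
  static Status OK() { return Status{}; }
  bool ok() const { return msg.empty(); }
};

struct MemTracker {
  Status MemLimitExceeded(const std::string& details, int64_t size) {
    return Status{details};  // the real version also records usage/query state
  }
};

struct MemPool {
  MemTracker tracker;
  // Old API: Allocate() cannot report failure to the caller. New API:
  // TryAllocate() returns NULL when the allocation would exceed the limit.
  uint8_t* TryAllocate(int64_t size) {
    // The real MemPool consults its MemTracker before allocating.
    return reinterpret_cast<uint8_t*>(malloc(size));
  }
  MemTracker* mem_tracker() { return &tracker; }
};

// The shape of every converted call site: check for NULL and return an
// error status instead of crashing the process.
Status CopyValue(MemPool* pool, const char* val, int64_t val_size, char** out) {
  char* buffer = reinterpret_cast<char*>(pool->TryAllocate(val_size));
  if (buffer == nullptr) {
    return pool->mem_tracker()->MemLimitExceeded(
        "failed to allocate " + std::to_string(val_size) + " bytes", val_size);
  }
  memcpy(buffer, val, val_size);
  *out = buffer;
  return Status::OK();
}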

Change-Id: I3e5a56501967a58513888917db5ce66dec4fb5ce
Reviewed-on: http://gerrit.cloudera.org:8080/2568
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/70502942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/70502942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/70502942

Branch: refs/heads/master
Commit: 7050294215cdd81963aa815f69a573421a05ab3e
Parents: 0c4dc96
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 11 15:32:04 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Apr 12 14:02:35 2016 -0700

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |   3 +-
 be/src/exec/data-source-scan-node.cc |  16 +++-
 be/src/exec/hbase-table-scanner.cc   |  52 +++++++++---
 be/src/exec/hbase-table-scanner.h    |  14 +--
 be/src/exec/hdfs-avro-scanner-ir.cc  |  36 +++++---
 be/src/exec/hdfs-avro-scanner.cc     | 136 +++++++++++++++++++++---------
 be/src/exec/hdfs-avro-scanner.h      |  36 +++++---
 be/src/exec/hdfs-parquet-scanner.cc  |  11 +--
 be/src/exec/hdfs-scanner.cc          |   8 +-
 be/src/exec/hdfs-sequence-scanner.cc |   4 +
 be/src/exec/hdfs-text-scanner.cc     |  21 +++--
 be/src/exec/hdfs-text-scanner.h      |  11 +--
 be/src/exec/scanner-context.cc       |  10 +--
 be/src/exec/text-converter.h         |   2 +-
 be/src/exec/text-converter.inline.h  |  15 +++-
 be/src/runtime/string-buffer.h       |  38 ++++++---
 16 files changed, 285 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9145403..f88717f 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -98,7 +98,8 @@ void BaseSequenceScanner::Close() {
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
-  if (!only_parsing_header_) {
+  // 'header_' can be NULL if HdfsScanNode::CreateAndPrepareScanner() failed.
+  if (!only_parsing_header_ && header_ != NULL) {
     scan_node_->RangeComplete(file_format(), header_->compression_type);
   }
   HdfsScanner::Close();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index c865234..2c52c11 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -55,6 +55,8 @@ const string ERROR_INVALID_TIMESTAMP = "Data source returned invalid timestamp d
     "This likely indicates a problem with the data source library.";
 const string ERROR_INVALID_DECIMAL = "Data source returned invalid decimal data. "
     "This likely indicates a problem with the data source library.";
+const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allocate "
+    "$1 bytes for $2.";
 
 // Size of an encoded TIMESTAMP
 const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t);
@@ -210,7 +212,12 @@ Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool) {
           }
           const string& val = col.string_vals[val_idx];
           size_t val_size = val.size();
-          char* buffer = reinterpret_cast<char*>(tuple_pool->Allocate(val_size));
+          char* buffer = reinterpret_cast<char*>(tuple_pool->TryAllocate(val_size));
+          if (UNLIKELY(buffer == NULL)) {
+            string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow",
+                val_size, "string slot");
+            return tuple_pool->mem_tracker()->MemLimitExceeded(NULL, details, val_size);
+          }
           memcpy(buffer, val.data(), val_size);
           reinterpret_cast<StringValue*>(slot)->ptr = buffer;
           reinterpret_cast<StringValue*>(slot)->len = val_size;
@@ -300,7 +307,12 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
   // create new tuple buffer for row_batch
   MemPool* tuple_pool = row_batch->tuple_data_pool();
   int tuple_buffer_size = row_batch->MaxTupleBufferSize();
-  void* tuple_buffer = tuple_pool->Allocate(tuple_buffer_size);
+  void* tuple_buffer = tuple_pool->TryAllocate(tuple_buffer_size);
+  if (UNLIKELY(tuple_buffer == NULL)) {
+    string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "GetNext",
+        tuple_buffer_size, "tuple");
+    return tuple_pool->mem_tracker()->MemLimitExceeded(state, details, tuple_buffer_size);
+  }
   tuple_ = reinterpret_cast<Tuple*>(tuple_buffer);
   ExprContext** ctxs = &conjunct_ctxs_[0];
   int num_ctxs = conjunct_ctxs_.size();
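
The ERROR_MEM_LIMIT_EXCEEDED template above is expanded with
strings::Substitute(), which fills the positional $0/$1/$2 placeholders.
A hand-rolled stand-in, for illustration only (the real function comes from
the bundled gutil strings library):

#include <string>

// Toy three-argument Substitute(): replaces each "$<i>" token in 'tmpl'.
std::string SubstituteSketch(std::string tmpl, const std::string& a0,
    const std::string& a1, const std::string& a2) {
  const std::string args[] = {a0, a1, a2};
  for (int i = 0; i < 3; ++i) {
    const std::string token = "$" + std::to_string(i);
    size_t pos;
    while ((pos = tmpl.find(token)) != std::string::npos) {
      tmpl.replace(pos, token.size(), args[i]);
    }
  }
  return tmpl;
}

// SubstituteSketch("DataSourceScanNode::$0() failed to allocate $1 bytes "
//     "for $2.", "GetNext", "1024", "tuple") yields:
// "DataSourceScanNode::GetNext() failed to allocate 1024 bytes for tuple."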

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc
index 93cad54..cd622bc 100644
--- a/be/src/exec/hbase-table-scanner.cc
+++ b/be/src/exec/hbase-table-scanner.cc
@@ -28,6 +28,7 @@
 #include "common/names.h"
 
 using namespace impala;
+using namespace strings;
 
 jclass HBaseTableScanner::scan_cl_ = NULL;
 jclass HBaseTableScanner::resultscanner_cl_ = NULL;
@@ -70,6 +71,9 @@ jobject HBaseTableScanner::empty_row_ = NULL;
 jobject HBaseTableScanner::must_pass_all_op_ = NULL;
 jobjectArray HBaseTableScanner::compare_ops_ = NULL;
 
+const string HBASE_MEM_LIMIT_EXCEEDED = "HBaseTableScanner::$0() failed to "
+    "allocate $1 bytes for $2.";
+
 void HBaseTableScanner::ScanRange::DebugString(int indentation_level,
     stringstream* out) {
   *out << string(indentation_level * 2, ' ');
@@ -573,53 +577,77 @@ inline void HBaseTableScanner::WriteTupleSlot(const SlotDescriptor* slot_desc,
   BitUtil::ByteSwap(slot, data, slot_desc->type().GetByteSize());
 }
 
-inline void HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
     void** data, int* length) {
   int offset = env->CallIntMethod(cell, cell_get_row_offset_id_);
   *length = env->CallShortMethod(cell, cell_get_row_length_id_);
   jbyteArray jdata =
       (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
-  *data = value_pool_->Allocate(*length);
+  *data = value_pool_->TryAllocate(*length);
+  if (UNLIKELY(*data == NULL)) {
+    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetRowKey",
+        *length, "row array");
+    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+  }
   env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
   COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+  return Status::OK();
 }
 
-inline void HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
     void** data, int* length) {
   int offset = env->CallIntMethod(cell, cell_get_family_offset_id_);
   *length = env->CallShortMethod(cell, cell_get_family_length_id_);
   jbyteArray jdata =
       (jbyteArray) env->CallObjectMethod(cell, cell_get_family_array_);
-  *data = value_pool_->Allocate(*length);
+  *data = value_pool_->TryAllocate(*length);
+  if (UNLIKELY(*data == NULL)) {
+    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetFamily",
+        *length, "family array");
+    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+  }
   env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
   COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+  return Status::OK();
 }
 
-inline void HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
     void** data, int* length) {
   int offset = env->CallIntMethod(cell, cell_get_qualifier_offset_id_);
   *length = env->CallIntMethod(cell, cell_get_qualifier_length_id_);
   jbyteArray jdata =
       (jbyteArray) env->CallObjectMethod(cell, cell_get_qualifier_array_);
-  *data = value_pool_->Allocate(*length);
+  *data = value_pool_->TryAllocate(*length);
+  if (UNLIKELY(*data == NULL)) {
+    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetQualifier",
+        *length, "qualifier array");
+    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+  }
   env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
   COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+  return Status::OK();
 }
 
-inline void HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
     void** data, int* length) {
   int offset = env->CallIntMethod(cell, cell_get_value_offset_id_);
   *length = env->CallIntMethod(cell, cell_get_value_length_id_);
   jbyteArray jdata =
       (jbyteArray) env->CallObjectMethod(cell, cell_get_value_array_);
-  *data = value_pool_->Allocate(*length);
+  *data = value_pool_->TryAllocate(*length);
+  if (UNLIKELY(*data == NULL)) {
+    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetValue",
+        *length, "value array");
+    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+  }
   env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
   COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+  return Status::OK();
 }
 
 Status HBaseTableScanner::GetRowKey(JNIEnv* env, void** key, int* key_length) {
   jobject cell = env->GetObjectArrayElement(cells_, 0);
-  GetRowKey(env, cell, key, key_length);
+  RETURN_IF_ERROR(GetRowKey(env, cell, key, key_length));
   RETURN_ERROR_IF_EXC(env);
   return Status::OK();
 }
@@ -650,7 +678,7 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
     // Check family. If it doesn't match, we have a NULL value.
     void* family_data;
     int family_length;
-    GetFamily(env, cell, &family_data, &family_length);
+    RETURN_IF_ERROR(GetFamily(env, cell, &family_data, &family_length));
     if (CompareStrings(family, family_data, family_length) != 0) {
       *is_null = true;
       return Status::OK();
@@ -659,13 +687,13 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
     // Check qualifier. If it doesn't match, we have a NULL value.
     void* qualifier_data;
     int qualifier_length;
-    GetQualifier(env, cell, &qualifier_data, &qualifier_length);
+    RETURN_IF_ERROR(GetQualifier(env, cell, &qualifier_data, &qualifier_length));
     if (CompareStrings(qualifier, qualifier_data, qualifier_length) != 0) {
       *is_null = true;
       return Status::OK();
     }
   }
-  GetValue(env, cell, data, length);
+  RETURN_IF_ERROR(GetValue(env, cell, data, length));
   *is_null = false;
   return Status::OK();
 }
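
The void-to-Status conversion of the Get*() helpers above relies on
RETURN_IF_ERROR short-circuiting at the first failure. A minimal sketch of
that idiom, reusing the Status stand-in from the first sketch (Impala's real
macro in be/src/common/status.h is essentially this):

// Evaluates 'stmt' and short-circuits out of the enclosing function on error.
#define RETURN_IF_ERROR(stmt)              \
  do {                                     \
    const Status& _status = (stmt);        \
    if (!_status.ok()) return _status;     \
  } while (false)

// Hypothetical helpers standing in for GetFamily()/GetQualifier()/GetValue().
Status GetFamilySketch();
Status GetQualifierSketch();
Status GetValueSketch();

// Mirrors GetCurrentValue() above: the first helper that hits the memory
// limit aborts the whole chain with its error status.
Status GetCurrentValueSketch() {
  RETURN_IF_ERROR(GetFamilySketch());
  RETURN_IF_ERROR(GetQualifierSketch());
  RETURN_IF_ERROR(GetValueSketch());
  return Status::OK();
}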

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h
index 994a036..52e5da7 100644
--- a/be/src/exec/hbase-table-scanner.h
+++ b/be/src/exec/hbase-table-scanner.h
@@ -271,18 +271,20 @@ class HBaseTableScanner {
   Status InitScanRange(JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes);
 
   /// Copies the row key of cell into value_pool_ and returns it via *data and *length.
-  inline void GetRowKey(JNIEnv* env, jobject cell, void** data, int* length);
+  /// Returns error status if memory limit is exceeded.
+  inline Status GetRowKey(JNIEnv* env, jobject cell, void** data, int* length);
 
   /// Copies the column family of cell into value_pool_ and returns it
-  /// via *data and *length.
-  inline void GetFamily(JNIEnv* env, jobject cell, void** data, int* length);
+  /// via *data and *length. Returns error status if memory limit is exceeded.
+  inline Status GetFamily(JNIEnv* env, jobject cell, void** data, int* length);
 
   /// Copies the column qualifier of cell into value_pool_ and returns it
-  /// via *data and *length.
-  inline void GetQualifier(JNIEnv* env, jobject cell, void** data, int* length);
+  /// via *data and *length. Returns error status if memory limit is exceeded.
+  inline Status GetQualifier(JNIEnv* env, jobject cell, void** data, int* length);
 
   /// Copies the value of cell into value_pool_ and returns it via *data and *length.
-  inline void GetValue(JNIEnv* env, jobject cell, void** data, int* length);
+  /// Returns error status if memory limit is exceeded.
+  inline Status GetValue(JNIEnv* env, jobject cell, void** data, int* length);
 
   /// Returns the current value of cells_[cell_index_] in *data and *length
   /// if its family/qualifier match the given family/qualifier.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index a84fe7b..f3ebb97 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -19,15 +19,18 @@
 #include "runtime/string-value.inline.h"
 
 using namespace impala;
+using namespace strings;
 
 // Functions in this file are cross-compiled to IR with clang.
 
 int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
-                                    Tuple* tuple, TupleRow* tuple_row) {
+    Tuple* tuple, TupleRow* tuple_row) {
   int num_to_commit = 0;
   for (int i = 0; i < max_tuples; ++i) {
     InitTuple(template_tuple_, tuple);
-    MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple);
+    if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple))) {
+      return 0;
+    }
     tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
     if (EvalConjuncts(tuple_row)) {
       ++num_to_commit;
@@ -50,7 +53,7 @@ bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) {
 }
 
 void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool write_slot,
-                                      void* slot, MemPool* pool) {
+    void* slot, MemPool* pool) {
   if (write_slot) {
     DCHECK_EQ(type, TYPE_BOOLEAN);
     *reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data);
@@ -59,7 +62,7 @@ void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool w
 }
 
 void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool write_slot,
-                                    void* slot, MemPool* pool) {
+    void* slot, MemPool* pool) {
   int32_t val = ReadWriteUtil::ReadZInt(data);
   if (write_slot) {
     if (type == TYPE_INT) {
@@ -77,7 +80,7 @@ void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool wri
 }
 
 void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool write_slot,
-                                    void* slot, MemPool* pool) {
+    void* slot, MemPool* pool) {
   int64_t val = ReadWriteUtil::ReadZLong(data);
   if (write_slot) {
     if (type == TYPE_BIGINT) {
@@ -93,7 +96,7 @@ void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool wri
 }
 
 void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool write_slot,
-                                    void* slot, MemPool* pool) {
+    void* slot, MemPool* pool) {
   if (write_slot) {
     float val = *reinterpret_cast<float*>(*data);
     if (type == TYPE_FLOAT) {
@@ -108,7 +111,7 @@ void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool wri
 }
 
 void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool write_slot,
-                                     void* slot, MemPool* pool) {
+    void* slot, MemPool* pool) {
   if (write_slot) {
     DCHECK_EQ(type, TYPE_DOUBLE);
     *reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data);
@@ -117,7 +120,7 @@ void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool wr
 }
 
 void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data,
-                                     bool write_slot, void* slot, MemPool* pool) {
+    bool write_slot, void* slot, MemPool* pool) {
   int64_t len = ReadWriteUtil::ReadZLong(data);
   if (write_slot) {
     DCHECK(type == TYPE_VARCHAR);
@@ -130,8 +133,8 @@ void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t**
   *data += len;
 }
 
-void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data,
-                                   bool write_slot, void* slot, MemPool* pool) {
+bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data,
+    bool write_slot, void* slot, MemPool* pool) {
   int64_t len = ReadWriteUtil::ReadZLong(data);
   if (write_slot) {
     DCHECK(type == TYPE_CHAR);
@@ -139,7 +142,13 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da
     int str_len = std::min(static_cast<int>(len), max_len);
     if (ctype.IsVarLenStringType()) {
       StringValue* sv = reinterpret_cast<StringValue*>(slot);
-      sv->ptr = reinterpret_cast<char*>(pool->Allocate(max_len));
+      sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len));
+      if (UNLIKELY(sv->ptr == NULL)) {
+        string details = Substitute("HdfsAvroScanner::ReadAvroChar() failed to "
+            "allocate $0 bytes for char slot.", max_len);
+        parse_status_ = pool->mem_tracker()->MemLimitExceeded(state_, details, max_len);
+        return false;
+      }
       sv->len = max_len;
       memcpy(sv->ptr, *data, str_len);
       StringValue::PadWithSpaces(sv->ptr, max_len, str_len);
@@ -149,10 +158,11 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da
     }
   }
   *data += len;
+  return true;
 }
 
 void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
-                                     bool write_slot, void* slot, MemPool* pool) {
+    bool write_slot, void* slot, MemPool* pool) {
   int64_t len = ReadWriteUtil::ReadZLong(data);
   if (write_slot) {
     DCHECK(type == TYPE_STRING);
@@ -164,7 +174,7 @@ void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
 }
 
 void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
-                                      bool write_slot, void* slot, MemPool* pool) {
+    bool write_slot, void* slot, MemPool* pool) {
   int64_t len = ReadWriteUtil::ReadZLong(data);
   if (write_slot) {
     // Decimals are encoded as big-endian integers. Copy the decimal into the most
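
ReadAvroChar() cannot return Status directly because it shares a fixed,
codegen-friendly signature with the other ReadAvro*() functions, so the change
uses a common Impala pattern: the hot-path function returns bool and parks the
detailed error in the scanner's parse_status_ member, which the caller checks
afterwards. A sketch of that shape with hypothetical names, reusing the
Status/MemPool stand-ins from the first sketch:

struct ScannerSketch {
  Status parse_status_;
  MemPool* pool_;

  // Hot path: a cheap bool return instead of constructing/copying a Status.
  bool ReadChar(int max_len, char** slot) {
    char* buf = reinterpret_cast<char*>(pool_->TryAllocate(max_len));
    if (buf == nullptr) {
      parse_status_ = pool_->mem_tracker()->MemLimitExceeded(
          "failed to allocate char slot", max_len);
      return false;  // caller stops decoding the current batch
    }
    *slot = buf;
    return true;
  }

  // Mirrors ProcessRange() above: surface the stashed error after decoding.
  Status ProcessRangeSketch() {
    char* slot;
    if (!ReadChar(32, &slot)) return parse_status_;
    return Status::OK();
  }
};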

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index ddcec6b..060ec05 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -47,6 +47,9 @@ const string HdfsAvroScanner::AVRO_NULL_CODEC("null");
 const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy");
 const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate");
 
+const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate "
+    "$1 bytes for $2.";
+
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
 HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
@@ -496,6 +499,7 @@ Status HdfsAvroScanner::ProcessRange() {
           num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row);
         }
       }
+      RETURN_IF_ERROR(parse_status_);
       RETURN_IF_ERROR(CommitRows(num_to_commit));
       num_records -= max_tuples;
       COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
@@ -512,7 +516,7 @@ Status HdfsAvroScanner::ProcessRange() {
   return Status::OK();
 }
 
-void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
+bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
     MemPool* pool, uint8_t** data, Tuple* tuple) {
   DCHECK_EQ(record_schema.schema->type, AVRO_RECORD);
   BOOST_FOREACH(const AvroSchemaElement& element, record_schema.children) {
@@ -555,7 +559,11 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
         if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
           ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
         } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
-          ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
+          if (UNLIKELY(!ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot,
+                           slot, pool))) {
+            DCHECK(!parse_status_.ok());
+            return false;
+          }
         } else {
           ReadAvroString(slot_type, data, write_slot, slot, pool);
         }
@@ -576,53 +584,86 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
         DCHECK(false) << "Unsupported SchemaElement: " << type;
     }
   }
+  return true;
 }
 
 // This function produces a codegen'd function equivalent to MaterializeTuple() but
 // optimized for the table schema. Via helper functions CodegenReadRecord() and
 // CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the
 // type of each element in the schema, instead generating code to handle each element in
-// the schema. Example output:
+// the schema. Example output with tpch.region:
 //
-// define void @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
+// define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
+//     %"struct.impala::AvroSchemaElement"* %record_schema,
 //     %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* %tuple) {
 // entry:
-//   %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32 }*
+//   %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32,
+//       %"struct.impala::StringValue", %"struct.impala::StringValue" }*
 //   %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
 //       %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
 //   br i1 %is_not_null, label %read_field, label %null_field
 //
 // read_field:                                       ; preds = %entry
-//   %slot = getelementptr inbounds { i8, i32 }* %tuple_ptr, i32 0, i32 1
+//   %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+//       %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 1
 //   %opaque_slot = bitcast i32* %slot to i8*
 //   call void
-//    @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
-//        %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data,
+//    @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+//        %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i1 true,
 //        i8* %opaque_slot, %"class.impala::MemPool"* %pool)
-//   br label %endif
+//   br label %end_field
 //
 // null_field:                                       ; preds = %entry
-//   call void @SetNull({ i8, i32 }* %tuple_ptr)
-//   br label %endif
+//   call void @SetNull({ i8, i32, %"struct.impala::StringValue",
+//       %"struct.impala::StringValue" }* %tuple_ptr)
+//   br label %end_field
+//
+// end_field:                                        ; preds = %read_field, %null_field
+//  %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
+//      %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
+//  br i1 %is_not_null4, label %read_field1, label %null_field3
 //
-// endif:                                            ; preds = %null_field, %read_field
-//   %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
+// read_field1:                                      ; preds = %end_field
+//  %slot5 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+//      %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2
+//  %opaque_slot6 = bitcast %"struct.impala::StringValue"* %slot5 to i8*
+//  call void
+//   @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+//       %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
+//       i8* %opaque_slot6, %"class.impala::MemPool"* %pool)
+//  br label %end_field2
+//
+// null_field3:                                      ; preds = %end_field
+//   call void @SetNull1({ i8, i32, %"struct.impala::StringValue",
+//       %"struct.impala::StringValue" }* %tuple_ptr)
+//   br label %end_field2
+//
+// end_field2:                                       ; preds = %read_field1, %null_field3
+//   %is_not_null10 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
 //       %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
-//   br i1 %is_not_null4, label %read_field1, label %null_field2
+//   br i1 %is_not_null10, label %read_field7, label %null_field9
 //
-// read_field1:                                      ; preds = %endif
+// read_field7:                                      ; preds = %end_field2
+//   %slot11 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+//       %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3
+//   %opaque_slot12 = bitcast %"struct.impala::StringValue"* %slot11 to i8*
 //   call void
-//    @_ZN6impala15HdfsAvroScanner15ReadAvroBooleanENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
-//        %"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data,
-//        i8* null, %"class.impala::MemPool"* %pool)
-//   br label %endif3
+//    @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+//        %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
+//        i8* %opaque_slot12, %"class.impala::MemPool"* %pool)
+//   br label %end_field8
+//
+// null_field9:                                      ; preds = %end_field2
+//   call void @SetNull2({ i8, i32, %"struct.impala::StringValue",
+//       %"struct.impala::StringValue" }* %tuple_ptr)
+//   br label %end_field8
 //
-// null_field2:                                      ; preds = %endif
-//   br label %endif3
+// end_field8:                                       ; preds = %read_field7, %null_field9
+//   ret i1 true
 //
-// endif3:                                           ; preds = %null_field2, %read_field1
-//   ret void
-// }
+// bail_out:                                         ; No predecessors!
//   ret i1 false                                    // used only if schema has CHAR.
// }
 Function* HdfsAvroScanner::CodegenMaterializeTuple(
     HdfsScanNode* node, LlvmCodeGen* codegen) {
   LLVMContext& context = codegen->context();
@@ -644,7 +685,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
   Type* mempool_type = PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0);
   Type* schema_element_type = codegen->GetPtrType(AvroSchemaElement::LLVM_CLASS_NAME);
 
-  LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->void_type());
+  LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->boolean_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
@@ -661,24 +702,33 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
 
   Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
 
+  // Create a bail out block to handle decoding failures.
+  BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, NULL);
+
   Status status = CodegenReadRecord(
-      SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, NULL, this_val,
-      pool_val, tuple_val, data_val);
+      SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block,
+      bail_out_block, this_val, pool_val, tuple_val, data_val);
   if (!status.ok()) {
     VLOG_QUERY << status.GetDetail();
     fn->eraseFromParent();
     return NULL;
   }
 
-  builder.SetInsertPoint(&fn->back());
-  builder.CreateRetVoid();
+  // Returns true on successful decoding.
+  builder.CreateRet(codegen->true_value());
+
+  // Returns false on decoding errors.
+  builder.SetInsertPoint(bail_out_block);
+  builder.CreateRet(codegen->false_value());
+
   return codegen->FinalizeFunction(fn);
 }
 
 Status HdfsAvroScanner::CodegenReadRecord(
     const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
     LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
-    Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val) {
+    BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
+    Value* data_val) {
   DCHECK_EQ(record.schema->type, AVRO_RECORD);
   LLVMContext& context = codegen->context();
   LlvmCodeGen::LlvmBuilder* builder =
@@ -742,21 +792,22 @@ Status HdfsAvroScanner::CodegenReadRecord(
       BasicBlock* insert_before_block =
           (null_block != NULL) ? null_block : end_field_block;
       RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn,
-          insert_before_block, this_val, pool_val, tuple_val, data_val));
+          insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val));
     } else {
-      RETURN_IF_ERROR(CodegenReadScalar(
-          *field, slot_desc, codegen, builder, this_val, pool_val, tuple_val, data_val));
+      RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
+          end_field_block, bail_out, this_val, pool_val, tuple_val, data_val));
     }
     builder->CreateBr(end_field_block);
 
-    // Set insertion point for next field
+    // Set insertion point for next field.
     builder->SetInsertPoint(end_field_block);
   }
   return Status::OK();
 }
 
 Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
-    SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, Value* this_val,
+    SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
+    BasicBlock* end_field_block, BasicBlock* bail_out_block, Value* this_val,
     Value* pool_val, Value* tuple_val, Value* data_val) {
   LlvmCodeGen::LlvmBuilder* builder =
       reinterpret_cast<LlvmCodeGen::LlvmBuilder*>(void_builder);
@@ -781,10 +832,13 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
     case AVRO_BYTES:
       if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
         read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false);
+      } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
+        read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false);
       } else {
         read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false);
       }
       break;
+    // TODO: Add AVRO_DECIMAL here.
     default:
       return Status(Substitute(
           "Failed to codegen MaterializeTuple() due to unsupported type: $0",
@@ -811,13 +865,19 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
   }
 
   // NOTE: ReadAvroVarchar/Char has different signature than rest of read functions
-  if ((slot_desc != NULL) &&
+  if (slot_desc != NULL &&
       (slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) {
-    // Need to pass an extra argument (the length) to the codegen function
+    // Need to pass an extra argument (the length) to the codegen function.
     Value* fixed_len = builder->getInt32(slot_desc->type().len);
     Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
                                 write_slot_val, opaque_slot_val, pool_val};
-    builder->CreateCall(read_field_fn, read_field_args);
+    if (slot_desc->type().type == TYPE_VARCHAR) {
+      builder->CreateCall(read_field_fn, read_field_args);
+    } else {
+      // ReadAvroChar() returns false if allocation from MemPool fails.
+      Value* ret_val = builder->CreateCall(read_field_fn, read_field_args);
+      builder->CreateCondBr(ret_val, end_field_block, bail_out_block);
+    }
   } else {
     Value* read_field_args[] =
         {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val};
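
The codegen change threads a shared bail_out basic block through
CodegenReadRecord()/CodegenReadScalar(): a failing ReadAvroChar() call
branches there instead of falling through to the next field. A standalone
sketch of that control flow with the plain LLVM C++ API (assuming an LLVM
3.x-era toolchain similar to what Impala used at the time):

#include "llvm/IR/IRBuilder.h"
#include "llvm/IR/LLVMContext.h"
#include "llvm/IR/Module.h"

// Builds an i1-returning function with a shared bail_out block, mirroring
// the structure CodegenMaterializeTuple() emits above.
llvm::Function* BuildSketch(llvm::Module* module) {
  llvm::LLVMContext& context = module->getContext();
  llvm::IRBuilder<> builder(context);

  llvm::FunctionType* fn_type =
      llvm::FunctionType::get(builder.getInt1Ty(), /*isVarArg=*/false);
  llvm::Function* fn = llvm::Function::Create(
      fn_type, llvm::Function::ExternalLinkage, "MaterializeTupleSketch", module);

  llvm::BasicBlock* entry = llvm::BasicBlock::Create(context, "entry", fn);
  llvm::BasicBlock* end_field = llvm::BasicBlock::Create(context, "end_field", fn);
  llvm::BasicBlock* bail_out = llvm::BasicBlock::Create(context, "bail_out", fn);

  builder.SetInsertPoint(entry);
  // Stand-in for the i1 result of a ReadAvroChar() call.
  llvm::Value* read_ok = builder.getTrue();
  builder.CreateCondBr(read_ok, end_field, bail_out);  // branch on success

  builder.SetInsertPoint(end_field);
  builder.CreateRet(builder.getTrue());   // all fields decoded

  builder.SetInsertPoint(bail_out);
  builder.CreateRet(builder.getFalse());  // allocation failed mid-record

  return fn;
}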

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 15e046f..682ba04 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -176,8 +176,10 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
       Tuple* tuple, TupleRow* tuple_row);
 
-  /// Materializes a single tuple from serialized record data.
-  void MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
+  /// Materializes a single tuple from serialized record data. Will return false and set
+  /// error in parse_status_ if memory limit is exceeded when allocating new char buffer.
+  /// See comments below for ReadAvroChar().
+  bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
       uint8_t** data, Tuple* tuple);
 
   /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
@@ -190,7 +192,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// schema.
   /// TODO: Codegen a function for each unique file schema.
   static llvm::Function* CodegenMaterializeTuple(HdfsScanNode* node,
-                                                 LlvmCodeGen* codegen);
+      LlvmCodeGen* codegen);
 
   /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
   /// record.
@@ -200,21 +202,23 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// - builder: used to insert the IR, starting at the current insert point. The insert
   ///     point will be left at the end of the record but before the 'insert_before'
   ///     block.
-  /// - insert_before: the block to insert any new blocks directly before. NULL if blocks
-  ///     should be inserted at the end of fn. (This could theoretically be inferred from
-  ///     builder's insert point, but I can't figure out how to get the successor to a
-  ///     basic block.)
+  /// - insert_before: the block to insert any new blocks directly before. This is either
+  ///     the bail_out block or some basic blocks before that.
+  /// - bail_out: the block to jump to if anything fails. This is used in particular by
+  ///     ReadAvroChar() which can exceed memory limit during allocation from MemPool.
   /// - this_val, pool_val, tuple_val, data_val: arguments to MaterializeTuple()
   static Status CodegenReadRecord(
       const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
       LlvmCodeGen* codegen, void* builder, llvm::Function* fn,
-      llvm::BasicBlock* insert_before, llvm::Value* this_val, llvm::Value* pool_val,
-      llvm::Value* tuple_val, llvm::Value* data_val);
+      llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val,
+      llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val);
 
   /// Creates the IR for reading an Avro scalar at builder's current insert point.
   static Status CodegenReadScalar(const AvroSchemaElement& element,
-    SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* builder, llvm::Value* this_val,
-    llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val);
+      SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
+      llvm::BasicBlock* end_field_block, llvm::BasicBlock* bail_out_block,
+      llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
+      llvm::Value* data_val);
 
   /// The following are cross-compiled functions for parsing a serialized Avro primitive
   /// type and writing it to a slot. They can also be used for skipping a field without
@@ -225,6 +229,10 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// - type: The type of the slot. (This is necessary because there is not a 1:1 mapping
   ///         between Avro types and Impala's primitive types.)
   /// - pool: MemPool for string data.
+  ///
+  /// ReadAvroChar() will return false and set error in parse_status_ if memory limit
+  /// is exceeded when allocating the new char buffer. It returns true otherwise.
+  ///
   void ReadAvroBoolean(
       PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
   void ReadAvroInt32(
@@ -238,11 +246,11 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   void ReadAvroVarchar(
       PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
       MemPool* pool);
-  void ReadAvroChar(
+  bool ReadAvroChar(
       PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
       MemPool* pool);
-  void ReadAvroString( PrimitiveType type, uint8_t** data, bool write_slot, void* slot,
-      MemPool* pool);
+  void ReadAvroString(
+      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
 
   /// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or
   /// 16) instead of the type (which should be TYPE_DECIMAL). The slot size is passed

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ad8e360..a910243 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -176,7 +176,8 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 
 namespace impala {
 
-const string PARQUET_MEM_LIMIT_EXCEEDED = "$0 failed to allocate $1 bytes for $2.";
+const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
+    "$1 bytes for $2.";
 
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state)
     : HdfsScanner(scan_node, state),
@@ -750,7 +751,7 @@ bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot(
   if (slot_desc()->type().IsVarLenStringType()) {
     sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
     if (UNLIKELY(sv.ptr == NULL)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot()",
+      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
           len, "StringValue");
       parent_->parse_status_ =
           pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
@@ -1132,7 +1133,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       if (decompressor_.get() != NULL) {
         dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
         if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
               uncompressed_size, "dictionary");
           return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
               parent_->state_, details, uncompressed_size);
@@ -1147,7 +1148,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
         // more data) to a new buffer
         dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
         if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
               data_size, "dictionary");
           return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
               parent_->state_, details, data_size);
@@ -1184,7 +1185,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       uint8_t* decompressed_buffer =
           decompressed_data_pool_->TryAllocate(uncompressed_size);
       if (UNLIKELY(decompressed_buffer == NULL)) {
-        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
             uncompressed_size, "decompressed data");
         return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
             parent_->state_, details, uncompressed_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7c0fd9a..a26a575 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -115,8 +115,8 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
     THdfsFileFormat::type type, const string& scanner_name) {
   if (!scan_node_->tuple_desc()->string_slots().empty()
       && partition->escape_char() != '\0') {
-    // Cannot use codegen if there are strings slots and we need to
-    // compact (i.e. copy) the data.
+    // Codegen currently doesn't emit call to MemPool::TryAllocate() so skip codegen if
+    // there are strings slots and we need to compact (i.e. copy) the data.
     scan_node_->IncNumScannersCodegenDisabled();
     return Status::OK();
   }
@@ -181,8 +181,8 @@ Status HdfsScanner::CommitRows(int num_rows) {
     RETURN_IF_ERROR(StartNewRowBatch());
   }
   if (context_->cancelled()) return Status::CANCELLED;
-  // TODO: Replace with GetQueryStatus().
-  RETURN_IF_ERROR(state_->CheckQueryState());
+  // Check for UDF errors.
+  RETURN_IF_ERROR(state_->GetQueryStatus());
   // Free local expr allocations for this thread
   HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
   for (; iter != scanner_conjuncts_map_.end(); ++iter) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 3bed4e3..168cdd1 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -252,6 +252,10 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
   // Call jitted function if possible
   int tuples_returned;
   if (write_tuples_fn_ != NULL) {
+    // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots
+    // and escape characters. TextConverter::WriteSlot() will be used instead.
+    DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
+        delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
     tuples_returned = write_tuples_fn_(this, pool, tuple_row,
         batch_->row_byte_size(), &field_locations_[0], num_to_process,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 69abebc..9572260 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -35,6 +35,7 @@ using boost::algorithm::ends_with;
 using boost::algorithm::to_lower;
 using namespace impala;
 using namespace llvm;
+using namespace strings;
 
 const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
 
@@ -359,7 +360,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
       // There can be one partial tuple which returned no more fields from this buffer.
       DCHECK_LE(*num_tuples, num_fields + 1);
       if (!boundary_column_.Empty()) {
-        CopyBoundaryField(&field_locations_[0], pool);
+        RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
         boundary_column_.Clear();
       }
       num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples);
@@ -379,14 +380,14 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
     // Save contents that are split across buffers if we are going to return this column
     if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) {
       DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
-      boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start);
+      RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start));
       char* last_row = NULL;
       if (*num_tuples == 0) {
         last_row = batch_start_ptr_;
       } else {
         last_row = row_end_locations_[*num_tuples - 1] + 1;
       }
-      boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row);
+      RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row));
     }
     COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
 
@@ -718,6 +719,10 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
     int tuples_returned = 0;
     // Call jitted function if possible
     if (write_tuples_fn_ != NULL) {
+      // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string
+      // slots and escape characters. TextConverter::WriteSlot() will be used instead.
+      DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
+          delimited_text_parser_->escape_char() == '\0');
       tuples_returned = write_tuples_fn_(this, pool, tuple_row,
           batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
           scan_node_->materialized_slots().size(), num_tuples_processed);
@@ -751,15 +756,21 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
   return num_tuples_materialized;
 }
 
-void HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
+Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
   bool needs_escape = data->len < 0;
   int copy_len = needs_escape ? -data->len : data->len;
   int total_len = copy_len + boundary_column_.Size();
-  char* str_data = reinterpret_cast<char*>(pool->Allocate(total_len));
+  char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
+  if (UNLIKELY(str_data == NULL)) {
+    string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
+        "$0 bytes.", total_len);
+    return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
+  }
   memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
   memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
   data->start = str_data;
   data->len = needs_escape ? -total_len : total_len;
+  return Status::OK();
 }
 
 int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 50758ae..804c9c3 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -116,11 +116,12 @@ class HdfsTextScanner : public HdfsScanner {
       int64_t* decompressed_len, bool *eosr);
 
   /// Prepends field data that was from the previous file buffer (This field straddled two
-  /// file buffers).  'data' already contains the pointer/len from the current file buffer,
-  /// boundary_column_ contains the beginning of the data from the previous file
-  /// buffer. This function will allocate a new string from the tuple pool, concatenate the
-  /// two pieces and update 'data' to contain the new pointer/len.
-  void CopyBoundaryField(FieldLocation* data, MemPool* pool);
+  /// file buffers). 'data' already contains the pointer/len from the current file buffer,
+  /// boundary_column_ contains the beginning of the data from the previous file buffer.
+  /// This function will allocate a new string from the tuple pool, concatenate the
+  /// two pieces and update 'data' to contain the new pointer/len. Return error status if
+  /// memory limit is exceeded when allocating a new string.
+  Status CopyBoundaryField(FieldLocation* data, MemPool* pool);
 
   /// Writes the intermediate parsed data into slots, outputting
   /// tuples to row_batch as they complete.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 040362b..897f8c5 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -118,7 +118,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
 }
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
-  if (parent_->cancelled()) return Status::CANCELLED;
+  if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
 
   // io_buffer_ should only be null the first time this is called
   DCHECK(io_buffer_ != NULL ||
@@ -178,7 +178,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
   *len = 0;
   if (eosr()) return Status::OK();
 
-  if (parent_->cancelled()) {
+  if (UNLIKELY(parent_->cancelled())) {
     DCHECK(*out_buffer == NULL);
     return Status::CANCELLED;
   }
@@ -243,11 +243,11 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
     // We need to fetch more bytes. Copy the end of the current buffer and fetch the next
     // one.
-    boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_);
+    RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
     boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
 
     RETURN_IF_ERROR(GetNextBuffer());
-    if (parent_->cancelled()) return Status::CANCELLED;
+    if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
 
     if (io_buffer_bytes_left_ == 0) {
       // No more bytes (i.e. EOF)
@@ -267,7 +267,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     output_buffer_pos_ = &io_buffer_pos_;
     output_buffer_bytes_left_ = &io_buffer_bytes_left_;
   } else {
-    boundary_buffer_->Append(io_buffer_pos_, num_bytes);
+    RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
     boundary_buffer_bytes_left_ += num_bytes;
     boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
                            boundary_buffer_->Size() - boundary_buffer_bytes_left_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.h
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.h b/be/src/exec/text-converter.h
index 266e74d..3651146 100644
--- a/be/src/exec/text-converter.h
+++ b/be/src/exec/text-converter.h
@@ -49,7 +49,7 @@ class TextConverter {
   /// and writes the result into the tuples's slot.
   /// copy_string indicates whether we need to make a separate copy of the string data:
   /// For regular unescaped strings, we point to the original data in the file_buf_.
-  /// For regular escaped strings, we copy an its unescaped string into a separate buffer
+  /// For regular escaped strings, we copy its unescaped string into a separate buffer
   /// and point to it.
   /// If the string needs to be copied, the memory is allocated from 'pool', otherwise
   /// 'pool' is unused.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h
index 7b429fa..d50b66f 100644
--- a/be/src/exec/text-converter.inline.h
+++ b/be/src/exec/text-converter.inline.h
@@ -33,7 +33,7 @@
 namespace impala {
 
 /// Note: this function has a codegen'd version.  Changing this function requires
-/// corresponding changes to CodegenWriteSlot.
+/// corresponding changes to CodegenWriteSlot().
 inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple,
     const char* data, int len, bool copy_string, bool need_escape, MemPool* pool) {
   if ((len == 0 && !slot_desc->type().IsStringType()) || data == NULL) {
@@ -59,7 +59,7 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
       if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR) buffer_len = type.len;
 
       bool reuse_data = type.IsVarLenStringType() &&
-                        !(len != 0 && (copy_string || need_escape));
+          !(len != 0 && (copy_string || need_escape));
       if (type.type == TYPE_CHAR) reuse_data &= (buffer_len <= len);
 
       StringValue str;
@@ -67,9 +67,18 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
       if (reuse_data) {
         str.ptr = const_cast<char*>(data);
       } else {
+        // The codegen version of this code (generated by CodegenWriteSlot()) doesn't
+        // include this path. In other words, 'reuse_data' will always be true in the
+        // codegen version:
+        // 1. CodegenWriteSlot() doesn't yet support slot of TYPE_CHAR
+        // 2. HdfsScanner::InitializeWriteTuplesFn() will not codegen if there is
+        //    any escape character.
+        // 3. HdfsScanner::WriteCompleteTuple() always calls this function with
+        //    'copy_string' == false.
         str.ptr = type.IsVarLenStringType() ?
-            reinterpret_cast<char*>(pool->Allocate(buffer_len)) :
+            reinterpret_cast<char*>(pool->TryAllocate(buffer_len)) :
             reinterpret_cast<char*>(slot);
+        if (UNLIKELY(str.ptr == NULL)) return false;
         if (need_escape) {
           UnescapeString(data, str.ptr, &str.len, buffer_len);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 1787b12..c8f79df 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -16,9 +16,12 @@
 #ifndef IMPALA_RUNTIME_STRING_BUFFER_H
 #define IMPALA_RUNTIME_STRING_BUFFER_H
 
+#include "common/status.h"
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
 
+using namespace strings;
+
 namespace impala {
 
 /// Dynamic-sizable string (similar to std::string) but without as many
@@ -30,7 +33,7 @@ namespace impala {
 class StringBuffer {
  public:
   /// C'tor for StringBuffer.  Memory backing the string will be allocated from
-  /// the pool as necessary.  Can optionally be initialized from a StringValue.
+  /// the pool as necessary. Can optionally be initialized from a StringValue.
   StringBuffer(MemPool* pool, StringValue* str = NULL)
       : pool_(pool), buffer_size_(0) {
     DCHECK(pool_ != NULL);
@@ -41,24 +44,24 @@ class StringBuffer {
   }
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
-  void Append(const char* str, int len) {
+  /// Return error status if memory limit is exceeded.
+  Status Append(const char* str, int len) {
     int new_len = len + string_value_.len;
-    if (new_len > buffer_size_) {
-      GrowBuffer(new_len);
-    }
+    if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
     memcpy(string_value_.ptr + string_value_.len, str, len);
     string_value_.len = new_len;
+    return Status::OK();
   }
 
   /// TODO: switch everything to uint8_t?
-  void Append(const uint8_t* str, int len) {
-    Append(reinterpret_cast<const char*>(str), len);
+  Status Append(const uint8_t* str, int len) {
+    return Append(reinterpret_cast<const char*>(str), len);
   }
 
-  /// Assigns contents to StringBuffer
-  void Assign(const char* str, int len) {
+  /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
+  Status Assign(const char* str, int len) {
     Clear();
-    Append(str, len);
+    return Append(str, len);
   }
 
   /// Clear the underlying StringValue.  The allocated buffer can be reused.
@@ -94,16 +97,23 @@ class StringBuffer {
 
  private:
   /// Grows the buffer backing the string to be at least new_size, copying over the
-  /// previous string data into the new buffer.
-  void GrowBuffer(int new_len) {
+  /// previous string data into the new buffer. Return error status if memory limit
+  /// is exceeded.
+  Status GrowBuffer(int new_len) {
     // TODO: Release/reuse old buffers somehow
     buffer_size_ = std::max(buffer_size_ * 2, new_len);
     DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
-    char* new_buffer = reinterpret_cast<char*>(pool_->Allocate(buffer_size_));
-    if (string_value_.len > 0) {
+    char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+    if (UNLIKELY(new_buffer == NULL)) {
+      string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.",
+          buffer_size_);
+      return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+    }
+    if (LIKELY(string_value_.len > 0)) {
       memcpy(new_buffer, string_value_.ptr, string_value_.len);
     }
     string_value_.ptr = new_buffer;
+    return Status::OK();
   }
 
   MemPool* pool_;
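
With Append() and Assign() now returning Status, every caller must consume
the result, as the scanner-context.cc hunk above shows. A minimal
hypothetical caller, reusing the Status stand-in and RETURN_IF_ERROR sketch
from earlier:

// The old void Append() had no way to report allocation failure; the new
// one lets the scan fail the query cleanly instead.
Status BufferSplitColumn(StringBuffer* boundary_column, const uint8_t* data,
    int len) {
  RETURN_IF_ERROR(boundary_column->Append(data, len));
  return Status::OK();
}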