Posted to commits@impala.apache.org by ta...@apache.org on 2016/04/12 23:19:07 UTC
[33/50] incubator-impala git commit: IMPALA-2399: Memory limit checks for all scanners.
IMPALA-2399: Memory limit checks for all scanners.
This change replaces all instances of MemPool::Allocate() in the
Avro, text and HBase scanners with MemPool::TryAllocate().
HdfsAvroScanner::MaterializeTuple() has been converted to return
false on memory allocation failure. The codegen'd version of
MaterializeTuple() also returns a boolean. In the future, we should
consider returning Status directly, but that is more involved and
best left as a separate change.
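The converted call sites all follow the same error-handling pattern. A
minimal sketch of that pattern is below; the allocation size, the message
passed to Substitute() and 'state_' are placeholders for whatever each
actual call site uses:

  // MemPool::Allocate() cannot signal failure when the memory limit is
  // exceeded. TryAllocate() returns NULL instead, which the caller turns
  // into an error Status via the pool's MemTracker.
  uint8_t* buffer = pool->TryAllocate(buffer_size);
  if (UNLIKELY(buffer == NULL)) {
    string details = Substitute("$0 failed to allocate $1 bytes.",
        "Scanner::SomeFunction", buffer_size);
    return pool->mem_tracker()->MemLimitExceeded(state_, details, buffer_size);
  }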
Change-Id: I3e5a56501967a58513888917db5ce66dec4fb5ce
Reviewed-on: http://gerrit.cloudera.org:8080/2568
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/70502942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/70502942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/70502942
Branch: refs/heads/master
Commit: 7050294215cdd81963aa815f69a573421a05ab3e
Parents: 0c4dc96
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 11 15:32:04 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Apr 12 14:02:35 2016 -0700
----------------------------------------------------------------------
be/src/exec/base-sequence-scanner.cc |   3 +-
be/src/exec/data-source-scan-node.cc |  16 +++-
be/src/exec/hbase-table-scanner.cc   |  52 +++++++++---
be/src/exec/hbase-table-scanner.h    |  14 +--
be/src/exec/hdfs-avro-scanner-ir.cc  |  36 +++++---
be/src/exec/hdfs-avro-scanner.cc     | 136 +++++++++++++++++++++---------
be/src/exec/hdfs-avro-scanner.h      |  36 +++++---
be/src/exec/hdfs-parquet-scanner.cc  |  11 +--
be/src/exec/hdfs-scanner.cc          |   8 +-
be/src/exec/hdfs-sequence-scanner.cc |   4 +
be/src/exec/hdfs-text-scanner.cc     |  21 +++--
be/src/exec/hdfs-text-scanner.h      |  11 +--
be/src/exec/scanner-context.cc       |  10 +--
be/src/exec/text-converter.h         |   2 +-
be/src/exec/text-converter.inline.h  |  15 +++-
be/src/runtime/string-buffer.h       |  38 ++++++---
16 files changed, 285 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9145403..f88717f 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -98,7 +98,8 @@ void BaseSequenceScanner::Close() {
// Verify all resources (if any) have been transferred.
DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
DCHECK_EQ(context_->num_completed_io_buffers(), 0);
- if (!only_parsing_header_) {
+ // 'header_' can be NULL if HdfsScanNode::CreateAndPrepareScanner() failed.
+ if (!only_parsing_header_ && header_ != NULL) {
scan_node_->RangeComplete(file_format(), header_->compression_type);
}
HdfsScanner::Close();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index c865234..2c52c11 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -55,6 +55,8 @@ const string ERROR_INVALID_TIMESTAMP = "Data source returned invalid timestamp d
"This likely indicates a problem with the data source library.";
const string ERROR_INVALID_DECIMAL = "Data source returned invalid decimal data. "
"This likely indicates a problem with the data source library.";
+const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allocate "
+ "$1 bytes for $2.";
// Size of an encoded TIMESTAMP
const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t);
@@ -210,7 +212,12 @@ Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool) {
}
const string& val = col.string_vals[val_idx];
size_t val_size = val.size();
- char* buffer = reinterpret_cast<char*>(tuple_pool->Allocate(val_size));
+ char* buffer = reinterpret_cast<char*>(tuple_pool->TryAllocate(val_size));
+ if (UNLIKELY(buffer == NULL)) {
+ string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow",
+ val_size, "string slot");
+ return tuple_pool->mem_tracker()->MemLimitExceeded(NULL, details, val_size);
+ }
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
reinterpret_cast<StringValue*>(slot)->len = val_size;
@@ -300,7 +307,12 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
// create new tuple buffer for row_batch
MemPool* tuple_pool = row_batch->tuple_data_pool();
int tuple_buffer_size = row_batch->MaxTupleBufferSize();
- void* tuple_buffer = tuple_pool->Allocate(tuple_buffer_size);
+ void* tuple_buffer = tuple_pool->TryAllocate(tuple_buffer_size);
+ if (UNLIKELY(tuple_buffer == NULL)) {
+ string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "GetNext",
+ tuple_buffer_size, "tuple");
+ return tuple_pool->mem_tracker()->MemLimitExceeded(state, details, tuple_buffer_size);
+ }
tuple_ = reinterpret_cast<Tuple*>(tuple_buffer);
ExprContext** ctxs = &conjunct_ctxs_[0];
int num_ctxs = conjunct_ctxs_.size();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc
index 93cad54..cd622bc 100644
--- a/be/src/exec/hbase-table-scanner.cc
+++ b/be/src/exec/hbase-table-scanner.cc
@@ -28,6 +28,7 @@
#include "common/names.h"
using namespace impala;
+using namespace strings;
jclass HBaseTableScanner::scan_cl_ = NULL;
jclass HBaseTableScanner::resultscanner_cl_ = NULL;
@@ -70,6 +71,9 @@ jobject HBaseTableScanner::empty_row_ = NULL;
jobject HBaseTableScanner::must_pass_all_op_ = NULL;
jobjectArray HBaseTableScanner::compare_ops_ = NULL;
+const string HBASE_MEM_LIMIT_EXCEEDED = "HBaseTableScanner::$0() failed to "
+ "allocate $1 bytes for $2.";
+
void HBaseTableScanner::ScanRange::DebugString(int indentation_level,
stringstream* out) {
*out << string(indentation_level * 2, ' ');
@@ -573,53 +577,77 @@ inline void HBaseTableScanner::WriteTupleSlot(const SlotDescriptor* slot_desc,
BitUtil::ByteSwap(slot, data, slot_desc->type().GetByteSize());
}
-inline void HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
void** data, int* length) {
int offset = env->CallIntMethod(cell, cell_get_row_offset_id_);
*length = env->CallShortMethod(cell, cell_get_row_length_id_);
jbyteArray jdata =
(jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
- *data = value_pool_->Allocate(*length);
+ *data = value_pool_->TryAllocate(*length);
+ if (UNLIKELY(*data == NULL)) {
+ string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetRowKey",
+ *length, "row array");
+ return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+ }
env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+ return Status::OK();
}
-inline void HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
void** data, int* length) {
int offset = env->CallIntMethod(cell, cell_get_family_offset_id_);
*length = env->CallShortMethod(cell, cell_get_family_length_id_);
jbyteArray jdata =
(jbyteArray) env->CallObjectMethod(cell, cell_get_family_array_);
- *data = value_pool_->Allocate(*length);
+ *data = value_pool_->TryAllocate(*length);
+ if (UNLIKELY(*data == NULL)) {
+ string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetFamily",
+ *length, "family array");
+ return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+ }
env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+ return Status::OK();
}
-inline void HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
void** data, int* length) {
int offset = env->CallIntMethod(cell, cell_get_qualifier_offset_id_);
*length = env->CallIntMethod(cell, cell_get_qualifier_length_id_);
jbyteArray jdata =
(jbyteArray) env->CallObjectMethod(cell, cell_get_qualifier_array_);
- *data = value_pool_->Allocate(*length);
+ *data = value_pool_->TryAllocate(*length);
+ if (UNLIKELY(*data == NULL)) {
+ string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetQualifier",
+ *length, "qualifier array");
+ return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+ }
env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+ return Status::OK();
}
-inline void HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
+inline Status HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
void** data, int* length) {
int offset = env->CallIntMethod(cell, cell_get_value_offset_id_);
*length = env->CallIntMethod(cell, cell_get_value_length_id_);
jbyteArray jdata =
(jbyteArray) env->CallObjectMethod(cell, cell_get_value_array_);
- *data = value_pool_->Allocate(*length);
+ *data = value_pool_->TryAllocate(*length);
+ if (UNLIKELY(*data == NULL)) {
+ string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetValue",
+ *length, "value array");
+ return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
+ }
env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
+ return Status::OK();
}
Status HBaseTableScanner::GetRowKey(JNIEnv* env, void** key, int* key_length) {
jobject cell = env->GetObjectArrayElement(cells_, 0);
- GetRowKey(env, cell, key, key_length);
+ RETURN_IF_ERROR(GetRowKey(env, cell, key, key_length));
RETURN_ERROR_IF_EXC(env);
return Status::OK();
}
@@ -650,7 +678,7 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
// Check family. If it doesn't match, we have a NULL value.
void* family_data;
int family_length;
- GetFamily(env, cell, &family_data, &family_length);
+ RETURN_IF_ERROR(GetFamily(env, cell, &family_data, &family_length));
if (CompareStrings(family, family_data, family_length) != 0) {
*is_null = true;
return Status::OK();
@@ -659,13 +687,13 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
// Check qualifier. If it doesn't match, we have a NULL value.
void* qualifier_data;
int qualifier_length;
- GetQualifier(env, cell, &qualifier_data, &qualifier_length);
+ RETURN_IF_ERROR(GetQualifier(env, cell, &qualifier_data, &qualifier_length));
if (CompareStrings(qualifier, qualifier_data, qualifier_length) != 0) {
*is_null = true;
return Status::OK();
}
}
- GetValue(env, cell, data, length);
+ RETURN_IF_ERROR(GetValue(env, cell, data, length));
*is_null = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h
index 994a036..52e5da7 100644
--- a/be/src/exec/hbase-table-scanner.h
+++ b/be/src/exec/hbase-table-scanner.h
@@ -271,18 +271,20 @@ class HBaseTableScanner {
Status InitScanRange(JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes);
/// Copies the row key of cell into value_pool_ and returns it via *data and *length.
- inline void GetRowKey(JNIEnv* env, jobject cell, void** data, int* length);
+ /// Returns error status if memory limit is exceeded.
+ inline Status GetRowKey(JNIEnv* env, jobject cell, void** data, int* length);
/// Copies the column family of cell into value_pool_ and returns it
- /// via *data and *length.
- inline void GetFamily(JNIEnv* env, jobject cell, void** data, int* length);
+ /// via *data and *length. Returns error status if memory limit is exceeded.
+ inline Status GetFamily(JNIEnv* env, jobject cell, void** data, int* length);
/// Copies the column qualifier of cell into value_pool_ and returns it
- /// via *data and *length.
- inline void GetQualifier(JNIEnv* env, jobject cell, void** data, int* length);
+ /// via *data and *length. Returns error status if memory limit is exceeded.
+ inline Status GetQualifier(JNIEnv* env, jobject cell, void** data, int* length);
/// Copies the value of cell into value_pool_ and returns it via *data and *length.
- inline void GetValue(JNIEnv* env, jobject cell, void** data, int* length);
+ /// Returns error status if memory limit is exceeded.
+ inline Status GetValue(JNIEnv* env, jobject cell, void** data, int* length);
/// Returns the current value of cells_[cell_index_] in *data and *length
/// if its family/qualifier match the given family/qualifier.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index a84fe7b..f3ebb97 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -19,15 +19,18 @@
#include "runtime/string-value.inline.h"
using namespace impala;
+using namespace strings;
// Functions in this file are cross-compiled to IR with clang.
int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
- Tuple* tuple, TupleRow* tuple_row) {
+ Tuple* tuple, TupleRow* tuple_row) {
int num_to_commit = 0;
for (int i = 0; i < max_tuples; ++i) {
InitTuple(template_tuple_, tuple);
- MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple);
+ if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple))) {
+ return 0;
+ }
tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
if (EvalConjuncts(tuple_row)) {
++num_to_commit;
@@ -50,7 +53,7 @@ bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) {
}
void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool write_slot,
- void* slot, MemPool* pool) {
+ void* slot, MemPool* pool) {
if (write_slot) {
DCHECK_EQ(type, TYPE_BOOLEAN);
*reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data);
@@ -59,7 +62,7 @@ void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool w
}
void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool write_slot,
- void* slot, MemPool* pool) {
+ void* slot, MemPool* pool) {
int32_t val = ReadWriteUtil::ReadZInt(data);
if (write_slot) {
if (type == TYPE_INT) {
@@ -77,7 +80,7 @@ void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool wri
}
void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool write_slot,
- void* slot, MemPool* pool) {
+ void* slot, MemPool* pool) {
int64_t val = ReadWriteUtil::ReadZLong(data);
if (write_slot) {
if (type == TYPE_BIGINT) {
@@ -93,7 +96,7 @@ void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool wri
}
void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool write_slot,
- void* slot, MemPool* pool) {
+ void* slot, MemPool* pool) {
if (write_slot) {
float val = *reinterpret_cast<float*>(*data);
if (type == TYPE_FLOAT) {
@@ -108,7 +111,7 @@ void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool wri
}
void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool write_slot,
- void* slot, MemPool* pool) {
+ void* slot, MemPool* pool) {
if (write_slot) {
DCHECK_EQ(type, TYPE_DOUBLE);
*reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data);
@@ -117,7 +120,7 @@ void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool wr
}
void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data,
- bool write_slot, void* slot, MemPool* pool) {
+ bool write_slot, void* slot, MemPool* pool) {
int64_t len = ReadWriteUtil::ReadZLong(data);
if (write_slot) {
DCHECK(type == TYPE_VARCHAR);
@@ -130,8 +133,8 @@ void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t**
*data += len;
}
-void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data,
- bool write_slot, void* slot, MemPool* pool) {
+bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data,
+ bool write_slot, void* slot, MemPool* pool) {
int64_t len = ReadWriteUtil::ReadZLong(data);
if (write_slot) {
DCHECK(type == TYPE_CHAR);
@@ -139,7 +142,13 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da
int str_len = std::min(static_cast<int>(len), max_len);
if (ctype.IsVarLenStringType()) {
StringValue* sv = reinterpret_cast<StringValue*>(slot);
- sv->ptr = reinterpret_cast<char*>(pool->Allocate(max_len));
+ sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len));
+ if (UNLIKELY(sv->ptr == NULL)) {
+ string details = Substitute("HdfsAvroScanner::ReadAvroChar() failed to allocate"
+ "$0 bytes for char slot.", max_len);
+ parse_status_ = pool->mem_tracker()->MemLimitExceeded(state_, details, max_len);
+ return false;
+ }
sv->len = max_len;
memcpy(sv->ptr, *data, str_len);
StringValue::PadWithSpaces(sv->ptr, max_len, str_len);
@@ -149,10 +158,11 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da
}
}
*data += len;
+ return true;
}
void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
- bool write_slot, void* slot, MemPool* pool) {
+ bool write_slot, void* slot, MemPool* pool) {
int64_t len = ReadWriteUtil::ReadZLong(data);
if (write_slot) {
DCHECK(type == TYPE_STRING);
@@ -164,7 +174,7 @@ void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
}
void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
- bool write_slot, void* slot, MemPool* pool) {
+ bool write_slot, void* slot, MemPool* pool) {
int64_t len = ReadWriteUtil::ReadZLong(data);
if (write_slot) {
// Decimals are encoded as big-endian integers. Copy the decimal into the most
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index ddcec6b..060ec05 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -47,6 +47,9 @@ const string HdfsAvroScanner::AVRO_NULL_CODEC("null");
const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy");
const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate");
+const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate "
+ "$1 bytes for $2.";
+
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
@@ -496,6 +499,7 @@ Status HdfsAvroScanner::ProcessRange() {
num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row);
}
}
+ RETURN_IF_ERROR(parse_status_);
RETURN_IF_ERROR(CommitRows(num_to_commit));
num_records -= max_tuples;
COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
@@ -512,7 +516,7 @@ Status HdfsAvroScanner::ProcessRange() {
return Status::OK();
}
-void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
+bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
MemPool* pool, uint8_t** data, Tuple* tuple) {
DCHECK_EQ(record_schema.schema->type, AVRO_RECORD);
BOOST_FOREACH(const AvroSchemaElement& element, record_schema.children) {
@@ -555,7 +559,11 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
} else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
- ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
+ if (UNLIKELY(!ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot,
+ slot, pool))) {
+ DCHECK(!parse_status_.ok());
+ return false;
+ }
} else {
ReadAvroString(slot_type, data, write_slot, slot, pool);
}
@@ -576,53 +584,86 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
DCHECK(false) << "Unsupported SchemaElement: " << type;
}
}
+ return true;
}
// This function produces a codegen'd function equivalent to MaterializeTuple() but
// optimized for the table schema. Via helper functions CodegenReadRecord() and
// CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the
// type of each element in the schema, instead generating code to handle each element in
-// the schema. Example output:
+// the schema. Example output with tpch.region:
//
-// define void @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
+// define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
+// %"struct.impala::AvroSchemaElement"* %record_schema,
// %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* %tuple) {
// entry:
-// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32 }*
+// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32,
+// %"struct.impala::StringValue", %"struct.impala::StringValue" }*
// %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
// br i1 %is_not_null, label %read_field, label %null_field
//
// read_field: ; preds = %entry
-// %slot = getelementptr inbounds { i8, i32 }* %tuple_ptr, i32 0, i32 1
+// %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 1
// %opaque_slot = bitcast i32* %slot to i8*
// call void
-// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
-// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data,
+// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i1 true,
// i8* %opaque_slot, %"class.impala::MemPool"* %pool)
-// br label %endif
+// br label %end_field
//
// null_field: ; preds = %entry
-// call void @SetNull({ i8, i32 }* %tuple_ptr)
-// br label %endif
+// call void @SetNull({ i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr)
+// br label %end_field
+//
+// end_field: ; preds = %read_field, %null_field
+// %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
+// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
+// br i1 %is_not_null4, label %read_field1, label %null_field3
//
-// endif: ; preds = %null_field, %read_field
-// %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
+// read_field1: ; preds = %end_field
+// %slot5 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2
+// %opaque_slot6 = bitcast %"struct.impala::StringValue"* %slot5 to i8*
+// call void
+// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
+// i8* %opaque_slot6, %"class.impala::MemPool"* %pool)
+// br label %end_field2
+//
+// null_field3: ; preds = %end_field
+// call void @SetNull1({ i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr)
+// br label %end_field2
+//
+// end_field2: ; preds = %read_field1, %null_field3
+// %is_not_null10 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
-// br i1 %is_not_null4, label %read_field1, label %null_field2
+// br i1 %is_not_null10, label %read_field7, label %null_field9
//
-// read_field1: ; preds = %endif
+// read_field7: ; preds = %end_field2
+// %slot11 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3
+// %opaque_slot12 = bitcast %"struct.impala::StringValue"* %slot11 to i8*
// call void
-// @_ZN6impala15HdfsAvroScanner15ReadAvroBooleanENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
-// %"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data,
-// i8* null, %"class.impala::MemPool"* %pool)
-// br label %endif3
+// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
+// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
+// i8* %opaque_slot12, %"class.impala::MemPool"* %pool)
+// br label %end_field8
+//
+// null_field9: ; preds = %end_field2
+// call void @SetNull2({ i8, i32, %"struct.impala::StringValue",
+// %"struct.impala::StringValue" }* %tuple_ptr)
+// br label %end_field8
//
-// null_field2: ; preds = %endif
-// br label %endif3
+// end_field8: ; preds = %read_field7, %null_field9
+// ret i1 true
//
-// endif3: ; preds = %null_field2, %read_field1
-// ret void
-// }
+// bail_out: ; No predecessors! Reached only if the schema contains a CHAR.
+// ret i1 false
+// }
Function* HdfsAvroScanner::CodegenMaterializeTuple(
HdfsScanNode* node, LlvmCodeGen* codegen) {
LLVMContext& context = codegen->context();
@@ -644,7 +685,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
Type* mempool_type = PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0);
Type* schema_element_type = codegen->GetPtrType(AvroSchemaElement::LLVM_CLASS_NAME);
- LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->void_type());
+ LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->boolean_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
@@ -661,24 +702,33 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
+ // Create a bail out block to handle decoding failures.
+ BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, NULL);
+
Status status = CodegenReadRecord(
- SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, NULL, this_val,
- pool_val, tuple_val, data_val);
+ SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block,
+ bail_out_block, this_val, pool_val, tuple_val, data_val);
if (!status.ok()) {
VLOG_QUERY << status.GetDetail();
fn->eraseFromParent();
return NULL;
}
- builder.SetInsertPoint(&fn->back());
- builder.CreateRetVoid();
+ // Returns true on successful decoding.
+ builder.CreateRet(codegen->true_value());
+
+ // Returns false on decoding errors.
+ builder.SetInsertPoint(bail_out_block);
+ builder.CreateRet(codegen->false_value());
+
return codegen->FinalizeFunction(fn);
}
Status HdfsAvroScanner::CodegenReadRecord(
const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
- Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val) {
+ BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
+ Value* data_val) {
DCHECK_EQ(record.schema->type, AVRO_RECORD);
LLVMContext& context = codegen->context();
LlvmCodeGen::LlvmBuilder* builder =
@@ -742,21 +792,22 @@ Status HdfsAvroScanner::CodegenReadRecord(
BasicBlock* insert_before_block =
(null_block != NULL) ? null_block : end_field_block;
RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn,
- insert_before_block, this_val, pool_val, tuple_val, data_val));
+ insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val));
} else {
- RETURN_IF_ERROR(CodegenReadScalar(
- *field, slot_desc, codegen, builder, this_val, pool_val, tuple_val, data_val));
+ RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
+ end_field_block, bail_out, this_val, pool_val, tuple_val, data_val));
}
builder->CreateBr(end_field_block);
- // Set insertion point for next field
+ // Set insertion point for next field.
builder->SetInsertPoint(end_field_block);
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
- SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, Value* this_val,
+ SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
+ BasicBlock* end_field_block, BasicBlock* bail_out_block, Value* this_val,
Value* pool_val, Value* tuple_val, Value* data_val) {
LlvmCodeGen::LlvmBuilder* builder =
reinterpret_cast<LlvmCodeGen::LlvmBuilder*>(void_builder);
@@ -781,10 +832,13 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
case AVRO_BYTES:
if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false);
+ } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
+ read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false);
} else {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false);
}
break;
+ // TODO: Add AVRO_DECIMAL here.
default:
return Status(Substitute(
"Failed to codegen MaterializeTuple() due to unsupported type: $0",
@@ -811,13 +865,19 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
}
// NOTE: ReadAvroVarchar/Char has different signature than rest of read functions
- if ((slot_desc != NULL) &&
+ if (slot_desc != NULL &&
(slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) {
- // Need to pass an extra argument (the length) to the codegen function
+ // Need to pass an extra argument (the length) to the codegen function.
Value* fixed_len = builder->getInt32(slot_desc->type().len);
Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
write_slot_val, opaque_slot_val, pool_val};
- builder->CreateCall(read_field_fn, read_field_args);
+ if (slot_desc->type().type == TYPE_VARCHAR) {
+ builder->CreateCall(read_field_fn, read_field_args);
+ } else {
+ // ReadAvroChar() returns false if allocation from MemPool fails.
+ Value* ret_val = builder->CreateCall(read_field_fn, read_field_args);
+ builder->CreateCondBr(ret_val, end_field_block, bail_out_block);
+ }
} else {
Value* read_field_args[] =
{this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 15e046f..682ba04 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -176,8 +176,10 @@ class HdfsAvroScanner : public BaseSequenceScanner {
int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
Tuple* tuple, TupleRow* tuple_row);
- /// Materializes a single tuple from serialized record data.
- void MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
/// Materializes a single tuple from serialized record data. Returns false and sets an
+ /// error in parse_status_ if the memory limit is exceeded when allocating a new char
+ /// buffer. See the comments below for ReadAvroChar().
+ bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
uint8_t** data, Tuple* tuple);
/// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
@@ -190,7 +192,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// schema.
/// TODO: Codegen a function for each unique file schema.
static llvm::Function* CodegenMaterializeTuple(HdfsScanNode* node,
- LlvmCodeGen* codegen);
+ LlvmCodeGen* codegen);
/// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
/// record.
@@ -200,21 +202,23 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// - builder: used to insert the IR, starting at the current insert point. The insert
/// point will be left at the end of the record but before the 'insert_before'
/// block.
- /// - insert_before: the block to insert any new blocks directly before. NULL if blocks
- /// should be inserted at the end of fn. (This could theoretically be inferred from
- /// builder's insert point, but I can't figure out how to get the successor to a
- /// basic block.)
+ /// - insert_before: the block to insert any new blocks directly before. This is either
+ /// the bail_out block or a basic block before it.
+ /// - bail_out: the block to jump to if anything fails. This is used in particular by
+ /// ReadAvroChar(), which can exceed the memory limit when allocating from its MemPool.
/// - this_val, pool_val, tuple_val, data_val: arguments to MaterializeTuple()
static Status CodegenReadRecord(
const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
LlvmCodeGen* codegen, void* builder, llvm::Function* fn,
- llvm::BasicBlock* insert_before, llvm::Value* this_val, llvm::Value* pool_val,
- llvm::Value* tuple_val, llvm::Value* data_val);
+ llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val,
+ llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val);
/// Creates the IR for reading an Avro scalar at builder's current insert point.
static Status CodegenReadScalar(const AvroSchemaElement& element,
- SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* builder, llvm::Value* this_val,
- llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val);
+ SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
+ llvm::BasicBlock* end_field_block, llvm::BasicBlock* bail_out_block,
+ llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
+ llvm::Value* data_val);
/// The following are cross-compiled functions for parsing a serialized Avro primitive
/// type and writing it to a slot. They can also be used for skipping a field without
@@ -225,6 +229,10 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// - type: The type of the slot. (This is necessary because there is not a 1:1 mapping
/// between Avro types and Impala's primitive types.)
/// - pool: MemPool for string data.
+ ///
/// ReadAvroChar() returns false and sets an error in parse_status_ if the memory limit
+ /// is exceeded when allocating the new char buffer; it returns true otherwise.
+ ///
void ReadAvroBoolean(
PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
void ReadAvroInt32(
@@ -238,11 +246,11 @@ class HdfsAvroScanner : public BaseSequenceScanner {
void ReadAvroVarchar(
PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
MemPool* pool);
- void ReadAvroChar(
+ bool ReadAvroChar(
PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
MemPool* pool);
- void ReadAvroString( PrimitiveType type, uint8_t** data, bool write_slot, void* slot,
- MemPool* pool);
+ void ReadAvroString(
+ PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
/// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or
/// 16) instead of the type (which should be TYPE_DECIMAL). The slot size is passed
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ad8e360..a910243 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -176,7 +176,8 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
namespace impala {
-const string PARQUET_MEM_LIMIT_EXCEEDED = "$0 failed to allocate $1 bytes for $2.";
+const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
+ "$1 bytes for $2.";
HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state)
: HdfsScanner(scan_node, state),
@@ -750,7 +751,7 @@ bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot(
if (slot_desc()->type().IsVarLenStringType()) {
sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
if (UNLIKELY(sv.ptr == NULL)) {
- string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot()",
+ string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
len, "StringValue");
parent_->parse_status_ =
pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
@@ -1132,7 +1133,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
if (decompressor_.get() != NULL) {
dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
if (UNLIKELY(dict_values == NULL)) {
- string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+ string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
uncompressed_size, "dictionary");
return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
parent_->state_, details, uncompressed_size);
@@ -1147,7 +1148,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
// more data) to a new buffer
dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
if (UNLIKELY(dict_values == NULL)) {
- string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+ string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
data_size, "dictionary");
return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
parent_->state_, details, data_size);
@@ -1184,7 +1185,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
uint8_t* decompressed_buffer =
decompressed_data_pool_->TryAllocate(uncompressed_size);
if (UNLIKELY(decompressed_buffer == NULL)) {
- string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()",
+ string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
uncompressed_size, "decompressed data");
return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
parent_->state_, details, uncompressed_size);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7c0fd9a..a26a575 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -115,8 +115,8 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
THdfsFileFormat::type type, const string& scanner_name) {
if (!scan_node_->tuple_desc()->string_slots().empty()
&& partition->escape_char() != '\0') {
- // Cannot use codegen if there are strings slots and we need to
- // compact (i.e. copy) the data.
+ // Codegen currently doesn't emit a call to MemPool::TryAllocate(), so skip codegen if
+ // there are string slots and we need to compact (i.e. copy) the data.
scan_node_->IncNumScannersCodegenDisabled();
return Status::OK();
}
@@ -181,8 +181,8 @@ Status HdfsScanner::CommitRows(int num_rows) {
RETURN_IF_ERROR(StartNewRowBatch());
}
if (context_->cancelled()) return Status::CANCELLED;
- // TODO: Replace with GetQueryStatus().
- RETURN_IF_ERROR(state_->CheckQueryState());
+ // Check for UDF errors.
+ RETURN_IF_ERROR(state_->GetQueryStatus());
// Free local expr allocations for this thread
HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
for (; iter != scanner_conjuncts_map_.end(); ++iter) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 3bed4e3..168cdd1 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -252,6 +252,10 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
// Call jitted function if possible
int tuples_returned;
if (write_tuples_fn_ != NULL) {
+ // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots
+ // and escape characters. TextConverter::WriteSlot() will be used instead.
+ DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
+ delimited_text_parser_->escape_char() == '\0');
// last argument: seq always starts at record_location[0]
tuples_returned = write_tuples_fn_(this, pool, tuple_row,
batch_->row_byte_size(), &field_locations_[0], num_to_process,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 69abebc..9572260 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -35,6 +35,7 @@ using boost::algorithm::ends_with;
using boost::algorithm::to_lower;
using namespace impala;
using namespace llvm;
+using namespace strings;
const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
@@ -359,7 +360,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
// There can be one partial tuple which returned no more fields from this buffer.
DCHECK_LE(*num_tuples, num_fields + 1);
if (!boundary_column_.Empty()) {
- CopyBoundaryField(&field_locations_[0], pool);
+ RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
boundary_column_.Clear();
}
num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples);
@@ -379,14 +380,14 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
// Save contents that are split across buffers if we are going to return this column
if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) {
DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
- boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start);
+ RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start));
char* last_row = NULL;
if (*num_tuples == 0) {
last_row = batch_start_ptr_;
} else {
last_row = row_end_locations_[*num_tuples - 1] + 1;
}
- boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row);
+ RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row));
}
COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
@@ -718,6 +719,10 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
int tuples_returned = 0;
// Call jitted function if possible
if (write_tuples_fn_ != NULL) {
+ // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string
+ // slots and escape characters. TextConverter::WriteSlot() will be used instead.
+ DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
+ delimited_text_parser_->escape_char() == '\0');
tuples_returned = write_tuples_fn_(this, pool, tuple_row,
batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
scan_node_->materialized_slots().size(), num_tuples_processed);
@@ -751,15 +756,21 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
return num_tuples_materialized;
}
-void HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
+Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
bool needs_escape = data->len < 0;
int copy_len = needs_escape ? -data->len : data->len;
int total_len = copy_len + boundary_column_.Size();
- char* str_data = reinterpret_cast<char*>(pool->Allocate(total_len));
+ char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
+ if (UNLIKELY(str_data == NULL)) {
+ string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
+ "$0 bytes.", total_len);
+ return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);
+ }
memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
data->start = str_data;
data->len = needs_escape ? -total_len : total_len;
+ return Status::OK();
}
int HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 50758ae..804c9c3 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -116,11 +116,12 @@ class HdfsTextScanner : public HdfsScanner {
int64_t* decompressed_len, bool *eosr);
/// Prepends field data that was from the previous file buffer (This field straddled two
- /// file buffers). 'data' already contains the pointer/len from the current file buffer,
- /// boundary_column_ contains the beginning of the data from the previous file
- /// buffer. This function will allocate a new string from the tuple pool, concatenate the
- /// two pieces and update 'data' to contain the new pointer/len.
- void CopyBoundaryField(FieldLocation* data, MemPool* pool);
+ /// file buffers). 'data' already contains the pointer/len from the current file buffer,
+ /// boundary_column_ contains the beginning of the data from the previous file buffer.
+ /// This function will allocate a new string from the tuple pool, concatenate the
+ /// two pieces and update 'data' to contain the new pointer/len. Return error status if
+ /// memory limit is exceeded when allocating a new string.
+ Status CopyBoundaryField(FieldLocation* data, MemPool* pool);
/// Writes the intermediate parsed data into slots, outputting
/// tuples to row_batch as they complete.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 040362b..897f8c5 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -118,7 +118,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
}
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
- if (parent_->cancelled()) return Status::CANCELLED;
+ if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
// io_buffer_ should only be null the first time this is called
DCHECK(io_buffer_ != NULL ||
@@ -178,7 +178,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
*len = 0;
if (eosr()) return Status::OK();
- if (parent_->cancelled()) {
+ if (UNLIKELY(parent_->cancelled())) {
DCHECK(*out_buffer == NULL);
return Status::CANCELLED;
}
@@ -243,11 +243,11 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
// We need to fetch more bytes. Copy the end of the current buffer and fetch the next
// one.
- boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_);
+ RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
RETURN_IF_ERROR(GetNextBuffer());
- if (parent_->cancelled()) return Status::CANCELLED;
+ if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
if (io_buffer_bytes_left_ == 0) {
// No more bytes (i.e. EOF)
@@ -267,7 +267,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
output_buffer_pos_ = &io_buffer_pos_;
output_buffer_bytes_left_ = &io_buffer_bytes_left_;
} else {
- boundary_buffer_->Append(io_buffer_pos_, num_bytes);
+ RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
boundary_buffer_bytes_left_ += num_bytes;
boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
boundary_buffer_->Size() - boundary_buffer_bytes_left_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.h
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.h b/be/src/exec/text-converter.h
index 266e74d..3651146 100644
--- a/be/src/exec/text-converter.h
+++ b/be/src/exec/text-converter.h
@@ -49,7 +49,7 @@ class TextConverter {
/// and writes the result into the tuples's slot.
/// copy_string indicates whether we need to make a separate copy of the string data:
/// For regular unescaped strings, we point to the original data in the file_buf_.
- /// For regular escaped strings, we copy an its unescaped string into a separate buffer
+ /// For regular escaped strings, we copy its unescaped string into a separate buffer
/// and point to it.
/// If the string needs to be copied, the memory is allocated from 'pool', otherwise
/// 'pool' is unused.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h
index 7b429fa..d50b66f 100644
--- a/be/src/exec/text-converter.inline.h
+++ b/be/src/exec/text-converter.inline.h
@@ -33,7 +33,7 @@
namespace impala {
/// Note: this function has a codegen'd version. Changing this function requires
-/// corresponding changes to CodegenWriteSlot.
+/// corresponding changes to CodegenWriteSlot().
inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple,
const char* data, int len, bool copy_string, bool need_escape, MemPool* pool) {
if ((len == 0 && !slot_desc->type().IsStringType()) || data == NULL) {
@@ -59,7 +59,7 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR) buffer_len = type.len;
bool reuse_data = type.IsVarLenStringType() &&
- !(len != 0 && (copy_string || need_escape));
+ !(len != 0 && (copy_string || need_escape));
if (type.type == TYPE_CHAR) reuse_data &= (buffer_len <= len);
StringValue str;
@@ -67,9 +67,18 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
if (reuse_data) {
str.ptr = const_cast<char*>(data);
} else {
+ // The codegen version of this code (generated by CodegenWriteSlot()) doesn't
+ // include this path. In other words, 'reuse_data' will always be true in the
+ // codegen version:
+ // 1. CodegenWriteSlot() doesn't yet support slots of TYPE_CHAR.
+ // 2. HdfsScanner::InitializeWriteTuplesFn() will not codegen if there is
+ // any escape character.
+ // 3. HdfsScanner::WriteCompleteTuple() always calls this function with
+ // 'copy_string' == false.
str.ptr = type.IsVarLenStringType() ?
- reinterpret_cast<char*>(pool->Allocate(buffer_len)) :
+ reinterpret_cast<char*>(pool->TryAllocate(buffer_len)) :
reinterpret_cast<char*>(slot);
+ if (UNLIKELY(str.ptr == NULL)) return false;
if (need_escape) {
UnescapeString(data, str.ptr, &str.len, buffer_len);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 1787b12..c8f79df 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -16,9 +16,12 @@
#ifndef IMPALA_RUNTIME_STRING_BUFFER_H
#define IMPALA_RUNTIME_STRING_BUFFER_H
+#include "common/status.h"
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
+using namespace strings;
+
namespace impala {
/// Dynamic-sizable string (similar to std::string) but without as many
@@ -30,7 +33,7 @@ namespace impala {
class StringBuffer {
public:
/// C'tor for StringBuffer. Memory backing the string will be allocated from
- /// the pool as necessary. Can optionally be initialized from a StringValue.
+ /// the pool as necessary. Can optionally be initialized from a StringValue.
StringBuffer(MemPool* pool, StringValue* str = NULL)
: pool_(pool), buffer_size_(0) {
DCHECK(pool_ != NULL);
@@ -41,24 +44,24 @@ class StringBuffer {
}
/// Append 'str' to the current string, allocating a new buffer as necessary.
- void Append(const char* str, int len) {
+ /// Return error status if memory limit is exceeded.
+ Status Append(const char* str, int len) {
int new_len = len + string_value_.len;
- if (new_len > buffer_size_) {
- GrowBuffer(new_len);
- }
+ if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len));
memcpy(string_value_.ptr + string_value_.len, str, len);
string_value_.len = new_len;
+ return Status::OK();
}
/// TODO: switch everything to uint8_t?
- void Append(const uint8_t* str, int len) {
- Append(reinterpret_cast<const char*>(str), len);
+ Status Append(const uint8_t* str, int len) {
+ return Append(reinterpret_cast<const char*>(str), len);
}
- /// Assigns contents to StringBuffer
- void Assign(const char* str, int len) {
+ /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded.
+ Status Assign(const char* str, int len) {
Clear();
- Append(str, len);
+ return Append(str, len);
}
/// Clear the underlying StringValue. The allocated buffer can be reused.
@@ -94,16 +97,23 @@ class StringBuffer {
private:
/// Grows the buffer backing the string to be at least new_size, copying over the
- /// previous string data into the new buffer.
- void GrowBuffer(int new_len) {
+ /// previous string data into the new buffer. Return error status if memory limit
+ /// is exceeded.
+ Status GrowBuffer(int new_len) {
// TODO: Release/reuse old buffers somehow
buffer_size_ = std::max(buffer_size_ * 2, new_len);
DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH);
- char* new_buffer = reinterpret_cast<char*>(pool_->Allocate(buffer_size_));
- if (string_value_.len > 0) {
+ char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_));
+ if (UNLIKELY(new_buffer == NULL)) {
+ string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.",
+ buffer_size_);
+ return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_);
+ }
+ if (LIKELY(string_value_.len > 0)) {
memcpy(new_buffer, string_value_.ptr, string_value_.len);
}
string_value_.ptr = new_buffer;
+ return Status::OK();
}
MemPool* pool_;