You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2020/02/26 23:55:32 UTC
[impala] 02/05: IMPALA-4080 [part 2]: Invoke codegen from scan's
plan node instead of exec node
This is an automated email from the ASF dual-hosted git repository.
tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ad3177035103dfcb1625b32f04f22ea87456eb6c
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Feb 14 14:33:12 2020 -0800
IMPALA-4080 [part 2]: Invoke codegen from scan's plan node instead of
exec node
This patch moves the code responsible for invoking codegen for
hdfs scanner to the plan node.
Testing:
Successfully ran exhaustive tests.
Also verified manually that codegen code is generated and
used for each hdfs scanner.
Change-Id: I02c6183271b3731aa0d3cae2426be7269e462967
Reviewed-on: http://gerrit.cloudera.org:8080/15259
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/hdfs-avro-scanner-ir.cc | 2 +-
be/src/exec/hdfs-avro-scanner.cc | 51 ++++----
be/src/exec/hdfs-avro-scanner.h | 15 +--
be/src/exec/hdfs-orc-scanner.cc | 2 +-
be/src/exec/hdfs-rcfile-scanner.cc | 12 +-
be/src/exec/hdfs-rcfile-scanner.h | 2 -
be/src/exec/hdfs-scan-node-base.cc | 196 ++++++++++++++++------------
be/src/exec/hdfs-scan-node-base.h | 85 ++++++++----
be/src/exec/hdfs-scanner.cc | 45 +++----
be/src/exec/hdfs-scanner.h | 12 +-
be/src/exec/hdfs-sequence-scanner.cc | 16 +--
be/src/exec/hdfs-sequence-scanner.h | 6 +-
be/src/exec/hdfs-text-scanner.cc | 16 +--
be/src/exec/hdfs-text-scanner.h | 5 +-
be/src/exec/parquet/hdfs-parquet-scanner.cc | 12 +-
be/src/exec/parquet/hdfs-parquet-scanner.h | 6 +-
16 files changed, 257 insertions(+), 226 deletions(-)
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index abfc984..1d8ec60 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -47,7 +47,7 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat
tuple))) {
return 0;
}
- tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
+ tuple_row->SetTuple(0, tuple);
if (EvalConjuncts(tuple_row)) {
if (copy_strings) {
if (UNLIKELY(!tuple->CopyStrings("HdfsAvroScanner::DecodeAvroData()",
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 8c82675..20fee8e 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -78,13 +78,12 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
return Status::OK();
}
-Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
- const vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn) {
+Status HdfsAvroScanner::Codegen(HdfsScanPlanNode* node,
+ RuntimeState* state, llvm::Function** decode_avro_data_fn) {
*decode_avro_data_fn = nullptr;
- DCHECK(node->runtime_state()->ShouldCodegen());
- LlvmCodeGen* codegen = node->runtime_state()->codegen();
- DCHECK(codegen != nullptr);
- RETURN_IF_ERROR(CodegenDecodeAvroData(node, codegen, conjuncts, decode_avro_data_fn));
+ DCHECK(state->ShouldCodegen());
+ DCHECK(state->codegen() != nullptr);
+ RETURN_IF_ERROR(CodegenDecodeAvroData(node, state, decode_avro_data_fn));
DCHECK(*decode_avro_data_fn != nullptr);
return Status::OK();
}
@@ -783,14 +782,14 @@ void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_
// bail_out: ; preds = %entry
// ret i1 false
// }
-Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
+Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanPlanNode* node,
LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) {
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HdfsAvroScanner>();
- TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
+ TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc_);
llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
if (tuple_type == nullptr) return Status("Could not generate tuple struct.");
llvm::Type* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
@@ -803,8 +802,8 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
// Schema can be null if metadata is stale. See test in
// queries/QueryTest/avro-schema-changes.test.
- RETURN_IF_ERROR(CheckSchema(node->avro_schema()));
- int num_children = node->avro_schema().children.size();
+ RETURN_IF_ERROR(CheckSchema(*node->avro_schema_.get()));
+ int num_children = node->avro_schema_->children.size();
if (num_children == 0) {
return Status("Invalid Avro record schema: contains no children.");
}
@@ -845,7 +844,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
llvm::BasicBlock::Create(context, "bail_out", helper_fn, nullptr);
Status status = CodegenReadRecord(
- SchemaPath(), node->avro_schema(), i, std::min(num_children, i + step_size),
+ SchemaPath(), *node->avro_schema_.get(), i, std::min(num_children, i + step_size),
node, codegen, &builder, helper_fn, bail_out_block,
bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val);
if (!status.ok()) {
@@ -912,7 +911,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
const AvroSchemaElement& record, int child_start, int child_end,
- const HdfsScanNodeBase* node, LlvmCodeGen* codegen, void* void_builder,
+ const HdfsScanPlanNode* node, LlvmCodeGen* codegen, void* void_builder,
llvm::Function* fn, llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out,
llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
llvm::Value* data_val, llvm::Value* data_end_val) {
@@ -927,16 +926,16 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
// Used to store result of ReadUnionType() call
llvm::Value* is_null_ptr = nullptr;
for (int i = child_start; i < child_end; ++i) {
- const AvroSchemaElement* field = &record.children[i];
+ const AvroSchemaElement& field = record.children[i];
int col_idx = i;
// If we're about to process the table-level columns, account for the partition keys
// when constructing 'path'
- if (path.empty()) col_idx += node->num_partition_keys();
+ if (path.empty()) col_idx += node->hdfs_table_->num_clustering_cols();
SchemaPath new_path = path;
new_path.push_back(col_idx);
int slot_idx = node->GetMaterializedSlotIdx(new_path);
SlotDescriptor* slot_desc = (slot_idx == HdfsScanNodeBase::SKIP_COLUMN) ?
- nullptr : node->materialized_slots()[slot_idx];
+ nullptr : node->materialized_slots_[slot_idx];
// Block that calls appropriate Read<Type> function
llvm::BasicBlock* read_field_block =
@@ -951,12 +950,12 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
llvm::BasicBlock* end_field_block =
llvm::BasicBlock::Create(context, "end_field", fn, insert_before);
- if (field->nullable()) {
+ if (field.nullable()) {
// Field could be null. Create conditional branch based on ReadUnionType result.
llvm::Function* read_union_fn =
codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
llvm::Value* null_union_pos_val =
- codegen->GetI32Constant(field->null_union_position);
+ codegen->GetI32Constant(field.null_union_position);
if (is_null_ptr == nullptr) {
is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->bool_type(),
"is_null_ptr");
@@ -992,15 +991,15 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
// Write read_field_block IR
builder->SetInsertPoint(read_field_block);
llvm::Value* ret_val = nullptr;
- if (field->schema->type == AVRO_RECORD) {
+ if (field.schema->type == AVRO_RECORD) {
llvm::BasicBlock* insert_before_block =
(null_block != nullptr) ? null_block : end_field_block;
- RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, 0, field->children.size(),
+ RETURN_IF_ERROR(CodegenReadRecord(new_path, field, 0, field.children.size(),
node, codegen, builder, fn,
insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val,
data_end_val));
} else {
- RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
+ RETURN_IF_ERROR(CodegenReadScalar(field, slot_desc, codegen, builder,
this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val));
}
builder->CreateCondBr(ret_val, end_field_block, bail_out);
@@ -1098,9 +1097,11 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
return Status::OK();
}
-Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
- LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
- llvm::Function** decode_avro_data_fn) {
+Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanPlanNode* node,
+ RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+ const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
+ LlvmCodeGen* codegen = state->codegen();
+
llvm::Function* materialize_tuple_fn;
RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
DCHECK(materialize_tuple_fn != nullptr);
@@ -1123,11 +1124,11 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
llvm::Function* copy_strings_fn;
RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
- codegen, *node->tuple_desc(), &copy_strings_fn));
+ codegen, *node->tuple_desc_, &copy_strings_fn));
replaced = codegen->ReplaceCallSites(fn, copy_strings_fn, "CopyStrings");
DCHECK_REPLACE_COUNT(replaced, 1);
- int tuple_byte_size = node->tuple_desc()->byte_size();
+ int tuple_byte_size = node->tuple_desc_->byte_size();
replaced = codegen->ReplaceCallSitesWithValue(fn,
codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index f88adcf..b3bd355 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -95,10 +95,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
/// codegen was successful or nullptr otherwise.
- static Status Codegen(HdfsScanNodeBase* node,
- const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** decode_avro_data_fn)
- WARN_UNUSED_RESULT;
+ static Status Codegen(
+ HdfsScanPlanNode* node, RuntimeState* state, llvm::Function** decode_avro_data_fn);
static const char* LLVM_CLASS_NAME;
@@ -210,15 +208,14 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
/// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was
/// successful or returns an error.
- static Status CodegenDecodeAvroData(const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
- const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** decode_avro_data_fn) WARN_UNUSED_RESULT;
+ static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** decode_avro_data_fn);
/// Codegens a version of MaterializeTuple() that reads records based on the table
/// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen was
/// successful or returns an error.
/// TODO: Codegen a function for each unique file schema.
- static Status CodegenMaterializeTuple(const HdfsScanNodeBase* node,
+ static Status CodegenMaterializeTuple(const HdfsScanPlanNode* node,
LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) WARN_UNUSED_RESULT;
/// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
@@ -238,7 +235,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// - child_start / child_end: specifies to only generate a subset of the record
/// schema's children
static Status CodegenReadRecord(const SchemaPath& path, const AvroSchemaElement& record,
- int child_start, int child_end, const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
+ int child_start, int child_end, const HdfsScanPlanNode* node, LlvmCodeGen* codegen,
void* builder, llvm::Function* fn, llvm::BasicBlock* insert_before,
llvm::BasicBlock* bail_out, llvm::Value* this_val, llvm::Value* pool_val,
llvm::Value* tuple_val, llvm::Value* data_val,
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index b6b23c7..a38b345 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -687,7 +687,7 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
InitTuple(tuple_desc, template_tuple_, tuple);
RETURN_IF_ERROR(coll_reader->TransferTuple(tuple, dst_batch->tuple_data_pool()));
- row->SetTuple(scan_node_->tuple_idx(), tuple);
+ row->SetTuple(0, tuple);
if (!EvalRuntimeFilters(row)) continue;
if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, row)) {
row = next_row(row);
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index bf3d99c..dba8f51 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -602,7 +602,7 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
RETURN_IF_ERROR(state_->LogOrReturnError(msg));
}
- current_row->SetTuple(scan_node_->tuple_idx(), tuple);
+ current_row->SetTuple(0, tuple);
// Evaluate the conjuncts and add the row to the batch
if (EvalConjuncts(current_row)) {
++num_to_commit;
@@ -638,13 +638,3 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
}
return Status::OK();
}
-
-void HdfsRCFileScanner::DebugString(int indentation_level, stringstream* out) const {
- // TODO: Add more details of internal state.
- *out << string(indentation_level * 2, ' ')
- << "HdfsRCFileScanner(tupleid=" << scan_node_->tuple_idx()
- << " file=" << stream_->filename();
- // TODO: Scanner::DebugString
- // ExecNode::DebugString(indentation_level, out);
- *out << "])" << endl;
-}
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index f35bcf8..a1410a0 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -238,8 +238,6 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
- void DebugString(int indentation_level, std::stringstream* out) const;
-
private:
/// The key class name located in the RCFile Header.
/// This is always "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 49ba920..faaf55c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -43,6 +43,7 @@
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
+#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "util/disk-info.h"
@@ -151,14 +152,59 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanPlanNode::Init(tnode, state));
+ tuple_id_ = tnode.hdfs_scan_node.tuple_id;
+ tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+ DCHECK(tuple_desc_->table_desc() != NULL);
+ hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
+
+ // Parse Avro table schema if applicable
+ const string& avro_schema_str = hdfs_table_->avro_schema();
+ if (!avro_schema_str.empty()) {
+ avro_schema_t avro_schema;
+ int error = avro_schema_from_json_length(
+ avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
+ if (error != 0) {
+ return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
+ }
+ RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
+ }
+
+ // Gather materialized partition-key slots and non-partition slots.
+ const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
+ for (size_t i = 0; i < slots.size(); ++i) {
+ if (hdfs_table_->IsClusteringCol(slots[i])) {
+ partition_key_slots_.push_back(slots[i]);
+ } else {
+ materialized_slots_.push_back(slots[i]);
+ }
+ }
+
+ // Order the materialized slots such that for schemaless file formats (e.g. text) the
+ // order corresponds to the physical order in files. For formats where the file schema
+ // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
+ sort(materialized_slots_.begin(), materialized_slots_.end(),
+ SlotDescriptor::ColPathLessThan);
+
+ // Populate mapping from slot path to index into materialized_slots_.
+ for (int i = 0; i < materialized_slots_.size(); ++i) {
+ path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
+ }
+
+ // Initialize is_materialized_col_
+ is_materialized_col_.resize(hdfs_table_->num_cols());
+ for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
+ is_materialized_col_[i] =
+ GetMaterializedSlotIdx(vector<int>(1, i)) != HdfsScanNodeBase::SKIP_COLUMN;
+ }
+
// Add collection item conjuncts
- for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
+ for (const auto& entry : tnode.hdfs_scan_node.collection_conjuncts) {
TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
- RowDescriptor* collection_row_desc = state->obj_pool()->Add(
- new RowDescriptor(tuple_desc, /* is_nullable */ false));
+ RowDescriptor* collection_row_desc =
+ state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end());
- RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state,
- &conjuncts_map_[entry.first]));
+ RETURN_IF_ERROR(ScalarExpr::Create(
+ entry.second, *collection_row_desc, state, &conjuncts_map_[entry.first]));
}
const TTupleId& tuple_id = tnode.hdfs_scan_node.tuple_id;
DCHECK(conjuncts_map_[tuple_id].empty());
@@ -174,9 +220,29 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
*min_max_row_desc, state, &min_max_conjuncts_));
}
+
+ // TODO: Find formats to be read across all instances once codegen done per fragment.
+ const TPlanFragmentInstanceCtx& instance_ctx = state->instance_ctx();
+ auto ranges = instance_ctx.per_node_scan_ranges.find(tnode.node_id);
+ if (ranges != instance_ctx.per_node_scan_ranges.end()) {
+ for (const TScanRangeParams& scan_range_param : ranges->second) {
+ const THdfsFileSplit& split = scan_range_param.scan_range.hdfs_file_split;
+ HdfsPartitionDescriptor* partition_desc =
+ hdfs_table_->GetPartition(split.partition_id);
+ scanned_file_formats_.insert(partition_desc->file_format());
+ }
+ }
return Status::OK();
}
+int HdfsScanPlanNode::GetMaterializedSlotIdx(const std::vector<int>& path) const {
+ auto result = path_to_materialized_slot_idx_.find(path);
+ if (result == path_to_materialized_slot_idx_.end()) {
+ return HdfsScanNodeBase::SKIP_COLUMN;
+ }
+ return result->second;
+}
+
Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
ObjectPool* pool = state->obj_pool();
*node = pool->Add(tnode_->hdfs_scan_node.use_mt_scan_node ?
@@ -192,30 +258,33 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno
: ScanNode(pool, pnode, descs),
min_max_tuple_id_(
hdfs_scan_node.__isset.min_max_tuple_id ? hdfs_scan_node.min_max_tuple_id : -1),
+ min_max_conjuncts_(pnode.min_max_conjuncts_),
min_max_tuple_desc_(
min_max_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(min_max_tuple_id_)),
skip_header_line_count_(hdfs_scan_node.__isset.skip_header_line_count ?
hdfs_scan_node.skip_header_line_count :
0),
- tuple_id_(hdfs_scan_node.tuple_id),
+ tuple_id_(pnode.tuple_id_),
parquet_count_star_slot_offset_(
hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
hdfs_scan_node.parquet_count_star_slot_offset :
-1),
- tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
+ tuple_desc_(pnode.tuple_desc_),
+ hdfs_table_(pnode.hdfs_table_),
+ avro_schema_(*pnode.avro_schema_.get()),
+ conjuncts_map_(pnode.conjuncts_map_),
thrift_dict_filter_conjuncts_map_(hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
&hdfs_scan_node.dictionary_filter_conjuncts :
nullptr),
+ codegend_fn_map_(pnode.codegend_fn_map_),
+ is_materialized_col_(pnode.is_materialized_col_),
+ materialized_slots_(pnode.materialized_slots_),
+ partition_key_slots_(pnode.partition_key_slots_),
disks_accessed_bitmap_(TUnit::UNIT, 0),
- active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
- conjuncts_map_ = pnode.conjuncts_map_;
- min_max_conjuncts_ = pnode.min_max_conjuncts_;
-}
+ active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
-HdfsScanNodeBase::~HdfsScanNodeBase() {
-}
+HdfsScanNodeBase::~HdfsScanNodeBase() {}
-/// TODO: Break up this very long function.
Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(ScanNode::Prepare(state));
@@ -241,49 +310,8 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
}
// One-time initialization of state that is constant across scan ranges
- DCHECK(tuple_desc_->table_desc() != NULL);
- hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
scan_node_pool_.reset(new MemPool(mem_tracker()));
- // Parse Avro table schema if applicable
- const string& avro_schema_str = hdfs_table_->avro_schema();
- if (!avro_schema_str.empty()) {
- avro_schema_t avro_schema;
- int error = avro_schema_from_json_length(
- avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
- if (error != 0) {
- return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
- }
- RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
- }
-
- // Gather materialized partition-key slots and non-partition slots.
- const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
- for (size_t i = 0; i < slots.size(); ++i) {
- if (hdfs_table_->IsClusteringCol(slots[i])) {
- partition_key_slots_.push_back(slots[i]);
- } else {
- materialized_slots_.push_back(slots[i]);
- }
- }
-
- // Order the materialized slots such that for schemaless file formats (e.g. text) the
- // order corresponds to the physical order in files. For formats where the file schema
- // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
- sort(materialized_slots_.begin(), materialized_slots_.end(),
- SlotDescriptor::ColPathLessThan);
-
- // Populate mapping from slot path to index into materialized_slots_.
- for (int i = 0; i < materialized_slots_.size(); ++i) {
- path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
- }
-
- // Initialize is_materialized_col_
- is_materialized_col_.resize(hdfs_table_->num_cols());
- for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
- is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
- }
-
HdfsFsCache::HdfsFsMap fs_cache;
// Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
// partition_ids_, file_descs_, and per_type_files_.
@@ -374,37 +402,28 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
DCHECK(state->ShouldCodegen());
ExecNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
+ const HdfsScanPlanNode& const_plan_node =
+ static_cast<const HdfsScanPlanNode&>(plan_node_);
+ HdfsScanPlanNode& hdfs_plan_node = const_cast<HdfsScanPlanNode&>(const_plan_node);
+ hdfs_plan_node.Codegen(state, state->runtime_profile());
+}
- // Create codegen'd functions
- for (int format = THdfsFileFormat::TEXT; format <= THdfsFileFormat::PARQUET; ++format) {
- vector<HdfsFileDesc*>& file_descs =
- per_type_files_[static_cast<THdfsFileFormat::type>(format)];
-
- if (file_descs.empty()) continue;
-
- // Randomize the order this node processes the files. We want to do this to avoid
- // issuing remote reads to the same DN from different impalads. In file formats such
- // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
- // If every node goes through the files in the same order, all the remote reads are
- // for the same file meaning a few DN serves a lot of remote reads at the same time.
- random_shuffle(file_descs.begin(), file_descs.end());
-
- // Create reusable codegen'd functions for each file type type needed
- // TODO: do this for conjuncts_map_
+void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+ for (THdfsFileFormat::type format: scanned_file_formats_) {
llvm::Function* fn;
Status status;
switch (format) {
case THdfsFileFormat::TEXT:
- status = HdfsTextScanner::Codegen(this, conjuncts_, &fn);
+ status = HdfsTextScanner::Codegen(this, state, &fn);
break;
case THdfsFileFormat::SEQUENCE_FILE:
- status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn);
+ status = HdfsSequenceScanner::Codegen(this, state, &fn);
break;
case THdfsFileFormat::AVRO:
- status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn);
+ status = HdfsAvroScanner::Codegen(this, state, &fn);
break;
case THdfsFileFormat::PARQUET:
- status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn);
+ status = HdfsParquetScanner::Codegen(this, state, &fn);
break;
default:
// No codegen for this format
@@ -416,10 +435,10 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
if (status.ok()) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != NULL);
- codegen->AddFunctionToJit(fn,
- &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
+ codegen->AddFunctionToJit(
+ fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
}
- runtime_profile()->AddCodegenMsg(status.ok(), status, format_name);
+ profile->AddCodegenMsg(status.ok(), status, format_name);
}
}
@@ -588,6 +607,12 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
SkipFile(v.first, file);
}
}
+ // Randomize the order this node processes the files. We want to do this to avoid
+ // issuing remote reads to the same DN from different impalads. In file formats such
+ // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
+ // If every node goes through the files in the same order, all the remote reads are
+ // for the same file meaning a few DN serves a lot of remote reads at the same time.
+ random_shuffle(matching_files->begin(), matching_files->end());
}
// Issue initial ranges for all file types. Only call functions for file types that
@@ -770,7 +795,7 @@ void* HdfsScanNodeBase::GetFileMetadata(
}
void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
- CodegendFnMap::iterator it = codegend_fn_map_.find(type);
+ auto it = codegend_fn_map_.find(type);
if (it == codegend_fn_map_.end()) return NULL;
return it->second;
}
@@ -857,10 +882,10 @@ void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_des
void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
const TupleDescriptor& tuple_desc =
- *row_batch->row_desc()->tuple_descriptors()[tuple_idx()];
+ *row_batch->row_desc()->tuple_descriptors()[0];
if (tuple_desc.collection_slots().empty()) return;
for (int i = 0; i < row_batch->num_rows(); ++i) {
- Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
+ Tuple* tuple = row_batch->GetRow(i)->GetTuple(0);
DCHECK(tuple != NULL);
InitNullCollectionValues(&tuple_desc, tuple);
}
@@ -916,12 +941,11 @@ void HdfsScanNodeBase::SkipFile(const THdfsFileFormat::type& file_type,
}
}
-void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
- const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts();
+void HdfsScanPlanNode::ComputeSlotMaterializationOrder(
+ const DescriptorTbl& desc_tbl, vector<int>* order) const {
+ const vector<ScalarExpr*>& conjuncts = conjuncts_;
// Initialize all order to be conjuncts.size() (after the last conjunct)
- order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
-
- const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
+ order->insert(order->begin(), materialized_slots_.size(), conjuncts.size());
vector<SlotId> slot_ids;
for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 6b5a7a5..edd8cc3 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -103,6 +103,20 @@ class HdfsScanPlanNode : public ScanPlanNode {
public:
virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+ void Codegen(RuntimeState* state, RuntimeProfile* profile);
+
+ /// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
+ /// that path is not materialized. Only valid to call after Init().
+ int GetMaterializedSlotIdx(const std::vector<int>& path) const;
+
+ /// Utility function to compute the order in which to materialize slots to allow for
+ /// computing conjuncts as slots get materialized (on partial tuples).
+ /// 'order' will contain for each slot, the first conjunct it is associated with.
+ /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
+ /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have
+ /// order set to conjuncts.size(). Only valid to call after Init().
+ void ComputeSlotMaterializationOrder(
+ const DescriptorTbl& desc_tbl, std::vector<int>* order) const;
/// Conjuncts for each materialized tuple (top-level row batch tuples and collection
/// item tuples). Includes a copy of PlanNode.conjuncts_.
@@ -111,6 +125,42 @@ class HdfsScanPlanNode : public ScanPlanNode {
/// Conjuncts to evaluate on parquet::Statistics.
std::vector<ScalarExpr*> min_max_conjuncts_;
+
+ /// Tuple id resolved in Init() to set tuple_desc_ .
+ int tuple_id_ = -1;
+
+ /// Descriptor for tuples this scan node constructs
+ const TupleDescriptor* tuple_desc_ = nullptr;
+
+ /// Maps from a slot's path to its index into materialized_slots_.
+ boost::unordered_map<std::vector<int>, int> path_to_materialized_slot_idx_;
+
+ /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
+ /// for 0 <= i < total # columns in table
+ //
+ /// This should be a vector<bool>, but bool vectors are special-cased and not stored
+ /// internally as arrays, so instead we store as chars and cast to bools as needed
+ std::vector<char> is_materialized_col_;
+
+ /// Vector containing slot descriptors for all non-partition key slots. These
+ /// descriptors are sorted in order of increasing col_pos.
+ std::vector<SlotDescriptor*> materialized_slots_;
+
+ /// Vector containing slot descriptors for all partition key slots.
+ std::vector<SlotDescriptor*> partition_key_slots_;
+
+ /// Descriptor for the hdfs table, including partition and format metadata.
+ /// Set in Init, owned by QueryState
+ const HdfsTableDescriptor* hdfs_table_ = nullptr;
+
+ /// The root of the table's Avro schema, if we're scanning an Avro table.
+ ScopedAvroSchemaElement avro_schema_;
+
+ /// File formats that instances of this node will read.
+ boost::unordered_set<THdfsFileFormat::type> scanned_file_formats_;
+
+ /// Per scanner type codegen'd fn.
+ boost::unordered_map<THdfsFileFormat::type, void*> codegend_fn_map_;
};
/// Base class for all Hdfs scan nodes. Contains common members and functions
@@ -190,10 +240,6 @@ class HdfsScanNodeBase : public ScanNode {
const std::vector<SlotDescriptor*>& materialized_slots()
const { return materialized_slots_; }
- /// Returns the tuple idx into the row for this scan node to output to.
- /// Currently this is always 0.
- int tuple_idx() const { return 0; }
-
/// Returns number of partition keys in the table.
int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
@@ -209,7 +255,7 @@ class HdfsScanNodeBase : public ScanNode {
const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
- const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
+ const AvroSchemaElement& avro_schema() const { return avro_schema_; }
int skip_header_line_count() const { return skip_header_line_count_; }
io::RequestContext* reader_context() const { return reader_context_.get(); }
bool optimize_parquet_count_star() const {
@@ -239,9 +285,7 @@ class HdfsScanNodeBase : public ScanNode {
/// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
/// that path is not materialized.
int GetMaterializedSlotIdx(const std::vector<int>& path) const {
- PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
- if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
- return result->second;
+ return static_cast<const HdfsScanPlanNode&>(plan_node_).GetMaterializedSlotIdx(path);
}
/// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
@@ -356,14 +400,6 @@ class HdfsScanNodeBase : public ScanNode {
/// Calls RangeComplete() with skipped=true for all the splits of the file
void SkipFile(const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
- /// Utility function to compute the order in which to materialize slots to allow for
- /// computing conjuncts as slots get materialized (on partial tuples).
- /// 'order' will contain for each slot, the first conjunct it is associated with.
- /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
- /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have
- /// order set to conjuncts.size()
- void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
-
/// Returns true if there are no materialized slots, such as a count(*) over the table.
inline bool IsZeroSlotTableScan() const {
return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
@@ -430,7 +466,7 @@ class HdfsScanNodeBase : public ScanNode {
// to values > 0 for hdfs text files.
const int skip_header_line_count_;
- /// Tuple id resolved in Prepare() to set tuple_desc_
+ /// Tuple id of the tuple descriptor to be used.
const int tuple_id_;
/// The byte offset of the slot for Parquet metadata if Parquet count star optimization
@@ -455,7 +491,7 @@ class HdfsScanNodeBase : public ScanNode {
const HdfsTableDescriptor* hdfs_table_ = nullptr;
/// The root of the table's Avro schema, if we're scanning an Avro table.
- ScopedAvroSchemaElement avro_schema_;
+ const AvroSchemaElement& avro_schema_;
/// Partitions scanned by this scan node.
std::unordered_set<int64_t> partition_ids_;
@@ -503,26 +539,21 @@ class HdfsScanNodeBase : public ScanNode {
AtomicInt32 remaining_scan_range_submissions_ = { 1 };
/// Per scanner type codegen'd fn.
- typedef boost::unordered_map<THdfsFileFormat::type, void*> CodegendFnMap;
- CodegendFnMap codegend_fn_map_;
-
- /// Maps from a slot's path to its index into materialized_slots_.
- typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
- PathToSlotIdxMap path_to_materialized_slot_idx_;
+ const boost::unordered_map<THdfsFileFormat::type, void*>& codegend_fn_map_;
/// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
/// for 0 <= i < total # columns in table
//
/// This should be a vector<bool>, but bool vectors are special-cased and not stored
/// internally as arrays, so instead we store as chars and cast to bools as needed
- std::vector<char> is_materialized_col_;
+ const std::vector<char>& is_materialized_col_;
/// Vector containing slot descriptors for all non-partition key slots. These
/// descriptors are sorted in order of increasing col_pos.
- std::vector<SlotDescriptor*> materialized_slots_;
+ const std::vector<SlotDescriptor*>& materialized_slots_;
/// Vector containing slot descriptors for all partition key slots.
- std::vector<SlotDescriptor*> partition_key_slots_;
+ const std::vector<SlotDescriptor*>& partition_key_slots_;
/// Keeps track of total splits and the number finished.
ProgressUpdater progress_;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 345a140..ee6eb5b 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -214,7 +214,6 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
DCHECK_GE(num_tuples, 0);
- DCHECK_EQ(scan_node_->tuple_idx(), 0);
DCHECK_EQ(scan_node_->materialized_slots().size(), 0);
int num_to_commit = 0;
if (LIKELY(conjunct_evals_->size() == 0)) {
@@ -260,7 +259,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
*error_in_row |= error;
}
- tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
+ tuple_row->SetTuple(0, tuple);
return EvalConjuncts(tuple_row);
}
@@ -321,21 +320,21 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
// eval_fail: ; preds = %parse
// ret i1 false
// }
-Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
- LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_complete_tuple_fn) {
+Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
+ RuntimeState* state, llvm::Function** write_complete_tuple_fn) {
+ const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
+ LlvmCodeGen* codegen = state->codegen();
*write_complete_tuple_fn = NULL;
- RuntimeState* state = node->runtime_state();
// Cast away const-ness. The codegen only sets the cached typed llvm struct.
- TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
+ TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc_);
vector<llvm::Function*> slot_fns;
- for (int i = 0; i < node->materialized_slots().size(); ++i) {
+ for (int i = 0; i < node->materialized_slots_.size(); ++i) {
llvm::Function* fn = nullptr;
- SlotDescriptor* slot_desc = node->materialized_slots()[i];
+ SlotDescriptor* slot_desc = node->materialized_slots_[i];
RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
- node->hdfs_table()->null_column_value().data(),
- node->hdfs_table()->null_column_value().size(), true, state->strict_mode()));
+ node->hdfs_table_->null_column_value().data(),
+ node->hdfs_table_->null_column_value().size(), true, state->strict_mode()));
if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
slot_fns.push_back(fn);
}
@@ -343,7 +342,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
// Compute order to materialize slots. BE assumes that conjuncts should
// be evaluated in the order specified (optimization is already done by FE)
vector<int> materialize_order;
- node->ComputeSlotMaterializationOrder(&materialize_order);
+ node->ComputeSlotMaterializationOrder(state->desc_tbl(), &materialize_order);
// Get types to construct matching function signature to WriteCompleteTuple
llvm::PointerType* uint8_ptr_type = codegen->i8_ptr_type();
@@ -401,7 +400,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
// Put tuple in tuple_row
llvm::Value* tuple_row_typed =
builder.CreateBitCast(tuple_row_arg, llvm::PointerType::get(tuple_ptr_type, 0));
- llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(node->tuple_idx())};
+ llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(0)};
llvm::Value* tuple_in_row_addr =
builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs);
builder.CreateStore(tuple_arg, tuple_in_row_addr);
@@ -493,7 +492,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
return status;
}
- if (node->materialized_slots().size() + conjunct_idx
+ if (node->materialized_slots_.size() + conjunct_idx
>= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
codegen->SetNoInline(conjunct_fn);
}
@@ -516,7 +515,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
builder.SetInsertPoint(eval_fail_block);
builder.CreateRet(codegen->false_value());
- if (node->materialized_slots().size() + conjuncts.size()
+ if (node->materialized_slots_.size() + conjuncts.size()
> LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
codegen->SetNoInline(fn);
}
@@ -527,9 +526,10 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
return Status::OK();
}
-Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
- LlvmCodeGen* codegen, llvm::Function* write_complete_tuple_fn,
+Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanPlanNode* node,
+ RuntimeState* state, llvm::Function* write_complete_tuple_fn,
llvm::Function** write_aligned_tuples_fn) {
+ LlvmCodeGen* codegen = state->codegen();
*write_aligned_tuples_fn = NULL;
DCHECK(write_complete_tuple_fn != NULL);
@@ -543,12 +543,12 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
llvm::Function* copy_strings_fn;
RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
- codegen, *node->tuple_desc(), &copy_strings_fn));
+ codegen, *node->tuple_desc_, &copy_strings_fn));
replaced = codegen->ReplaceCallSites(
write_tuples_fn, copy_strings_fn, "CopyStrings");
DCHECK_REPLACE_COUNT(replaced, 1);
- int tuple_byte_size = node->tuple_desc()->byte_size();
+ int tuple_byte_size = node->tuple_desc_->byte_size();
replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
@@ -561,16 +561,17 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
}
Status HdfsScanner::CodegenInitTuple(
- const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn) {
+ const HdfsScanPlanNode* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn) {
*init_tuple_fn = codegen->GetFunction(IRFunction::HDFS_SCANNER_INIT_TUPLE, true);
DCHECK(*init_tuple_fn != nullptr);
// Replace all of the constants in InitTuple() to specialize the code.
+ bool materialized_partition_keys_exist = !node->partition_key_slots_.empty();
int replaced = codegen->ReplaceCallSitesWithBoolConst(
- *init_tuple_fn, node->num_materialized_partition_keys() > 0, "has_template_tuple");
+ *init_tuple_fn, materialized_partition_keys_exist, "has_template_tuple");
DCHECK_REPLACE_COUNT(replaced, 1);
- const TupleDescriptor* tuple_desc = node->tuple_desc();
+ const TupleDescriptor* tuple_desc = node->tuple_desc_;
replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
codegen->GetI32Constant(tuple_desc->byte_size()), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 3e19658..bc7d66e 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -482,23 +482,21 @@ class HdfsScanner {
/// Codegen function to replace WriteCompleteTuple. Should behave identically
/// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
/// if codegen was successful or NULL otherwise.
- static Status CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
- LlvmCodeGen* codegen, const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_complete_tuple_fn) WARN_UNUSED_RESULT;
+ static Status CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
+ RuntimeState* state, llvm::Function** write_complete_tuple_fn);
/// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross
/// compiled to IR. This function loads the precompiled IR function, modifies it,
/// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
/// successful or NULL otherwise.
- static Status CodegenWriteAlignedTuples(const HdfsScanNodeBase*, LlvmCodeGen*,
- llvm::Function* write_tuple_fn,
- llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
+ static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, RuntimeState*,
+ llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
/// Codegen function to replace InitTuple() removing runtime constants like the tuple
/// size and branches like the template tuple existence check. The codegen'd version
/// of InitTuple() is stored in 'init_tuple_fn' if codegen was successful.
static Status CodegenInitTuple(
- const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn);
+ const HdfsScanPlanNode* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn);
/// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version
/// and emitting a customized version of EvalRuntimeFilter() for each filter in
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 610325b..2cc08a0 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -48,18 +48,16 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
}
// Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
- const vector<ScalarExpr*>& conjuncts, llvm::Function** write_aligned_tuples_fn) {
+Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = nullptr;
- DCHECK(node->runtime_state()->ShouldCodegen());
- LlvmCodeGen* codegen = node->runtime_state()->codegen();
- DCHECK(codegen != nullptr);
+ DCHECK(state->ShouldCodegen());
+ DCHECK(state->codegen() != nullptr);
llvm::Function* write_complete_tuple_fn;
- RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
- &write_complete_tuple_fn));
+ RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, state, &write_complete_tuple_fn));
DCHECK(write_complete_tuple_fn != nullptr);
- RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
- write_aligned_tuples_fn));
+ RETURN_IF_ERROR(CodegenWriteAlignedTuples(
+ node, state, write_complete_tuple_fn, write_aligned_tuples_fn));
DCHECK(*write_aligned_tuples_fn != nullptr);
return Status::OK();
}
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index e84a5e7..0b79b78 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -171,10 +171,8 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
- static Status Codegen(HdfsScanNodeBase* node,
- const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_aligned_tuples_fn)
- WARN_UNUSED_RESULT;
+ static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** write_aligned_tuples_fn);
protected:
/// Implementation of sequence container super class methods.
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 03ac83e..acd7f54 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -762,17 +762,15 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is
// handcrafted using the IRBuilder for the specific tuple description. This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
- const vector<ScalarExpr*>& conjuncts, llvm::Function** write_aligned_tuples_fn) {
+Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = nullptr;
- DCHECK(node->runtime_state()->ShouldCodegen());
- LlvmCodeGen* codegen = node->runtime_state()->codegen();
- DCHECK(codegen != nullptr);
+ DCHECK(state->ShouldCodegen());
+ DCHECK(state->codegen() != nullptr);
llvm::Function* write_complete_tuple_fn;
- RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
- &write_complete_tuple_fn));
+ RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, state, &write_complete_tuple_fn));
DCHECK(write_complete_tuple_fn != nullptr);
- RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
+ RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, state, write_complete_tuple_fn,
write_aligned_tuples_fn));
DCHECK(*write_aligned_tuples_fn != nullptr);
return Status::OK();
@@ -834,7 +832,7 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
CopyAndClearPartialTuple(pool);
- row->SetTuple(scan_node_->tuple_idx(), tuple_);
+ row->SetTuple(0, tuple_);
slot_idx_ = 0;
++num_tuples_processed;
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index d507df1..cbfbce8 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -61,9 +61,8 @@ class HdfsTextScanner : public HdfsScanner {
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
- static Status Codegen(HdfsScanNodeBase* node,
- const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
+ static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** write_aligned_tuples_fn);
/// Return true if we have builtin support for scanning text files compressed with this
/// codec.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 148bc2d..5490670 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1183,7 +1183,6 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
// we always call CommitRows() after TransferScratchTuples(), the output batch can
// never be empty.
DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
- DCHECK_EQ(scan_node_->tuple_idx(), 0);
DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
if (scratch_batch_->tuple_byte_size == 0) {
Tuple** output_row =
@@ -1213,17 +1212,18 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
return num_rows_to_commit;
}
-Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
- const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
- DCHECK(node->runtime_state()->ShouldCodegen());
+Status HdfsParquetScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** process_scratch_batch_fn) {
+ DCHECK(state->ShouldCodegen());
*process_scratch_batch_fn = nullptr;
- LlvmCodeGen* codegen = node->runtime_state()->codegen();
+ LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
DCHECK(fn != nullptr);
llvm::Function* eval_conjuncts_fn;
+ const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
DCHECK(eval_conjuncts_fn != nullptr);
@@ -1232,7 +1232,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
llvm::Function* eval_runtime_filters_fn;
RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
- codegen, node->filter_exprs(), &eval_runtime_filters_fn));
+ codegen, node->runtime_filter_exprs_, &eval_runtime_filters_fn));
DCHECK(eval_runtime_filters_fn != nullptr);
replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters");
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 8e5e7bc..3de99e1 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -351,10 +351,8 @@ class HdfsParquetScanner : public HdfsScanner {
/// Codegen ProcessScratchBatch(). Stores the resulting function in
/// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
- static Status Codegen(HdfsScanNodeBase* node,
- const std::vector<ScalarExpr*>& conjuncts,
- llvm::Function** process_scratch_batch_fn)
- WARN_UNUSED_RESULT;
+ static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ llvm::Function** process_scratch_batch_fn);
/// Helper function to create ColumnStatsReader object. 'col_order' might be NULL.
ColumnStatsReader CreateColumnStatsReader(