You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2020/02/26 23:55:32 UTC

[impala] 02/05: IMPALA-4080 [part 2]: Invoke codegen from scan's plan node instead of exec node

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ad3177035103dfcb1625b32f04f22ea87456eb6c
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Feb 14 14:33:12 2020 -0800

    IMPALA-4080 [part 2]: Invoke codegen from scan's plan node instead of
    exec node
    
    This patch moves the code responsible for invoking codegen for
    hdfs scanner to the plan node.
    
    Testing:
    Successfully ran exhaustive tests.
    Also verified manually that codegen code is generated and
    used for each hdfs scanner.
    
    Change-Id: I02c6183271b3731aa0d3cae2426be7269e462967
    Reviewed-on: http://gerrit.cloudera.org:8080/15259
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-avro-scanner-ir.cc         |   2 +-
 be/src/exec/hdfs-avro-scanner.cc            |  51 ++++----
 be/src/exec/hdfs-avro-scanner.h             |  15 +--
 be/src/exec/hdfs-orc-scanner.cc             |   2 +-
 be/src/exec/hdfs-rcfile-scanner.cc          |  12 +-
 be/src/exec/hdfs-rcfile-scanner.h           |   2 -
 be/src/exec/hdfs-scan-node-base.cc          | 196 ++++++++++++++++------------
 be/src/exec/hdfs-scan-node-base.h           |  85 ++++++++----
 be/src/exec/hdfs-scanner.cc                 |  45 +++----
 be/src/exec/hdfs-scanner.h                  |  12 +-
 be/src/exec/hdfs-sequence-scanner.cc        |  16 +--
 be/src/exec/hdfs-sequence-scanner.h         |   6 +-
 be/src/exec/hdfs-text-scanner.cc            |  16 +--
 be/src/exec/hdfs-text-scanner.h             |   5 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc |  12 +-
 be/src/exec/parquet/hdfs-parquet-scanner.h  |   6 +-
 16 files changed, 257 insertions(+), 226 deletions(-)

diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index abfc984..1d8ec60 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -47,7 +47,7 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat
         tuple))) {
       return 0;
     }
-    tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
+    tuple_row->SetTuple(0, tuple);
     if (EvalConjuncts(tuple_row)) {
       if (copy_strings) {
         if (UNLIKELY(!tuple->CopyStrings("HdfsAvroScanner::DecodeAvroData()",
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 8c82675..20fee8e 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -78,13 +78,12 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
   return Status::OK();
 }
 
-Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn) {
+Status HdfsAvroScanner::Codegen(HdfsScanPlanNode* node,
+   RuntimeState* state, llvm::Function** decode_avro_data_fn) {
   *decode_avro_data_fn = nullptr;
-  DCHECK(node->runtime_state()->ShouldCodegen());
-  LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != nullptr);
-  RETURN_IF_ERROR(CodegenDecodeAvroData(node, codegen, conjuncts, decode_avro_data_fn));
+  DCHECK(state->ShouldCodegen());
+  DCHECK(state->codegen() != nullptr);
+  RETURN_IF_ERROR(CodegenDecodeAvroData(node, state, decode_avro_data_fn));
   DCHECK(*decode_avro_data_fn != nullptr);
   return Status::OK();
 }
@@ -783,14 +782,14 @@ void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_
 // bail_out:                                         ; preds = %entry
 //   ret i1 false
 // }
-Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
+Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanPlanNode* node,
     LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) {
   llvm::LLVMContext& context = codegen->context();
   LlvmBuilder builder(context);
 
   llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HdfsAvroScanner>();
 
-  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
+  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc_);
   llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
   if (tuple_type == nullptr) return Status("Could not generate tuple struct.");
   llvm::Type* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
@@ -803,8 +802,8 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
 
   // Schema can be null if metadata is stale. See test in
   // queries/QueryTest/avro-schema-changes.test.
-  RETURN_IF_ERROR(CheckSchema(node->avro_schema()));
-  int num_children = node->avro_schema().children.size();
+  RETURN_IF_ERROR(CheckSchema(*node->avro_schema_.get()));
+  int num_children = node->avro_schema_->children.size();
   if (num_children == 0) {
     return Status("Invalid Avro record schema: contains no children.");
   }
@@ -845,7 +844,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
         llvm::BasicBlock::Create(context, "bail_out", helper_fn, nullptr);
 
     Status status = CodegenReadRecord(
-        SchemaPath(), node->avro_schema(), i, std::min(num_children, i + step_size),
+        SchemaPath(), *node->avro_schema_.get(), i, std::min(num_children, i + step_size),
         node, codegen, &builder, helper_fn, bail_out_block,
         bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val);
     if (!status.ok()) {
@@ -912,7 +911,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
 
 Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
     const AvroSchemaElement& record, int child_start, int child_end,
-    const HdfsScanNodeBase* node, LlvmCodeGen* codegen, void* void_builder,
+    const HdfsScanPlanNode* node, LlvmCodeGen* codegen, void* void_builder,
     llvm::Function* fn, llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out,
     llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
     llvm::Value* data_val, llvm::Value* data_end_val) {
@@ -927,16 +926,16 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
   // Used to store result of ReadUnionType() call
   llvm::Value* is_null_ptr = nullptr;
   for (int i = child_start; i < child_end; ++i) {
-    const AvroSchemaElement* field = &record.children[i];
+    const AvroSchemaElement& field = record.children[i];
     int col_idx = i;
     // If we're about to process the table-level columns, account for the partition keys
     // when constructing 'path'
-    if (path.empty()) col_idx += node->num_partition_keys();
+    if (path.empty()) col_idx += node->hdfs_table_->num_clustering_cols();
     SchemaPath new_path = path;
     new_path.push_back(col_idx);
     int slot_idx = node->GetMaterializedSlotIdx(new_path);
     SlotDescriptor* slot_desc = (slot_idx == HdfsScanNodeBase::SKIP_COLUMN) ?
-                                nullptr : node->materialized_slots()[slot_idx];
+                                nullptr : node->materialized_slots_[slot_idx];
 
     // Block that calls appropriate Read<Type> function
     llvm::BasicBlock* read_field_block =
@@ -951,12 +950,12 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
     llvm::BasicBlock* end_field_block =
         llvm::BasicBlock::Create(context, "end_field", fn, insert_before);
 
-    if (field->nullable()) {
+    if (field.nullable()) {
       // Field could be null. Create conditional branch based on ReadUnionType result.
       llvm::Function* read_union_fn =
           codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
       llvm::Value* null_union_pos_val =
-          codegen->GetI32Constant(field->null_union_position);
+          codegen->GetI32Constant(field.null_union_position);
       if (is_null_ptr == nullptr) {
         is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->bool_type(),
             "is_null_ptr");
@@ -992,15 +991,15 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
     // Write read_field_block IR
     builder->SetInsertPoint(read_field_block);
     llvm::Value* ret_val = nullptr;
-    if (field->schema->type == AVRO_RECORD) {
+    if (field.schema->type == AVRO_RECORD) {
       llvm::BasicBlock* insert_before_block =
           (null_block != nullptr) ? null_block : end_field_block;
-      RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, 0, field->children.size(),
+      RETURN_IF_ERROR(CodegenReadRecord(new_path, field, 0, field.children.size(),
           node, codegen, builder, fn,
           insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val,
           data_end_val));
     } else {
-      RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
+      RETURN_IF_ERROR(CodegenReadScalar(field, slot_desc, codegen, builder,
           this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val));
     }
     builder->CreateCondBr(ret_val, end_field_block, bail_out);
@@ -1098,9 +1097,11 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
   return Status::OK();
 }
 
-Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
-    LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
-    llvm::Function** decode_avro_data_fn) {
+Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanPlanNode* node,
+    RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+  const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
+  LlvmCodeGen* codegen = state->codegen();
+
   llvm::Function* materialize_tuple_fn;
   RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
   DCHECK(materialize_tuple_fn != nullptr);
@@ -1123,11 +1124,11 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
 
   llvm::Function* copy_strings_fn;
   RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
-      codegen, *node->tuple_desc(), &copy_strings_fn));
+      codegen, *node->tuple_desc_, &copy_strings_fn));
   replaced = codegen->ReplaceCallSites(fn, copy_strings_fn, "CopyStrings");
   DCHECK_REPLACE_COUNT(replaced, 1);
 
-  int tuple_byte_size = node->tuple_desc()->byte_size();
+  int tuple_byte_size = node->tuple_desc_->byte_size();
   replaced = codegen->ReplaceCallSitesWithValue(fn,
       codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
   DCHECK_REPLACE_COUNT(replaced, 1);
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index f88adcf..b3bd355 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -95,10 +95,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
 
   /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
   /// codegen was successful or nullptr otherwise.
-  static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** decode_avro_data_fn)
-      WARN_UNUSED_RESULT;
+  static Status Codegen(
+      HdfsScanPlanNode* node, RuntimeState* state, llvm::Function** decode_avro_data_fn);
 
   static const char* LLVM_CLASS_NAME;
 
@@ -210,15 +208,14 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
   /// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was
   /// successful or returns an error.
-  static Status CodegenDecodeAvroData(const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** decode_avro_data_fn) WARN_UNUSED_RESULT;
+  static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, RuntimeState* state,
+      llvm::Function** decode_avro_data_fn);
 
   /// Codegens a version of MaterializeTuple() that reads records based on the table
   /// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen was
   /// successful or returns an error.
   /// TODO: Codegen a function for each unique file schema.
-  static Status CodegenMaterializeTuple(const HdfsScanNodeBase* node,
+  static Status CodegenMaterializeTuple(const HdfsScanPlanNode* node,
       LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) WARN_UNUSED_RESULT;
 
   /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
@@ -238,7 +235,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// - child_start / child_end: specifies to only generate a subset of the record
   ///     schema's children
   static Status CodegenReadRecord(const SchemaPath& path, const AvroSchemaElement& record,
-      int child_start, int child_end, const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
+      int child_start, int child_end, const HdfsScanPlanNode* node, LlvmCodeGen* codegen,
       void* builder, llvm::Function* fn, llvm::BasicBlock* insert_before,
       llvm::BasicBlock* bail_out, llvm::Value* this_val, llvm::Value* pool_val,
       llvm::Value* tuple_val, llvm::Value* data_val,
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index b6b23c7..a38b345 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -687,7 +687,7 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
     if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
     InitTuple(tuple_desc, template_tuple_, tuple);
     RETURN_IF_ERROR(coll_reader->TransferTuple(tuple, dst_batch->tuple_data_pool()));
-    row->SetTuple(scan_node_->tuple_idx(), tuple);
+    row->SetTuple(0, tuple);
     if (!EvalRuntimeFilters(row)) continue;
     if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, row)) {
       row = next_row(row);
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index bf3d99c..dba8f51 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -602,7 +602,7 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
         RETURN_IF_ERROR(state_->LogOrReturnError(msg));
       }
 
-      current_row->SetTuple(scan_node_->tuple_idx(), tuple);
+      current_row->SetTuple(0, tuple);
       // Evaluate the conjuncts and add the row to the batch
       if (EvalConjuncts(current_row)) {
         ++num_to_commit;
@@ -638,13 +638,3 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
   }
   return Status::OK();
 }
-
-void HdfsRCFileScanner::DebugString(int indentation_level, stringstream* out) const {
-  // TODO: Add more details of internal state.
-  *out << string(indentation_level * 2, ' ')
-       << "HdfsRCFileScanner(tupleid=" << scan_node_->tuple_idx()
-       << " file=" << stream_->filename();
-  // TODO: Scanner::DebugString
-  //  ExecNode::DebugString(indentation_level, out);
-  *out << "])" << endl;
-}
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index f35bcf8..a1410a0 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -238,8 +238,6 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
 
   virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
 
-  void DebugString(int indentation_level, std::stringstream* out) const;
-
  private:
   /// The key class name located in the RCFile Header.
   /// This is always "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 49ba920..faaf55c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -43,6 +43,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"
@@ -151,14 +152,59 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ScanPlanNode::Init(tnode, state));
 
+  tuple_id_ = tnode.hdfs_scan_node.tuple_id;
+  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+  DCHECK(tuple_desc_->table_desc() != NULL);
+  hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
+
+  // Parse Avro table schema if applicable
+  const string& avro_schema_str = hdfs_table_->avro_schema();
+  if (!avro_schema_str.empty()) {
+    avro_schema_t avro_schema;
+    int error = avro_schema_from_json_length(
+        avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
+    if (error != 0) {
+      return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
+    }
+    RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
+  }
+
+  // Gather materialized partition-key slots and non-partition slots.
+  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
+  for (size_t i = 0; i < slots.size(); ++i) {
+    if (hdfs_table_->IsClusteringCol(slots[i])) {
+      partition_key_slots_.push_back(slots[i]);
+    } else {
+      materialized_slots_.push_back(slots[i]);
+    }
+  }
+
+  // Order the materialized slots such that for schemaless file formats (e.g. text) the
+  // order corresponds to the physical order in files. For formats where the file schema
+  // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
+  sort(materialized_slots_.begin(), materialized_slots_.end(),
+      SlotDescriptor::ColPathLessThan);
+
+  // Populate mapping from slot path to index into materialized_slots_.
+  for (int i = 0; i < materialized_slots_.size(); ++i) {
+    path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
+  }
+
+  // Initialize is_materialized_col_
+  is_materialized_col_.resize(hdfs_table_->num_cols());
+  for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
+    is_materialized_col_[i] =
+        GetMaterializedSlotIdx(vector<int>(1, i)) != HdfsScanNodeBase::SKIP_COLUMN;
+  }
+
   // Add collection item conjuncts
-  for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
+  for (const auto& entry : tnode.hdfs_scan_node.collection_conjuncts) {
     TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
-    RowDescriptor* collection_row_desc = state->obj_pool()->Add(
-        new RowDescriptor(tuple_desc, /* is_nullable */ false));
+    RowDescriptor* collection_row_desc =
+        state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
     DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end());
-    RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state,
-        &conjuncts_map_[entry.first]));
+    RETURN_IF_ERROR(ScalarExpr::Create(
+        entry.second, *collection_row_desc, state, &conjuncts_map_[entry.first]));
   }
   const TTupleId& tuple_id = tnode.hdfs_scan_node.tuple_id;
   DCHECK(conjuncts_map_[tuple_id].empty());
@@ -174,9 +220,29 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
+
+  // TODO: Find formats to be read across all instances once codegen is done per fragment.
+  const TPlanFragmentInstanceCtx& instance_ctx = state->instance_ctx();
+  auto ranges = instance_ctx.per_node_scan_ranges.find(tnode.node_id);
+  if (ranges != instance_ctx.per_node_scan_ranges.end()) {
+    for (const TScanRangeParams& scan_range_param : ranges->second) {
+      const THdfsFileSplit& split = scan_range_param.scan_range.hdfs_file_split;
+      HdfsPartitionDescriptor* partition_desc =
+          hdfs_table_->GetPartition(split.partition_id);
+      scanned_file_formats_.insert(partition_desc->file_format());
+    }
+  }
   return Status::OK();
 }
 
+int HdfsScanPlanNode::GetMaterializedSlotIdx(const std::vector<int>& path) const {
+  auto result = path_to_materialized_slot_idx_.find(path);
+  if (result == path_to_materialized_slot_idx_.end()) {
+    return HdfsScanNodeBase::SKIP_COLUMN;
+  }
+  return result->second;
+}
+
 Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(tnode_->hdfs_scan_node.use_mt_scan_node ?
@@ -192,30 +258,33 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno
   : ScanNode(pool, pnode, descs),
     min_max_tuple_id_(
         hdfs_scan_node.__isset.min_max_tuple_id ? hdfs_scan_node.min_max_tuple_id : -1),
+    min_max_conjuncts_(pnode.min_max_conjuncts_),
     min_max_tuple_desc_(
         min_max_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(min_max_tuple_id_)),
     skip_header_line_count_(hdfs_scan_node.__isset.skip_header_line_count ?
             hdfs_scan_node.skip_header_line_count :
             0),
-    tuple_id_(hdfs_scan_node.tuple_id),
+    tuple_id_(pnode.tuple_id_),
     parquet_count_star_slot_offset_(
         hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
             hdfs_scan_node.parquet_count_star_slot_offset :
             -1),
-    tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
+    tuple_desc_(pnode.tuple_desc_),
+    hdfs_table_(pnode.hdfs_table_),
+    avro_schema_(*pnode.avro_schema_.get()),
+    conjuncts_map_(pnode.conjuncts_map_),
     thrift_dict_filter_conjuncts_map_(hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
             &hdfs_scan_node.dictionary_filter_conjuncts :
             nullptr),
+    codegend_fn_map_(pnode.codegend_fn_map_),
+    is_materialized_col_(pnode.is_materialized_col_),
+    materialized_slots_(pnode.materialized_slots_),
+    partition_key_slots_(pnode.partition_key_slots_),
     disks_accessed_bitmap_(TUnit::UNIT, 0),
-    active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
-  conjuncts_map_ = pnode.conjuncts_map_;
-  min_max_conjuncts_ = pnode.min_max_conjuncts_;
-}
+    active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
 
-HdfsScanNodeBase::~HdfsScanNodeBase() {
-}
+HdfsScanNodeBase::~HdfsScanNodeBase() {}
 
-/// TODO: Break up this very long function.
 Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ScanNode::Prepare(state));
@@ -241,49 +310,8 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   }
 
   // One-time initialization of state that is constant across scan ranges
-  DCHECK(tuple_desc_->table_desc() != NULL);
-  hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
   scan_node_pool_.reset(new MemPool(mem_tracker()));
 
-  // Parse Avro table schema if applicable
-  const string& avro_schema_str = hdfs_table_->avro_schema();
-  if (!avro_schema_str.empty()) {
-    avro_schema_t avro_schema;
-    int error = avro_schema_from_json_length(
-        avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
-    if (error != 0) {
-      return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
-    }
-    RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
-  }
-
-  // Gather materialized partition-key slots and non-partition slots.
-  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
-  for (size_t i = 0; i < slots.size(); ++i) {
-    if (hdfs_table_->IsClusteringCol(slots[i])) {
-      partition_key_slots_.push_back(slots[i]);
-    } else {
-      materialized_slots_.push_back(slots[i]);
-    }
-  }
-
-  // Order the materialized slots such that for schemaless file formats (e.g. text) the
-  // order corresponds to the physical order in files. For formats where the file schema
-  // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
-  sort(materialized_slots_.begin(), materialized_slots_.end(),
-      SlotDescriptor::ColPathLessThan);
-
-  // Populate mapping from slot path to index into materialized_slots_.
-  for (int i = 0; i < materialized_slots_.size(); ++i) {
-    path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
-  }
-
-  // Initialize is_materialized_col_
-  is_materialized_col_.resize(hdfs_table_->num_cols());
-  for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
-    is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
-  }
-
   HdfsFsCache::HdfsFsMap fs_cache;
   // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
   // partition_ids_, file_descs_, and per_type_files_.
@@ -374,37 +402,28 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
   DCHECK(state->ShouldCodegen());
   ExecNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
+  const HdfsScanPlanNode& const_plan_node =
+      static_cast<const HdfsScanPlanNode&>(plan_node_);
+  HdfsScanPlanNode& hdfs_plan_node = const_cast<HdfsScanPlanNode&>(const_plan_node);
+  hdfs_plan_node.Codegen(state, state->runtime_profile());
+}
 
-  // Create codegen'd functions
-  for (int format = THdfsFileFormat::TEXT; format <= THdfsFileFormat::PARQUET; ++format) {
-    vector<HdfsFileDesc*>& file_descs =
-        per_type_files_[static_cast<THdfsFileFormat::type>(format)];
-
-    if (file_descs.empty()) continue;
-
-    // Randomize the order this node processes the files. We want to do this to avoid
-    // issuing remote reads to the same DN from different impalads. In file formats such
-    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
-    // If every node goes through the files in the same order, all the remote reads are
-    // for the same file meaning a few DN serves a lot of remote reads at the same time.
-    random_shuffle(file_descs.begin(), file_descs.end());
-
-    // Create reusable codegen'd functions for each file type type needed
-    // TODO: do this for conjuncts_map_
+void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+  for (THdfsFileFormat::type format: scanned_file_formats_) {
     llvm::Function* fn;
     Status status;
     switch (format) {
       case THdfsFileFormat::TEXT:
-        status = HdfsTextScanner::Codegen(this, conjuncts_, &fn);
+        status = HdfsTextScanner::Codegen(this, state, &fn);
         break;
       case THdfsFileFormat::SEQUENCE_FILE:
-        status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn);
+        status = HdfsSequenceScanner::Codegen(this, state, &fn);
         break;
       case THdfsFileFormat::AVRO:
-        status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn);
+        status = HdfsAvroScanner::Codegen(this, state, &fn);
         break;
       case THdfsFileFormat::PARQUET:
-        status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn);
+        status = HdfsParquetScanner::Codegen(this, state, &fn);
         break;
       default:
         // No codegen for this format
@@ -416,10 +435,10 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
     if (status.ok()) {
       LlvmCodeGen* codegen = state->codegen();
       DCHECK(codegen != NULL);
-      codegen->AddFunctionToJit(fn,
-          &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
+      codegen->AddFunctionToJit(
+          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
     }
-    runtime_profile()->AddCodegenMsg(status.ok(), status, format_name);
+    profile->AddCodegenMsg(status.ok(), status, format_name);
   }
 }
 
@@ -588,6 +607,12 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
         SkipFile(v.first, file);
       }
     }
+    // Randomize the order this node processes the files. We want to do this to avoid
+    // issuing remote reads to the same DN from different impalads. In file formats such
+    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
+    // If every node goes through the files in the same order, all the remote reads are
+    // for the same file meaning a few DN serves a lot of remote reads at the same time.
+    random_shuffle(matching_files->begin(), matching_files->end());
   }
 
   // Issue initial ranges for all file types. Only call functions for file types that
@@ -770,7 +795,7 @@ void* HdfsScanNodeBase::GetFileMetadata(
 }
 
 void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
-  CodegendFnMap::iterator it = codegend_fn_map_.find(type);
+  auto it = codegend_fn_map_.find(type);
   if (it == codegend_fn_map_.end()) return NULL;
   return it->second;
 }
@@ -857,10 +882,10 @@ void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_des
 void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
   DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
   const TupleDescriptor& tuple_desc =
-      *row_batch->row_desc()->tuple_descriptors()[tuple_idx()];
+      *row_batch->row_desc()->tuple_descriptors()[0];
   if (tuple_desc.collection_slots().empty()) return;
   for (int i = 0; i < row_batch->num_rows(); ++i) {
-    Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
+    Tuple* tuple = row_batch->GetRow(i)->GetTuple(0);
     DCHECK(tuple != NULL);
     InitNullCollectionValues(&tuple_desc, tuple);
   }
@@ -916,12 +941,11 @@ void HdfsScanNodeBase::SkipFile(const THdfsFileFormat::type& file_type,
   }
 }
 
-void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
-  const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts();
+void HdfsScanPlanNode::ComputeSlotMaterializationOrder(
+    const DescriptorTbl& desc_tbl, vector<int>* order) const {
+  const vector<ScalarExpr*>& conjuncts = conjuncts_;
   // Initialize all order to be conjuncts.size() (after the last conjunct)
-  order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
-
-  const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
+  order->insert(order->begin(), materialized_slots_.size(), conjuncts.size());
 
   vector<SlotId> slot_ids;
   for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 6b5a7a5..edd8cc3 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -103,6 +103,20 @@ class HdfsScanPlanNode : public ScanPlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  void Codegen(RuntimeState* state, RuntimeProfile* profile);
+
+  /// Returns the index into materialized_slots_ of the slot with 'path'. Returns
+  /// SKIP_COLUMN if that path is not materialized. Only valid to call after Init().
+  int GetMaterializedSlotIdx(const std::vector<int>& path) const;
+
+  /// Utility function to compute the order in which to materialize slots to allow for
+  /// computing conjuncts as slots get materialized (on partial tuples).
+  /// 'order' will contain for each slot, the first conjunct it is associated with.
+  /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
+  /// evaluating conjuncts[1].  Slots that are not referenced by any conjuncts will have
+  /// order set to conjuncts.size(). Only valid to call after Init().
+  void ComputeSlotMaterializationOrder(
+      const DescriptorTbl& desc_tbl, std::vector<int>* order) const;
 
   /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
   /// item tuples). Includes a copy of PlanNode.conjuncts_.
@@ -111,6 +125,42 @@ class HdfsScanPlanNode : public ScanPlanNode {
 
   /// Conjuncts to evaluate on parquet::Statistics.
   std::vector<ScalarExpr*> min_max_conjuncts_;
+
+  /// Tuple id resolved in Init() to set tuple_desc_.
+  int tuple_id_ = -1;
+
+  /// Descriptor for tuples this scan node constructs
+  const TupleDescriptor* tuple_desc_ = nullptr;
+
+  /// Maps from a slot's path to its index into materialized_slots_.
+  boost::unordered_map<std::vector<int>, int> path_to_materialized_slot_idx_;
+
+  /// is_materialized_col_[i] is true if the i-th column should be materialized and
+  /// false otherwise, for 0 <= i < total # columns in table.
+  ///
+  /// This should be a vector<bool>, but bool vectors are special-cased and not stored
+  /// internally as arrays, so instead we store as chars and cast to bools as needed
+  std::vector<char> is_materialized_col_;
+
+  /// Vector containing slot descriptors for all non-partition key slots.  These
+  /// descriptors are sorted in order of increasing col_pos.
+  std::vector<SlotDescriptor*> materialized_slots_;
+
+  /// Vector containing slot descriptors for all partition key slots.
+  std::vector<SlotDescriptor*> partition_key_slots_;
+
+  /// Descriptor for the hdfs table, including partition and format metadata.
+  /// Set in Init(), owned by QueryState.
+  const HdfsTableDescriptor* hdfs_table_ = nullptr;
+
+  /// The root of the table's Avro schema, if we're scanning an Avro table.
+  ScopedAvroSchemaElement avro_schema_;
+
+  /// File formats that instances of this node will read.
+  boost::unordered_set<THdfsFileFormat::type> scanned_file_formats_;
+
+  /// Per scanner type codegen'd fn.
+  boost::unordered_map<THdfsFileFormat::type, void*> codegend_fn_map_;
 };
 
 /// Base class for all Hdfs scan nodes. Contains common members and functions
@@ -190,10 +240,6 @@ class HdfsScanNodeBase : public ScanNode {
   const std::vector<SlotDescriptor*>& materialized_slots()
       const { return materialized_slots_; }
 
-  /// Returns the tuple idx into the row for this scan node to output to.
-  /// Currently this is always 0.
-  int tuple_idx() const { return 0; }
-
   /// Returns number of partition keys in the table.
   int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
 
@@ -209,7 +255,7 @@ class HdfsScanNodeBase : public ScanNode {
   const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
-  const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
+  const AvroSchemaElement& avro_schema() const { return avro_schema_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
   io::RequestContext* reader_context() const { return reader_context_.get(); }
   bool optimize_parquet_count_star() const {
@@ -239,9 +285,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Returns index into materialized_slots with 'path'.  Returns SKIP_COLUMN if
   /// that path is not materialized.
   int GetMaterializedSlotIdx(const std::vector<int>& path) const {
-    PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
-    if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
-    return result->second;
+    return static_cast<const HdfsScanPlanNode&>(plan_node_).GetMaterializedSlotIdx(path);
   }
 
   /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
@@ -356,14 +400,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// Calls RangeComplete() with skipped=true for all the splits of the file
   void SkipFile(const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
 
-  /// Utility function to compute the order in which to materialize slots to allow for
-  /// computing conjuncts as slots get materialized (on partial tuples).
-  /// 'order' will contain for each slot, the first conjunct it is associated with.
-  /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
-  /// evaluating conjuncts[1].  Slots that are not referenced by any conjuncts will have
-  /// order set to conjuncts.size()
-  void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
-
   /// Returns true if there are no materialized slots, such as a count(*) over the table.
   inline bool IsZeroSlotTableScan() const {
     return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
@@ -430,7 +466,7 @@ class HdfsScanNodeBase : public ScanNode {
   // to values > 0 for hdfs text files.
   const int skip_header_line_count_;
 
-  /// Tuple id resolved in Prepare() to set tuple_desc_
+  /// Tuple id of the tuple descriptor to be used.
   const int tuple_id_;
 
   /// The byte offset of the slot for Parquet metadata if Parquet count star optimization
@@ -455,7 +491,7 @@ class HdfsScanNodeBase : public ScanNode {
   const HdfsTableDescriptor* hdfs_table_ = nullptr;
 
   /// The root of the table's Avro schema, if we're scanning an Avro table.
-  ScopedAvroSchemaElement avro_schema_;
+  const AvroSchemaElement& avro_schema_;
 
   /// Partitions scanned by this scan node.
   std::unordered_set<int64_t> partition_ids_;
@@ -503,26 +539,21 @@ class HdfsScanNodeBase : public ScanNode {
   AtomicInt32 remaining_scan_range_submissions_ = { 1 };
 
   /// Per scanner type codegen'd fn.
-  typedef boost::unordered_map<THdfsFileFormat::type, void*> CodegendFnMap;
-  CodegendFnMap codegend_fn_map_;
-
-  /// Maps from a slot's path to its index into materialized_slots_.
-  typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
-  PathToSlotIdxMap path_to_materialized_slot_idx_;
+  const boost::unordered_map<THdfsFileFormat::type, void*>& codegend_fn_map_;
 
   /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
   /// for 0 <= i < total # columns in table
   //
   /// This should be a vector<bool>, but bool vectors are special-cased and not stored
   /// internally as arrays, so instead we store as chars and cast to bools as needed
-  std::vector<char> is_materialized_col_;
+  const std::vector<char>& is_materialized_col_;
 
   /// Vector containing slot descriptors for all non-partition key slots.  These
   /// descriptors are sorted in order of increasing col_pos.
-  std::vector<SlotDescriptor*> materialized_slots_;
+  const std::vector<SlotDescriptor*>& materialized_slots_;
 
   /// Vector containing slot descriptors for all partition key slots.
-  std::vector<SlotDescriptor*> partition_key_slots_;
+  const std::vector<SlotDescriptor*>& partition_key_slots_;
 
   /// Keeps track of total splits and the number finished.
   ProgressUpdater progress_;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 345a140..ee6eb5b 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -214,7 +214,6 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
 
 int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
-  DCHECK_EQ(scan_node_->tuple_idx(), 0);
   DCHECK_EQ(scan_node_->materialized_slots().size(), 0);
   int num_to_commit = 0;
   if (LIKELY(conjunct_evals_->size() == 0)) {
@@ -260,7 +259,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
     *error_in_row |= error;
   }
 
-  tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
+  tuple_row->SetTuple(0, tuple);
   return EvalConjuncts(tuple_row);
 }
 
@@ -321,21 +320,21 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
 // eval_fail:                                        ; preds = %parse
 //   ret i1 false
 // }
-Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
-    LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
-    llvm::Function** write_complete_tuple_fn) {
+Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
+    RuntimeState* state, llvm::Function** write_complete_tuple_fn) {
+  const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
+  LlvmCodeGen* codegen = state->codegen();
   *write_complete_tuple_fn = NULL;
-  RuntimeState* state = node->runtime_state();
 
   // Cast away const-ness.  The codegen only sets the cached typed llvm struct.
-  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
+  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc_);
   vector<llvm::Function*> slot_fns;
-  for (int i = 0; i < node->materialized_slots().size(); ++i) {
+  for (int i = 0; i < node->materialized_slots_.size(); ++i) {
     llvm::Function* fn = nullptr;
-    SlotDescriptor* slot_desc = node->materialized_slots()[i];
+    SlotDescriptor* slot_desc = node->materialized_slots_[i];
     RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
-        node->hdfs_table()->null_column_value().data(),
-        node->hdfs_table()->null_column_value().size(), true, state->strict_mode()));
+        node->hdfs_table_->null_column_value().data(),
+        node->hdfs_table_->null_column_value().size(), true, state->strict_mode()));
     if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
     slot_fns.push_back(fn);
   }
@@ -343,7 +342,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   // Compute order to materialize slots.  BE assumes that conjuncts should
   // be evaluated in the order specified (optimization is already done by FE)
   vector<int> materialize_order;
-  node->ComputeSlotMaterializationOrder(&materialize_order);
+  node->ComputeSlotMaterializationOrder(state->desc_tbl(), &materialize_order);
 
   // Get types to construct matching function signature to WriteCompleteTuple
   llvm::PointerType* uint8_ptr_type = codegen->i8_ptr_type();
@@ -401,7 +400,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   // Put tuple in tuple_row
   llvm::Value* tuple_row_typed =
       builder.CreateBitCast(tuple_row_arg, llvm::PointerType::get(tuple_ptr_type, 0));
-  llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(node->tuple_idx())};
+  llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(0)};
   llvm::Value* tuple_in_row_addr =
       builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs);
   builder.CreateStore(tuple_arg, tuple_in_row_addr);
@@ -493,7 +492,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
         state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
         return status;
       }
-      if (node->materialized_slots().size() + conjunct_idx
+      if (node->materialized_slots_.size() + conjunct_idx
           >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
         codegen->SetNoInline(conjunct_fn);
       }
@@ -516,7 +515,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   builder.SetInsertPoint(eval_fail_block);
   builder.CreateRet(codegen->false_value());
 
-  if (node->materialized_slots().size() + conjuncts.size()
+  if (node->materialized_slots_.size() + conjuncts.size()
       > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(fn);
   }
@@ -527,9 +526,10 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   return Status::OK();
 }
 
-Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
-    LlvmCodeGen* codegen, llvm::Function* write_complete_tuple_fn,
+Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanPlanNode* node,
+    RuntimeState* state, llvm::Function* write_complete_tuple_fn,
     llvm::Function** write_aligned_tuples_fn) {
+  LlvmCodeGen* codegen = state->codegen();
   *write_aligned_tuples_fn = NULL;
   DCHECK(write_complete_tuple_fn != NULL);
 
@@ -543,12 +543,12 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
 
   llvm::Function* copy_strings_fn;
   RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
-      codegen, *node->tuple_desc(), &copy_strings_fn));
+      codegen, *node->tuple_desc_, &copy_strings_fn));
   replaced = codegen->ReplaceCallSites(
       write_tuples_fn, copy_strings_fn, "CopyStrings");
   DCHECK_REPLACE_COUNT(replaced, 1);
 
-  int tuple_byte_size = node->tuple_desc()->byte_size();
+  int tuple_byte_size = node->tuple_desc_->byte_size();
   replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
       codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
   DCHECK_REPLACE_COUNT(replaced, 1);
@@ -561,16 +561,17 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
 }
 
 Status HdfsScanner::CodegenInitTuple(
-    const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn) {
+    const HdfsScanPlanNode* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn) {
   *init_tuple_fn = codegen->GetFunction(IRFunction::HDFS_SCANNER_INIT_TUPLE, true);
   DCHECK(*init_tuple_fn != nullptr);
 
   // Replace all of the constants in InitTuple() to specialize the code.
+  bool materialized_partition_keys_exist = !node->partition_key_slots_.empty();
   int replaced = codegen->ReplaceCallSitesWithBoolConst(
-      *init_tuple_fn, node->num_materialized_partition_keys() > 0, "has_template_tuple");
+      *init_tuple_fn, materialized_partition_keys_exist, "has_template_tuple");
   DCHECK_REPLACE_COUNT(replaced, 1);
 
-  const TupleDescriptor* tuple_desc = node->tuple_desc();
+  const TupleDescriptor* tuple_desc = node->tuple_desc_;
   replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
       codegen->GetI32Constant(tuple_desc->byte_size()), "tuple_byte_size");
   DCHECK_REPLACE_COUNT(replaced, 1);
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 3e19658..bc7d66e 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -482,23 +482,21 @@ class HdfsScanner {
   /// Codegen function to replace WriteCompleteTuple. Should behave identically
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
-  static Status CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
-      LlvmCodeGen* codegen, const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_complete_tuple_fn) WARN_UNUSED_RESULT;
+  static Status CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
+      RuntimeState* state, llvm::Function** write_complete_tuple_fn);
 
   /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross
   /// compiled to IR.  This function loads the precompiled IR function, modifies it,
   /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
   /// successful or NULL otherwise.
-  static Status CodegenWriteAlignedTuples(const HdfsScanNodeBase*, LlvmCodeGen*,
-      llvm::Function* write_tuple_fn,
-      llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
+  static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, RuntimeState*,
+      llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
 
   /// Codegen function to replace InitTuple() removing runtime constants like the tuple
   /// size and branches like the template tuple existence check. The codegen'd version
   /// of InitTuple() is stored in 'init_tuple_fn' if codegen was successful.
   static Status CodegenInitTuple(
-      const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn);
+      const HdfsScanPlanNode* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn);
 
   /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version
   /// and emitting a customized version of EvalRuntimeFilter() for each filter in
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 610325b..2cc08a0 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -48,18 +48,16 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 }
 
 // Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ScalarExpr*>& conjuncts, llvm::Function** write_aligned_tuples_fn) {
+Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+    llvm::Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = nullptr;
-  DCHECK(node->runtime_state()->ShouldCodegen());
-  LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != nullptr);
+  DCHECK(state->ShouldCodegen());
+  DCHECK(state->codegen() != nullptr);
   llvm::Function* write_complete_tuple_fn;
-  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
-      &write_complete_tuple_fn));
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, state, &write_complete_tuple_fn));
   DCHECK(write_complete_tuple_fn != nullptr);
-  RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
-      write_aligned_tuples_fn));
+  RETURN_IF_ERROR(CodegenWriteAlignedTuples(
+      node, state, write_complete_tuple_fn, write_aligned_tuples_fn));
   DCHECK(*write_aligned_tuples_fn != nullptr);
   return Status::OK();
 }
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index e84a5e7..0b79b78 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -171,10 +171,8 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
-  static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_aligned_tuples_fn)
-      WARN_UNUSED_RESULT;
+  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+      llvm::Function** write_aligned_tuples_fn);
 
  protected:
   /// Implementation of sequence container super class methods.
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 03ac83e..acd7f54 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -762,17 +762,15 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
 // handcrafted using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ScalarExpr*>& conjuncts, llvm::Function** write_aligned_tuples_fn) {
+Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+    llvm::Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = nullptr;
-  DCHECK(node->runtime_state()->ShouldCodegen());
-  LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != nullptr);
+  DCHECK(state->ShouldCodegen());
+  DCHECK(state->codegen() != nullptr);
   llvm::Function* write_complete_tuple_fn;
-  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
-      &write_complete_tuple_fn));
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, state, &write_complete_tuple_fn));
   DCHECK(write_complete_tuple_fn != nullptr);
-  RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
+  RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, state, write_complete_tuple_fn,
       write_aligned_tuples_fn));
   DCHECK(*write_aligned_tuples_fn != nullptr);
   return Status::OK();
@@ -834,7 +832,7 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
 
       CopyAndClearPartialTuple(pool);
 
-      row->SetTuple(scan_node_->tuple_idx(), tuple_);
+      row->SetTuple(0, tuple_);
 
       slot_idx_ = 0;
       ++num_tuples_processed;
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index d507df1..cbfbce8 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -61,9 +61,8 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
-  static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
+  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+      llvm::Function** write_aligned_tuples_fn);
 
   /// Return true if we have builtin support for scanning text files compressed with this
   /// codec.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 148bc2d..5490670 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1183,7 +1183,6 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   // we always call CommitRows() after TransferScratchTuples(), the output batch can
   // never be empty.
   DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
-  DCHECK_EQ(scan_node_->tuple_idx(), 0);
   DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
   if (scratch_batch_->tuple_byte_size == 0) {
     Tuple** output_row =
@@ -1213,17 +1212,18 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   return num_rows_to_commit;
 }
 
-Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
-  DCHECK(node->runtime_state()->ShouldCodegen());
+Status HdfsParquetScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+      llvm::Function** process_scratch_batch_fn) {
+  DCHECK(state->ShouldCodegen());
   *process_scratch_batch_fn = nullptr;
-  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
 
   llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
   DCHECK(fn != nullptr);
 
   llvm::Function* eval_conjuncts_fn;
+  const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
   RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
   DCHECK(eval_conjuncts_fn != nullptr);
 
@@ -1232,7 +1232,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
 
   llvm::Function* eval_runtime_filters_fn;
   RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
-      codegen, node->filter_exprs(), &eval_runtime_filters_fn));
+      codegen, node->runtime_filter_exprs_, &eval_runtime_filters_fn));
   DCHECK(eval_runtime_filters_fn != nullptr);
 
   replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters");
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 8e5e7bc..3de99e1 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -351,10 +351,8 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** process_scratch_batch_fn)
-      WARN_UNUSED_RESULT;
+  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+      llvm::Function** process_scratch_batch_fn);
 
   /// Helper function to create ColumnStatsReader object. 'col_order' might be NULL.
   ColumnStatsReader CreateColumnStatsReader(