You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/10 04:43:36 UTC

[3/3] incubator-impala git commit: IMPALA-4098: Open()/Close() partition exprs once per fragment instance.

IMPALA-4098: Open()/Close() partition exprs once per fragment instance.

Partition exprs stored in the descriptor table can be referenced by multiple
exec nodes (and/or a data sink) within the same fragment instance, so the
lifecycle of those exprs (Prepare/Open/Close) is tied to the fragment instance
and not to a particular exec node.

A recent change exposed this improper lifecycle management: we cloned the
partition exprs before using them, but by that time the exprs could already
have been closed by another user of the same partition descriptor, which
caused the cloning function to hit a DCHECK.

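The fix is to tie the lifecycle of those exprs to that of the fragment
instance: the descriptor table now prepares and opens the partition exprs of
all HDFS tables in PlanFragmentExecutor::Open() and closes them in
PlanFragmentExecutor::Close().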

Testing: I could reliably reproduce the bug by running this query in a loop:

set num_nodes=1;
select count(a.year), count(a.month), count(a.int_col),
       count(b.year), count(b.month), count(b.int_col)
from functional.alltypessmall a, functional.alltypessmall b;

After this patch, I could no longer reproduce the bug. I don't think it
makes sense to add a test specifically for this bug, because our existing
tests already caught it and the DCHECK that was hit no longer exists after
the restructuring.

Change-Id: Id179df645e500530f4418988f6ce64a03d669892
Reviewed-on: http://gerrit.cloudera.org:8080/4340
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/218019e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/218019e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/218019e5

Branch: refs/heads/master
Commit: 218019e59fc6740e1564c91516b0dee46c64ed83
Parents: 7b4a6fa
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Sep 8 17:02:06 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Sep 10 01:28:00 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc       | 39 ++---------------
 be/src/exec/hdfs-scanner.cc              |  8 +---
 be/src/exec/hdfs-table-sink.cc           | 19 ---------
 be/src/runtime/descriptors.cc            | 60 +++++++++++++--------------
 be/src/runtime/descriptors.h             | 35 +++++++++-------
 be/src/runtime/plan-fragment-executor.cc |  4 ++
 6 files changed, 59 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index dba67b5..de1dad0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -283,16 +283,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
             try_cache, expected_local, file_desc->mtime));
   }
 
-  // Prepare all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    // This is IMPALA-1702, but will have been caught earlier in this method.
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-                                   << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
-  }
-
   // Update server wide metrics for number of scan ranges and ranges that have
   // incomplete metadata.
   ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
@@ -373,19 +363,14 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
 
   for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr->Open(state));
 
-  // Open all the partition exprs used by the scan node and create template tuples.
+  // Create template tuples for all partitions.
   for (int64_t partition_id: partition_ids_) {
     HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
     DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
                                    << " partition_id=" << partition_id
                                    << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
-    vector<ExprContext*> partition_key_value_ctxs;
-    RETURN_IF_ERROR(Expr::CloneIfNotExists(
-        partition_desc->partition_key_value_ctxs(), state, &partition_key_value_ctxs));
-    partition_template_tuple_map_[partition_id] =
-        InitTemplateTuple(partition_key_value_ctxs, scan_node_pool_.get(), state);
-    Expr::Close(partition_key_value_ctxs, state);
+    partition_template_tuple_map_[partition_id] = InitTemplateTuple(
+        partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state);
   }
 
   RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
@@ -476,20 +461,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
 
   if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
 
-  // Close all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    if (partition_desc == NULL) {
-      // TODO: Revert when IMPALA-1702 is fixed.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << partition_id
-                 << "\n" << PrintThrift(state->fragment_params());
-      continue;
-    }
-    partition_desc->CloseExprs(state);
-  }
-
-  // Open collection conjuncts
+  // Close collection conjuncts
   for (const auto& tid_conjunct: conjuncts_map_) {
     // conjuncts_ are already closed in ExecNode::Close()
     if (tid_conjunct.first == tuple_id_) continue;
@@ -680,9 +652,6 @@ Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ExprContext*>& value_ctx
   for (int i = 0; i < partition_key_slots_.size(); ++i) {
     const SlotDescriptor* slot_desc = partition_key_slots_[i];
     ExprContext* value_ctx = value_ctxs[slot_desc->col_pos()];
-    /// This function may be called from multiple threads, and we expect each
-    /// thread to pass in their own cloned value contexts.
-    DCHECK(value_ctx->is_clone());
     // Exprs guaranteed to be literals, so can safely be evaluated without a row.
     RawValue::Write(value_ctx->GetValue(NULL), template_tuple, slot_desc, NULL);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 841d1e4..28eb606 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -118,14 +118,10 @@ Status HdfsScanner::Open(ScannerContext* context) {
   scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
 
   // Initialize the template_tuple_.
-  vector<ExprContext*> partition_key_value_ctxs;
-  RETURN_IF_ERROR(Expr::CloneIfNotExists(
-      context_->partition_descriptor()->partition_key_value_ctxs(), state_,
-      &partition_key_value_ctxs));
-  template_tuple_ = scan_node_->InitTemplateTuple(partition_key_value_ctxs,
+  template_tuple_ = scan_node_->InitTemplateTuple(
+      context_->partition_descriptor()->partition_key_value_ctxs(),
       template_tuple_pool_.get(), state_);
   template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
-  Expr::Close(partition_key_value_ctxs, state_);
 
   decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 3211c1c..77316d4 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -101,13 +101,6 @@ Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
   DCHECK_GE(output_expr_ctxs_.size(),
       table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
 
-  // Prepare literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    RETURN_IF_ERROR(partition->PrepareExprs(state));
-  }
-
   return Status::OK();
 }
 
@@ -161,12 +154,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 Status HdfsTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(partition_key_expr_ctxs_, state));
-  // Open literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    RETURN_IF_ERROR(partition->OpenExprs(state));
-  }
 
   // Get file format for default partition in table descriptor, and build a map from
   // partition key values to partition descriptor for multiple output format support. The
@@ -667,12 +654,6 @@ void HdfsTableSink::Close(RuntimeState* state) {
   }
   partition_keys_to_output_partitions_.clear();
 
-  // Close literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    partition->CloseExprs(state);
-  }
   Expr::Close(output_expr_ctxs_, state);
   Expr::Close(partition_key_expr_ctxs_, state);
   DataSink::Close(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 548370a..47ca791 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -145,6 +145,7 @@ TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
   : name_(tdesc.tableName),
     database_(tdesc.dbName),
     id_(tdesc.id),
+    type_(tdesc.tableType),
     num_clustering_cols_(tdesc.numClusteringCols) {
   for (int i = 0; i < tdesc.columnDescriptors.size(); ++i) {
     col_descs_.push_back(ColumnDescriptor(tdesc.columnDescriptors[i]));
@@ -176,9 +177,6 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& thrift_table,
     escape_char_(thrift_partition.escapeChar),
     block_size_(thrift_partition.blockSize),
     id_(thrift_partition.id),
-    exprs_prepared_(false),
-    exprs_opened_(false),
-    exprs_closed_(false),
     file_format_(thrift_partition.fileFormat),
     object_pool_(pool) {
   DecompressLocation(thrift_table, thrift_partition, &location_);
@@ -192,29 +190,6 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& thrift_table,
   }
 }
 
-Status HdfsPartitionDescriptor::PrepareExprs(RuntimeState* state) {
-  if (!exprs_prepared_) {
-    // TODO: RowDescriptor should arguably be optional in Prepare for known literals
-    exprs_prepared_ = true;
-    // Partition exprs are not used in the codegen case.  Don't codegen them.
-    RETURN_IF_ERROR(Expr::Prepare(partition_key_value_ctxs_, state, RowDescriptor(),
-                                  state->instance_mem_tracker()));
-  }
-  return Status::OK();
-}
-
-Status HdfsPartitionDescriptor::OpenExprs(RuntimeState* state) {
-  if (exprs_opened_) return Status::OK();
-  exprs_opened_ = true;
-  return Expr::Open(partition_key_value_ctxs_, state);
-}
-
-void HdfsPartitionDescriptor::CloseExprs(RuntimeState* state) {
-  if (exprs_closed_ || !exprs_prepared_) return;
-  exprs_closed_ = true;
-  Expr::Close(partition_key_value_ctxs_, state);
-}
-
 string HdfsPartitionDescriptor::DebugString() const {
   stringstream out;
   out << " file_format=" << file_format_ << "'"
@@ -238,13 +213,11 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc,
     null_partition_key_value_(tdesc.hdfsTable.nullPartitionKeyValue),
     null_column_value_(tdesc.hdfsTable.nullColumnValue),
     object_pool_(pool) {
-  map<int64_t, THdfsPartition>::const_iterator it;
-  for (it = tdesc.hdfsTable.partitions.begin(); it != tdesc.hdfsTable.partitions.end();
-       ++it) {
+  for (const auto& entry : tdesc.hdfsTable.partitions) {
     HdfsPartitionDescriptor* partition =
-        new HdfsPartitionDescriptor(tdesc.hdfsTable, it->second, pool);
+        new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second, pool);
     object_pool_->Add(partition);
-    partition_descriptors_[it->first] = partition;
+    partition_descriptors_[entry.first] = partition;
   }
   avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : "";
 }
@@ -532,6 +505,31 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb
   return Status::OK();
 }
 
+Status DescriptorTbl::PrepareAndOpenPartitionExprs(RuntimeState* state) const {
+  for (const auto& tbl_entry : tbl_desc_map_) {
+    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
+    HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second);
+    for (const auto& part_entry : hdfs_tbl->partition_descriptors()) {
+      // TODO: RowDescriptor should arguably be optional in Prepare for known literals
+      // Partition exprs are not used in the codegen case.  Don't codegen them.
+      RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), state,
+          RowDescriptor(), state->instance_mem_tracker()));
+      RETURN_IF_ERROR(Expr::Open(part_entry.second->partition_key_value_ctxs(), state));
+    }
+  }
+  return Status::OK();
+}
+
+void DescriptorTbl::ClosePartitionExprs(RuntimeState* state) const {
+  for (const auto& tbl_entry: tbl_desc_map_) {
+    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
+    HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second);
+    for (const auto& part_entry: hdfs_tbl->partition_descriptors()) {
+      Expr::Close(part_entry.second->partition_key_value_ctxs(), state);
+    }
+  }
+}
+
 TableDescriptor* DescriptorTbl::GetTableDescriptor(TableId id) const {
   // TODO: is there some boost function to do exactly this?
   TableDescriptorMap::const_iterator i = tbl_desc_map_.find(id);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 10c2ff3..874ad69 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -204,6 +204,7 @@ class TableDescriptor {
   const std::string& name() const { return name_; }
   const std::string& database() const { return database_; }
   int id() const { return id_; }
+  TTableType::type type() const { return type_; }
   const std::vector<ColumnDescriptor>& col_descs() const { return col_descs_; }
 
   /// Returns "<database>.<name>"
@@ -213,6 +214,7 @@ class TableDescriptor {
   std::string name_;
   std::string database_;
   TableId id_;
+  TTableType::type type_;
   int num_clustering_cols_;
   std::vector<ColumnDescriptor> col_descs_;
 };
@@ -222,26 +224,24 @@ class HdfsPartitionDescriptor {
  public:
   HdfsPartitionDescriptor(const THdfsTable& thrift_table,
       const THdfsPartition& thrift_partition, ObjectPool* pool);
+
   char line_delim() const { return line_delim_; }
   char field_delim() const { return field_delim_; }
   char collection_delim() const { return collection_delim_; }
   char escape_char() const { return escape_char_; }
   THdfsFileFormat::type file_format() const { return file_format_; }
-  const std::vector<ExprContext*>& partition_key_value_ctxs() const {
-    return partition_key_value_ctxs_;
-  }
   int block_size() const { return block_size_; }
   const std::string& location() const { return location_; }
   int64_t id() const { return id_; }
-
-  /// Calls Prepare()/Open()/Close() on all partition key exprs. Idempotent (this is
-  /// because both HdfsScanNode and HdfsTableSink may both use the same partition desc).
-  Status PrepareExprs(RuntimeState* state);
-  Status OpenExprs(RuntimeState* state);
-  void CloseExprs(RuntimeState* state);
-
   std::string DebugString() const;
 
+  /// After the descriptor table has opened the partition exprs, the returned expr
+  /// contexts may safely be evaluated concurrently from multiple threads because all
+  /// of the exprs are literals.
+  const std::vector<ExprContext*>& partition_key_value_ctxs() const {
+    return partition_key_value_ctxs_;
+  }
+
  private:
   char line_delim_;
   char field_delim_;
@@ -253,13 +253,12 @@ class HdfsPartitionDescriptor {
   std::string location_;
   int64_t id_;
 
-  /// True if PrepareExprs has been called, to prevent repeating expensive codegen
-  bool exprs_prepared_;
-  bool exprs_opened_;
-  bool exprs_closed_;
-
   /// List of literal (and therefore constant) expressions for each partition key. Their
   /// order corresponds to the first num_clustering_cols of the parent table.
+  /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table
+  /// because the same partition descriptor may be used by multiple exec nodes with
+  /// different lifetimes.
+  /// TODO: Move these into the new query-wide state, indexed by partition id.
   std::vector<ExprContext*> partition_key_value_ctxs_;
 
   /// The format (e.g. text, sequence file etc.) of data in the files in this partition
@@ -435,6 +434,12 @@ class DescriptorTbl {
   static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
                        DescriptorTbl** tbl);
 
+  /// Prepares and opens partition exprs of Hdfs tables.
+  Status PrepareAndOpenPartitionExprs(RuntimeState* state) const;
+
+  /// Closes partition exprs of Hdfs tables.
+  void ClosePartitionExprs(RuntimeState* state) const;
+
   TableDescriptor* GetTableDescriptor(TableId id) const;
   TupleDescriptor* GetTupleDescriptor(TupleId id) const;
   SlotDescriptor* GetSlotDescriptor(SlotId id) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 76d2c02..1e52d08 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -326,6 +326,9 @@ void PlanFragmentExecutor::PrintVolumeIds(
 Status PlanFragmentExecutor::Open() {
   VLOG_QUERY << "Open(): instance_id="
       << runtime_state_->fragment_instance_id();
+
+  RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
+
   // we need to start the profile-reporting thread before calling Open(), since it
   // may block
   if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
@@ -589,6 +592,7 @@ void PlanFragmentExecutor::Close() {
       runtime_state_->io_mgr()->UnregisterContext(context);
     }
     exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
+    runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
     runtime_state_->filter_bank()->Close();
   }
   if (mem_usage_sampled_counter_ != NULL) {