You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/10 04:43:34 UTC

[1/3] incubator-impala git commit: Remove superfluous Hive Metastore integration test.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 37a2ba255 -> 218019e59


Remove superfluous Hive Metastore integration test.

Removes the test test_impala_sees_hive_tables_and_dbs
from test_metadata_query_statements.py.

The same scenarios are already covered in test_hms_integration.py.

Change-Id: I5706c375a490ee9e8ea095458ca208a116687277
Reviewed-on: http://gerrit.cloudera.org:8080/4334
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d153d181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d153d181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d153d181

Branch: refs/heads/master
Commit: d153d181661e47cb9deffabaf356b7fcc2e77cb5
Parents: 37a2ba2
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Sep 7 22:47:23 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Sep 9 23:07:35 2016 +0000

----------------------------------------------------------------------
 .../metadata/test_metadata_query_statements.py  | 131 -------------------
 1 file changed, 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d153d181/tests/metadata/test_metadata_query_statements.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py
index 12aa2be..7b2f72f 100644
--- a/tests/metadata/test_metadata_query_statements.py
+++ b/tests/metadata/test_metadata_query_statements.py
@@ -151,134 +151,3 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
     self.cleanup_db('impala_test_desc_db2')
     self.cleanup_db('impala_test_desc_db3')
     self.cleanup_db('impala_test_desc_db4')
-
-  # Missing Coverage: ddl by hive being visible to Impala for data not residing in hdfs.
-  @SkipIfIsilon.hive
-  @SkipIfS3.hive
-  @SkipIfLocal.hive
-  @pytest.mark.execute_serially # because of invalidate metadata
-  def test_impala_sees_hive_tables_and_dbs(self, vector):
-    self.client.set_configuration(vector.get_value('exec_option'))
-    DB_NAME = 'hive_test_db'
-    TBL_NAME = 'testtbl'
-    self.cleanup_db(DB_NAME)
-    try:
-      self.__run_test_impala_sees_hive_tables_and_dbs(DB_NAME, TBL_NAME)
-    finally:
-      self.cleanup_db(DB_NAME)
-
-  def __run_test_impala_sees_hive_tables_and_dbs(self, db_name, tbl_name):
-    assert db_name not in self.all_db_names()
-
-    self.run_stmt_in_hive("create database %s" % db_name)
-
-    # Run 'invalidate metadata <table name>' when the parent database does not exist.
-    try:
-      self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name))
-      assert 0, 'Expected to fail'
-    except ImpalaBeeswaxException as e:
-      assert "TableNotFoundException: Table not found: %s.%s"\
-          % (db_name, tbl_name) in str(e)
-
-    assert db_name not in self.all_db_names()
-
-    # Create a table external to Impala.
-    self.run_stmt_in_hive("create table %s.%s (i int)" % (db_name, tbl_name))
-
-    # Impala does not know about this database or table.
-    assert db_name not in self.all_db_names()
-
-    # Run 'invalidate metadata <table name>'. It should add the database and table
-    # in to Impala's catalog.
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name))
-    assert db_name in self.all_db_names()
-
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name in result.data
-    assert len(result.data) == 1
-
-    self.client.execute("create table %s.%s (j int)" % (db_name, tbl_name + "_test"))
-    self.run_stmt_in_hive("drop table %s.%s" % (db_name, tbl_name + "_test"))
-
-    # Re-create the table in Hive. Use the same name, but different casing.
-    self.run_stmt_in_hive("create table %s.%s (i bigint)" % (db_name, tbl_name + "_TEST"))
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name + "_Test"))
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name + "_test" in result.data
-    assert tbl_name + "_Test" not in result.data
-    assert tbl_name + "_TEST" not in result.data
-
-    # Verify this table is the version created in Hive (the column should be BIGINT)
-    result = self.client.execute("describe %s.%s" % (db_name, tbl_name + '_test'))
-    assert 'bigint' in result.data[0]
-
-    self.client.execute("drop table %s.%s" % (db_name, tbl_name + "_TEST"))
-
-    # Make sure we can actually use the table
-    self.client.execute(("insert overwrite table %s.%s "
-                        "select 1 from functional.alltypes limit 5"
-                         % (db_name, tbl_name)))
-    result = self.execute_scalar("select count(*) from %s.%s" % (db_name, tbl_name))
-    assert int(result) == 5
-
-    # Should be able to call invalidate metadata multiple times on the same table.
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name))
-    self.client.execute("refresh %s.%s"  % (db_name, tbl_name))
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name in result.data
-
-    # Can still use the table.
-    result = self.execute_scalar("select count(*) from %s.%s" % (db_name, tbl_name))
-    assert int(result) == 5
-
-    # Run 'invalidate metadata <table name>' when no table exists with that name.
-    try:
-      self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name + '2'))
-      assert 0, 'Expected to fail'
-    except ImpalaBeeswaxException as e:
-      assert "TableNotFoundException: Table not found: %s.%s"\
-          % (db_name, tbl_name + '2') in str(e)
-
-    result = self.client.execute("show tables in %s" % db_name);
-    assert len(result.data) == 1
-    assert tbl_name in result.data
-
-    # Create another table
-    self.run_stmt_in_hive("create table %s.%s (i int)" % (db_name, tbl_name + '2'))
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name + '2'))
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name + '2' in result.data
-    assert tbl_name in result.data
-
-    # Drop the table, and then verify invalidate metadata <table name> removes the
-    # table from the catalog.
-    self.run_stmt_in_hive("drop table %s.%s " % (db_name, tbl_name))
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name))
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name + '2' in result.data
-    assert tbl_name not in result.data
-
-    # Should be able to call invalidate multiple times on the same table when the table
-    # does not exist.
-    try:
-      self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name))
-      assert 0, 'Expected to fail'
-    except ImpalaBeeswaxException as e:
-      assert "TableNotFoundException: Table not found: %s.%s"\
-          % (db_name, tbl_name) in str(e)
-
-    result = self.client.execute("show tables in %s" % db_name)
-    assert tbl_name + '2' in result.data
-    assert tbl_name not in result.data
-
-    # Drop the parent database (this will drop all tables). Then invalidate the table
-    self.run_stmt_in_hive("drop database %s CASCADE" % db_name)
-    self.client.execute("invalidate metadata %s.%s"  % (db_name, tbl_name + '2'))
-    result = self.client.execute("show tables in %s" % db_name);
-    assert len(result.data) == 0
-
-    # Requires a refresh to see the dropped database
-    assert db_name in self.all_db_names()
-
-    self.client.execute("invalidate metadata")
-    assert db_name not in self.all_db_names()


[3/3] incubator-impala git commit: IMPALA-4098: Open()/Close() partition exprs once per fragment instance.

Posted by ab...@apache.org.
IMPALA-4098: Open()/Close() partition exprs once per fragment instance.

Partition exprs stored in the descriptor table can be referenced by multiple
exec nodes (and/or a data sink) within the same fragment instance, so the
lifecycle of those exprs (Prepare/Open/Close) is tied to the fragment instance
and not to a particular exec node.

A recent change exposed this improper lifecycle management: we cloned the
partition exprs before using them, but by that time the exprs had already
been closed, which caused the cloning function to hit a DCHECK.

The fix is to tie the lifecycle of those exprs to that of the fragment
instance.

Testing: I could reliably reproduce the bug by running this query in a loop:

set num_nodes=1;
select count(a.year), count(a.month), count(a.int_col),
       count(b.year), count(b.month), count(b.int_col)
from functional.alltypessmall a, functional.alltypessmall b;

After this patch I was not able to reproduce the bug anymore. I don't think
it makes sense to add a test specifically for this bug because our existing
tests already caught it, and the DCHECK that was hit no longer exists due to
restructuring.

Change-Id: Id179df645e500530f4418988f6ce64a03d669892
Reviewed-on: http://gerrit.cloudera.org:8080/4340
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/218019e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/218019e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/218019e5

Branch: refs/heads/master
Commit: 218019e59fc6740e1564c91516b0dee46c64ed83
Parents: 7b4a6fa
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Sep 8 17:02:06 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Sep 10 01:28:00 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc       | 39 ++---------------
 be/src/exec/hdfs-scanner.cc              |  8 +---
 be/src/exec/hdfs-table-sink.cc           | 19 ---------
 be/src/runtime/descriptors.cc            | 60 +++++++++++++--------------
 be/src/runtime/descriptors.h             | 35 +++++++++-------
 be/src/runtime/plan-fragment-executor.cc |  4 ++
 6 files changed, 59 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index dba67b5..de1dad0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -283,16 +283,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
             try_cache, expected_local, file_desc->mtime));
   }
 
-  // Prepare all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    // This is IMPALA-1702, but will have been caught earlier in this method.
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-                                   << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
-  }
-
   // Update server wide metrics for number of scan ranges and ranges that have
   // incomplete metadata.
   ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
@@ -373,19 +363,14 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
 
   for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr->Open(state));
 
-  // Open all the partition exprs used by the scan node and create template tuples.
+  // Create template tuples for all partitions.
   for (int64_t partition_id: partition_ids_) {
     HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
     DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
                                    << " partition_id=" << partition_id
                                    << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
-    vector<ExprContext*> partition_key_value_ctxs;
-    RETURN_IF_ERROR(Expr::CloneIfNotExists(
-        partition_desc->partition_key_value_ctxs(), state, &partition_key_value_ctxs));
-    partition_template_tuple_map_[partition_id] =
-        InitTemplateTuple(partition_key_value_ctxs, scan_node_pool_.get(), state);
-    Expr::Close(partition_key_value_ctxs, state);
+    partition_template_tuple_map_[partition_id] = InitTemplateTuple(
+        partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state);
   }
 
   RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
@@ -476,20 +461,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
 
   if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
 
-  // Close all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    if (partition_desc == NULL) {
-      // TODO: Revert when IMPALA-1702 is fixed.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << partition_id
-                 << "\n" << PrintThrift(state->fragment_params());
-      continue;
-    }
-    partition_desc->CloseExprs(state);
-  }
-
-  // Open collection conjuncts
+  // Close collection conjuncts
   for (const auto& tid_conjunct: conjuncts_map_) {
     // conjuncts_ are already closed in ExecNode::Close()
     if (tid_conjunct.first == tuple_id_) continue;
@@ -680,9 +652,6 @@ Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ExprContext*>& value_ctx
   for (int i = 0; i < partition_key_slots_.size(); ++i) {
     const SlotDescriptor* slot_desc = partition_key_slots_[i];
     ExprContext* value_ctx = value_ctxs[slot_desc->col_pos()];
-    /// This function may be called from multiple threads, and we expect each
-    /// thread to pass in their own cloned value contexts.
-    DCHECK(value_ctx->is_clone());
     // Exprs guaranteed to be literals, so can safely be evaluated without a row.
     RawValue::Write(value_ctx->GetValue(NULL), template_tuple, slot_desc, NULL);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 841d1e4..28eb606 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -118,14 +118,10 @@ Status HdfsScanner::Open(ScannerContext* context) {
   scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
 
   // Initialize the template_tuple_.
-  vector<ExprContext*> partition_key_value_ctxs;
-  RETURN_IF_ERROR(Expr::CloneIfNotExists(
-      context_->partition_descriptor()->partition_key_value_ctxs(), state_,
-      &partition_key_value_ctxs));
-  template_tuple_ = scan_node_->InitTemplateTuple(partition_key_value_ctxs,
+  template_tuple_ = scan_node_->InitTemplateTuple(
+      context_->partition_descriptor()->partition_key_value_ctxs(),
       template_tuple_pool_.get(), state_);
   template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
-  Expr::Close(partition_key_value_ctxs, state_);
 
   decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 3211c1c..77316d4 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -101,13 +101,6 @@ Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
   DCHECK_GE(output_expr_ctxs_.size(),
       table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
 
-  // Prepare literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    RETURN_IF_ERROR(partition->PrepareExprs(state));
-  }
-
   return Status::OK();
 }
 
@@ -161,12 +154,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 Status HdfsTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(partition_key_expr_ctxs_, state));
-  // Open literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    RETURN_IF_ERROR(partition->OpenExprs(state));
-  }
 
   // Get file format for default partition in table descriptor, and build a map from
   // partition key values to partition descriptor for multiple output format support. The
@@ -667,12 +654,6 @@ void HdfsTableSink::Close(RuntimeState* state) {
   }
   partition_keys_to_output_partitions_.clear();
 
-  // Close literal partition key exprs
-  for (const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc:
-       table_desc_->partition_descriptors()) {
-    HdfsPartitionDescriptor* partition = id_to_desc.second;
-    partition->CloseExprs(state);
-  }
   Expr::Close(output_expr_ctxs_, state);
   Expr::Close(partition_key_expr_ctxs_, state);
   DataSink::Close(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 548370a..47ca791 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -145,6 +145,7 @@ TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
   : name_(tdesc.tableName),
     database_(tdesc.dbName),
     id_(tdesc.id),
+    type_(tdesc.tableType),
     num_clustering_cols_(tdesc.numClusteringCols) {
   for (int i = 0; i < tdesc.columnDescriptors.size(); ++i) {
     col_descs_.push_back(ColumnDescriptor(tdesc.columnDescriptors[i]));
@@ -176,9 +177,6 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& thrift_table,
     escape_char_(thrift_partition.escapeChar),
     block_size_(thrift_partition.blockSize),
     id_(thrift_partition.id),
-    exprs_prepared_(false),
-    exprs_opened_(false),
-    exprs_closed_(false),
     file_format_(thrift_partition.fileFormat),
     object_pool_(pool) {
   DecompressLocation(thrift_table, thrift_partition, &location_);
@@ -192,29 +190,6 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& thrift_table,
   }
 }
 
-Status HdfsPartitionDescriptor::PrepareExprs(RuntimeState* state) {
-  if (!exprs_prepared_) {
-    // TODO: RowDescriptor should arguably be optional in Prepare for known literals
-    exprs_prepared_ = true;
-    // Partition exprs are not used in the codegen case.  Don't codegen them.
-    RETURN_IF_ERROR(Expr::Prepare(partition_key_value_ctxs_, state, RowDescriptor(),
-                                  state->instance_mem_tracker()));
-  }
-  return Status::OK();
-}
-
-Status HdfsPartitionDescriptor::OpenExprs(RuntimeState* state) {
-  if (exprs_opened_) return Status::OK();
-  exprs_opened_ = true;
-  return Expr::Open(partition_key_value_ctxs_, state);
-}
-
-void HdfsPartitionDescriptor::CloseExprs(RuntimeState* state) {
-  if (exprs_closed_ || !exprs_prepared_) return;
-  exprs_closed_ = true;
-  Expr::Close(partition_key_value_ctxs_, state);
-}
-
 string HdfsPartitionDescriptor::DebugString() const {
   stringstream out;
   out << " file_format=" << file_format_ << "'"
@@ -238,13 +213,11 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc,
     null_partition_key_value_(tdesc.hdfsTable.nullPartitionKeyValue),
     null_column_value_(tdesc.hdfsTable.nullColumnValue),
     object_pool_(pool) {
-  map<int64_t, THdfsPartition>::const_iterator it;
-  for (it = tdesc.hdfsTable.partitions.begin(); it != tdesc.hdfsTable.partitions.end();
-       ++it) {
+  for (const auto& entry : tdesc.hdfsTable.partitions) {
     HdfsPartitionDescriptor* partition =
-        new HdfsPartitionDescriptor(tdesc.hdfsTable, it->second, pool);
+        new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second, pool);
     object_pool_->Add(partition);
-    partition_descriptors_[it->first] = partition;
+    partition_descriptors_[entry.first] = partition;
   }
   avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : "";
 }
@@ -532,6 +505,31 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb
   return Status::OK();
 }
 
+Status DescriptorTbl::PrepareAndOpenPartitionExprs(RuntimeState* state) const {
+  for (const auto& tbl_entry : tbl_desc_map_) {
+    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
+    HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second);
+    for (const auto& part_entry : hdfs_tbl->partition_descriptors()) {
+      // TODO: RowDescriptor should arguably be optional in Prepare for known literals
+      // Partition exprs are not used in the codegen case.  Don't codegen them.
+      RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), state,
+          RowDescriptor(), state->instance_mem_tracker()));
+      RETURN_IF_ERROR(Expr::Open(part_entry.second->partition_key_value_ctxs(), state));
+    }
+  }
+  return Status::OK();
+}
+
+void DescriptorTbl::ClosePartitionExprs(RuntimeState* state) const {
+  for (const auto& tbl_entry: tbl_desc_map_) {
+    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
+    HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second);
+    for (const auto& part_entry: hdfs_tbl->partition_descriptors()) {
+      Expr::Close(part_entry.second->partition_key_value_ctxs(), state);
+    }
+  }
+}
+
 TableDescriptor* DescriptorTbl::GetTableDescriptor(TableId id) const {
   // TODO: is there some boost function to do exactly this?
   TableDescriptorMap::const_iterator i = tbl_desc_map_.find(id);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 10c2ff3..874ad69 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -204,6 +204,7 @@ class TableDescriptor {
   const std::string& name() const { return name_; }
   const std::string& database() const { return database_; }
   int id() const { return id_; }
+  TTableType::type type() const { return type_; }
   const std::vector<ColumnDescriptor>& col_descs() const { return col_descs_; }
 
   /// Returns "<database>.<name>"
@@ -213,6 +214,7 @@ class TableDescriptor {
   std::string name_;
   std::string database_;
   TableId id_;
+  TTableType::type type_;
   int num_clustering_cols_;
   std::vector<ColumnDescriptor> col_descs_;
 };
@@ -222,26 +224,24 @@ class HdfsPartitionDescriptor {
  public:
   HdfsPartitionDescriptor(const THdfsTable& thrift_table,
       const THdfsPartition& thrift_partition, ObjectPool* pool);
+
   char line_delim() const { return line_delim_; }
   char field_delim() const { return field_delim_; }
   char collection_delim() const { return collection_delim_; }
   char escape_char() const { return escape_char_; }
   THdfsFileFormat::type file_format() const { return file_format_; }
-  const std::vector<ExprContext*>& partition_key_value_ctxs() const {
-    return partition_key_value_ctxs_;
-  }
   int block_size() const { return block_size_; }
   const std::string& location() const { return location_; }
   int64_t id() const { return id_; }
-
-  /// Calls Prepare()/Open()/Close() on all partition key exprs. Idempotent (this is
-  /// because both HdfsScanNode and HdfsTableSink may both use the same partition desc).
-  Status PrepareExprs(RuntimeState* state);
-  Status OpenExprs(RuntimeState* state);
-  void CloseExprs(RuntimeState* state);
-
   std::string DebugString() const;
 
+  /// It is safe to evaluate the returned expr contexts concurrently from multiple
+  /// threads because all exprs are literals, after the descriptor table has been
+  /// opened.
+  const std::vector<ExprContext*>& partition_key_value_ctxs() const {
+    return partition_key_value_ctxs_;
+  }
+
  private:
   char line_delim_;
   char field_delim_;
@@ -253,13 +253,12 @@ class HdfsPartitionDescriptor {
   std::string location_;
   int64_t id_;
 
-  /// True if PrepareExprs has been called, to prevent repeating expensive codegen
-  bool exprs_prepared_;
-  bool exprs_opened_;
-  bool exprs_closed_;
-
   /// List of literal (and therefore constant) expressions for each partition key. Their
   /// order corresponds to the first num_clustering_cols of the parent table.
+  /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table
+  /// because the same partition descriptor may be used by multiple exec nodes with
+  /// different lifetimes.
+  /// TODO: Move these into the new query-wide state, indexed by partition id.
   std::vector<ExprContext*> partition_key_value_ctxs_;
 
   /// The format (e.g. text, sequence file etc.) of data in the files in this partition
@@ -435,6 +434,12 @@ class DescriptorTbl {
   static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
                        DescriptorTbl** tbl);
 
+  /// Prepares and opens partition exprs of Hdfs tables.
+  Status PrepareAndOpenPartitionExprs(RuntimeState* state) const;
+
+  /// Closes partition exprs of Hdfs tables.
+  void ClosePartitionExprs(RuntimeState* state) const;
+
   TableDescriptor* GetTableDescriptor(TableId id) const;
   TupleDescriptor* GetTupleDescriptor(TupleId id) const;
   SlotDescriptor* GetSlotDescriptor(SlotId id) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/218019e5/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 76d2c02..1e52d08 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -326,6 +326,9 @@ void PlanFragmentExecutor::PrintVolumeIds(
 Status PlanFragmentExecutor::Open() {
   VLOG_QUERY << "Open(): instance_id="
       << runtime_state_->fragment_instance_id();
+
+  RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
+
   // we need to start the profile-reporting thread before calling Open(), since it
   // may block
   if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
@@ -589,6 +592,7 @@ void PlanFragmentExecutor::Close() {
       runtime_state_->io_mgr()->UnregisterContext(context);
     }
     exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
+    runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
     runtime_state_->filter_bank()->Close();
   }
   if (mem_usage_sampled_counter_ != NULL) {


[2/3] incubator-impala git commit: IMPALA-4097: Crash in kudu-scan-node-test

Posted by ab...@apache.org.
IMPALA-4097: Crash in kudu-scan-node-test

The kudu-scan-node-test was calling GetNext() with a NULL
row batch, which isn't valid. This wasn't failing until a
recent code change, and even then only occasionally, due to
the timing of scanner threads producing row batches. One
batch is empty and the other has 1 row. The test worked when
the empty row batch was added to the batch queue first (which
usually happened), but the test code couldn't handle the
other ordering properly. This fixes the test to be more
careful about what is being exercised.

Change-Id: I0e30964232ed137fce7a99bf7e9557293b5905c8
Reviewed-on: http://gerrit.cloudera.org:8080/4337
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7b4a6fac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7b4a6fac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7b4a6fac

Branch: refs/heads/master
Commit: 7b4a6fac1acf16fb84b38a005f69c228620ddb3f
Parents: d153d18
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Thu Sep 8 13:19:47 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Sep 10 00:50:31 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node-test.cc | 27 ++++++++++++++++++++-------
 be/src/exec/kudu-scan-node.cc      |  1 +
 2 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b4a6fac/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 972ae80..8324628 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -312,19 +312,32 @@ TEST_F(KuduScanNodeTest, TestScanEmptyString) {
   vector<TScanRangeParams> params;
   CreateScanRangeParams(num_cols_to_materialize, &params);
   SetUpScanner(&scanner, params);
-  bool eos = false;
   RowBatch* batch = obj_pool_->Add(
       new RowBatch(scanner.row_desc(), DEFAULT_ROWS_PER_BATCH, mem_tracker_.get()));
-  for (int i = 0; i < 2 && batch->num_rows() == 0; ++i) {
-    // Allow for up to 2 empty row batches since there are scanners created for all
-    // tablets (1 split point), and only one row was inserted.
+  bool eos = false;
+  int total_num_rows = 0;
+
+  // Need to call GetNext() a total of 3 times to allow for:
+  // 1) the row batch containing the single row
+  // 2) an empty row batch (since there are scanners created for both tablets)
+  // 3) a final call which returns eos.
+  // The first two may occur in any order and are checked in the for loop below.
+  for (int i = 0; i < 2; ++i) {
     ASSERT_OK(scanner.GetNext(runtime_state_.get(), batch, &eos));
+    ASSERT_FALSE(eos);
+    if (batch->num_rows() > 0) {
+      total_num_rows += batch->num_rows();
+      ASSERT_EQ(PrintBatch(batch), "[(10 null )]\n");
+    }
+    batch->Reset();
   }
-  ASSERT_EQ(1, batch->num_rows());
+  ASSERT_EQ(1, total_num_rows);
 
-  ASSERT_OK(scanner.GetNext(runtime_state_.get(), NULL, &eos));
+  // One last call to find the batch queue empty and GetNext() returns eos.
+  ASSERT_OK(scanner.GetNext(runtime_state_.get(), batch, &eos));
   ASSERT_TRUE(eos);
-  ASSERT_EQ(PrintBatch(batch), "[(10 null )]\n");
+  ASSERT_EQ(0, batch->num_rows());
+
   scanner.Close(runtime_state_.get());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b4a6fac/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index b247c67..e171afa 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -148,6 +148,7 @@ Status KuduScanNode::Open(RuntimeState* state) {
 }
 
 Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  DCHECK(row_batch != NULL);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));