You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/03/27 23:02:08 UTC

[impala] 14/17: IMPALA-11751: Template tuple of Avro header should be transferred to ScanRangeSharedState

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.2
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f3f0293df4c67bea7fdc136469d6835729ddee66
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Nov 29 18:53:17 2022 +0800

    IMPALA-11751: Template tuple of Avro header should be transferred to ScanRangeSharedState
    
    Sequence container based file formats (SequenceFile, RCFile, Avro) have
    a file header in each file that describes the metadata of the file, e.g.
    codec, default values, etc. The header should be decoded before reading
    the file content. The initial scanners will read the header and then
    issue follow-up scan ranges for the file content. The decoded header
    will be referenced by follow-up scanners.
    
    Since IMPALA-9655, when MT_DOP > 1, the issued scan ranges could be
    scheduled to other scan node instances. So the header resource should
    live until all scan node instances close. Header objects are owned by
    the object pool of the RuntimeState, which meets the requirement.
    
    AvroFileHeader is special than other headers in that it references a
    template tuple which contains the partition values and default values
    for missing fields. The template tuple is initially owned by the header
    scanner, then transferred to the scan node before the scanner closes.
    However, when the scan node instance closes, the template tuple is
    freed. Scanners of other scan node instances might still depend on it.
    This could cause wrong results or crash the impalad.
    
    When partition columns are used in the query, or when the underlying
    avro files have missing fields and the table schema has default values
    for them, the AvroFileHeader will have a non-null template tuple, which
    could hit this bug when MT_DOP>1.
    
    This patch fixes the bug by transferring the template tuple to
    ScanRangeSharedState directly. The scan_node_pool of HdfsScanNodeBase is
    also removed since it's only used to hold the template tuple (and
    related buffers) of the avro header. Also no need to override
    TransferToScanNodePool in HdfsScanNode since the original purpose is to
    protect the pool by a lock, and now the method in ScanRangeSharedState
    already has a lock.
    
    Tests
     - Add missing test coverage for compute stats on avro tables. Note that
       MT_DOP=4 is set by default for compute stats.
     - Add the MT_DOP dimension for TestScannersAllTableFormats. Also add
       some queries that can reveal the bug in scanners.test. The ASAN build
       can easily crash by heap-use-after-free error without this fix.
     - Ran exhaustive tests.
    
    Backport Notes:
     - Trivial conflicts in hdfs-scan-node-base.h and hdfs-scan-node-base.cc
       due to missing iceberg_partition_filtering_pool_ and
       HasVirtualColumnInTemplateTuple().
    
    Change-Id: Iafa43fce7c2ffdc867004d11e5873327c3d8cb42
    Reviewed-on: http://gerrit.cloudera.org:8080/19289
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc               |   2 +-
 be/src/exec/grouping-aggregator.cc                 |   1 -
 be/src/exec/hdfs-avro-scanner.cc                   |   4 +-
 be/src/exec/hdfs-avro-scanner.h                    |   5 +-
 be/src/exec/hdfs-scan-node-base.cc                 |  12 +-
 be/src/exec/hdfs-scan-node-base.h                  |  11 +-
 be/src/exec/hdfs-scan-node.cc                      |   5 -
 be/src/exec/hdfs-scan-node.h                       |   3 -
 be/src/exec/streaming-aggregation-node.h           |   4 +-
 .../queries/QueryTest/compute-stats-avro.test      | 151 +++++++++++++++++++++
 .../queries/QueryTest/scanners.test                |  56 ++++++++
 tests/query_test/test_scanners.py                  |   3 +
 12 files changed, 231 insertions(+), 26 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ae66d0e01..9bb94929e 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -169,7 +169,7 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
       return Status::OK();
     }
     // Header is parsed, set the metadata in the scan node and issue more ranges.
-    static_cast<HdfsScanNodeBase*>(scan_node_)->SetFileMetadata(
+    scan_node_->SetFileMetadata(
         context_->partition_descriptor()->id(), stream_->filename(), header_);
     const HdfsFileDesc* desc = scan_node_->GetFileDesc(
         context_->partition_descriptor()->id(), stream_->filename());
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 40471892e..75425d599 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -475,7 +475,6 @@ Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
 
 Status GroupingAggregator::AddBatchStreaming(
     RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* eos) {
-  RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(streaming_timer_);
   RETURN_IF_ERROR(QueryMaintenance(state));
   num_input_rows_ += child_batch->num_rows();
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 3cd20ba6e..850a88633 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -120,7 +120,7 @@ Status HdfsAvroScanner::ReadFileHeader() {
 
   // Transfer ownership so the memory remains valid for subsequent scanners that process
   // the data portions of the file.
-  scan_node_->TransferToScanNodePool(template_tuple_pool_.get());
+  scan_node_->TransferToSharedStatePool(template_tuple_pool_.get());
   return Status::OK();
 }
 
@@ -289,6 +289,8 @@ Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
 
 Status HdfsAvroScanner::WriteDefaultValue(
     SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) {
+  // avro_header could have null template_tuple here if no partition columns are
+  // materialized and no default values are set yet.
   if (avro_header_->template_tuple == nullptr) {
     if (template_tuple_ != nullptr) {
       avro_header_->template_tuple = template_tuple_;
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 01220eee0..ee5b96579 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -121,8 +121,9 @@ class HdfsAvroScanner : public BaseSequenceScanner {
     /// Set to nullptr if there are no materialized partition keys and no default values
     /// are necessary (i.e., all materialized fields are present in the file schema).
     /// This tuple is created by the scanner processing the initial scan range with
-    /// the header. The ownership of memory is transferred to the scan-node pool,
-    /// such that it remains live when subsequent scanners process data ranges.
+    /// the header. The ownership of memory is transferred to the template pool of
+    /// ScanRangeSharedState, such that it remains live when subsequent scanners process
+    /// data ranges.
     Tuple* template_tuple;
 
     /// True if this file can use the codegen'd version of DecodeAvroData() (i.e. its
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 019467732..be60f33b5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -465,7 +465,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   }
 
   // One-time initialization of state that is constant across scan ranges
-  scan_node_pool_.reset(new MemPool(mem_tracker()));
   runtime_profile()->AddInfoString("Table Name", hdfs_table_->fully_qualified_name());
 
   if (HasRowBatchQueue()) {
@@ -627,8 +626,6 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
   // There should be no active hdfs read threads.
   DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
 
-  if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
-
   // Close collection conjuncts
   for (auto& tid_conjunct_eval : conjunct_evals_map_) {
     // ExecNode::conjunct_evals_ are already closed in ExecNode::Close()
@@ -988,8 +985,8 @@ void HdfsScanPlanNode::ComputeSlotMaterializationOrder(
   }
 }
 
-void HdfsScanNodeBase::TransferToScanNodePool(MemPool* pool) {
-  scan_node_pool_->AcquireData(pool, false);
+void HdfsScanNodeBase::TransferToSharedStatePool(MemPool* pool) {
+  shared_state_->TransferToSharedStatePool(pool);
 }
 
 void HdfsScanNodeBase::UpdateHdfsSplitStats(
@@ -1193,6 +1190,11 @@ Tuple* ScanRangeSharedState::GetTemplateTupleForPartitionId(int64_t partition_id
   return partition_template_tuple_map_[partition_id];
 }
 
+void ScanRangeSharedState::TransferToSharedStatePool(MemPool* pool) {
+  unique_lock<mutex> l(metadata_lock_);
+  template_pool_->AcquireData(pool, false);
+}
+
 void ScanRangeSharedState::UpdateRemainingScanRangeSubmissions(int32_t delta) {
   int new_val = remaining_scan_range_submissions_.Add(delta);
   DCHECK_GE(new_val, 0);
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7f5643756..539eb073a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -191,6 +191,9 @@ class ScanRangeSharedState {
   /// cancellation. Must be called before adding or removing scan ranges to the queue.
   void AddCancellationHook(RuntimeState* state);
 
+  /// Transfers all memory from 'pool' to 'template_pool_'.
+  void TransferToSharedStatePool(MemPool* pool);
+
  private:
   friend class HdfsScanPlanNode;
 
@@ -602,8 +605,8 @@ class HdfsScanNodeBase : public ScanNode {
         && (IsZeroSlotTableScan() || optimize_count_star());
   }
 
-  /// Transfers all memory from 'pool' to 'scan_node_pool_'.
-  virtual void TransferToScanNodePool(MemPool* pool);
+  /// Transfers all memory from 'pool' to shared state of all scanners.
+  void TransferToSharedStatePool(MemPool* pool);
 
   /// map from volume id to <number of split, per volume split lengths>
   typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumeStats;
@@ -780,10 +783,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// taken where there are i concurrent hdfs read thread running. Created in Open().
   std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
 
-  /// Pool for allocating some amounts of memory that is shared between scanners.
-  /// e.g. partition key tuple and their string buffers
-  boost::scoped_ptr<MemPool> scan_node_pool_;
-
   /// Status of failed operations.  This is set in the ScannerThreads
   /// Returned in GetNext() if an error occurred.  An non-ok status triggers cleanup
   /// scanner threads.
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index d9d7985f2..055d1080a 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -197,11 +197,6 @@ void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
   HdfsScanNodeBase::RangeComplete(file_type, compression_type, skipped);
 }
 
-void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
-  unique_lock<timed_mutex> l(lock_);
-  HdfsScanNodeBase::TransferToScanNodePool(pool);
-}
-
 void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
   InitNullCollectionValues(row_batch.get());
   thread_state_.EnqueueBatch(move(row_batch));
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index def5d8b95..db6a8f663 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -104,9 +104,6 @@ class HdfsScanNode : public HdfsScanNodeBase {
       const std::vector<THdfsCompression::type>& compression_type, bool skipped = false)
       override;
 
-  /// Transfers all memory from 'pool' to 'scan_node_pool_'.
-  virtual void TransferToScanNodePool(MemPool* pool) override;
-
   virtual ExecutionModel getExecutionModel() const override {
     return NON_TASK_BASED_SYNC;
   }
diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h
index 6af70b075..e1427719b 100644
--- a/be/src/exec/streaming-aggregation-node.h
+++ b/be/src/exec/streaming-aggregation-node.h
@@ -34,11 +34,11 @@ class RuntimeState;
 /// aggregate the rows into its hash table, but if there is not enough memory available or
 /// if the reduction from the aggregation is not very good, it will 'stream' the rows
 /// through and return them without aggregating them instead of spilling. After all of the
-/// input as been processed from child(0), subsequent calls to GetNext() will return any
+/// input has been processed from child(0), subsequent calls to GetNext() will return any
 /// rows that were aggregated in the Aggregator's hash table.
 ///
 /// Since the rows returned by GetNext() may be only partially aggregated if there are
-/// memory contraints, this is a preliminary aggregation step that functions as an
+/// memory constraints, this is a preliminary aggregation step that functions as an
 /// optimization and will always be followed in the plan by an AggregationNode that does
 /// the final aggregation.
 ///
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-avro.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-avro.test
index 2d6d19018..387fd9bed 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-avro.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-avro.test
@@ -53,6 +53,157 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES
 STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
 ====
 ---- QUERY
+# Non-empty Avro table with matching column definitions and Avro schema
+create external table avro_hive_alltypes_ext
+like functional_avro_snap.alltypes;
+alter table avro_hive_alltypes_ext
+set location '/test-warehouse/alltypes_avro_snap';
+alter table avro_hive_alltypes_ext recover partitions;
+compute stats avro_hive_alltypes_ext;
+---- RESULTS
+'Updated 24 partition(s) and 11 column(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+show table stats avro_hive_alltypes_ext
+---- LABELS
+YEAR, MONTH, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'2009','1',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','2',280,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','3',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','4',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','5',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','6',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','7',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','8',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','9',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','10',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','11',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','12',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','1',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','2',280,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','3',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','4',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','5',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','6',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','7',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','8',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','9',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','10',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','11',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','12',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'Total','',7300,24,regex:.*,'0B','','','',''
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+show column stats avro_hive_alltypes_ext
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES
+---- RESULTS
+'id','INT',7300,0,4,4,-1,-1
+'bool_col','BOOLEAN',2,0,1,1,3650,3650
+'tinyint_col','INT',10,0,4,4,-1,-1
+'smallint_col','INT',10,0,4,4,-1,-1
+'int_col','INT',10,0,4,4,-1,-1
+'bigint_col','BIGINT',10,0,8,8,-1,-1
+'float_col','FLOAT',10,0,4,4,-1,-1
+'double_col','DOUBLE',10,0,8,8,-1,-1
+'date_string_col','STRING',736,0,8,8,-1,-1
+'string_col','STRING',10,0,1,1,-1,-1
+'timestamp_col','STRING',7224,0,22,21.66438293457031,-1,-1
+'year','INT',2,0,4,4,-1,-1
+'month','INT',12,0,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# Non-empty Avro table with matching column definitions and Avro schema, but with
+# different partition schema. Note that we use INT for tinyint_col and smallint_col,
+# and STRING for timestamp_col. See HIVE_TO_AVRO_TYPE_MAP in
+# testdata/bin/generate-schema-statements.py
+create external table avro_hive_alltypes_str_part (
+  id int,
+  bool_col boolean,
+  tinyint_col int,
+  smallint_col int,
+  int_col int,
+  bigint_col bigint,
+  float_col float,
+  double_col double,
+  date_string_col string,
+  string_col string,
+  timestamp_col string
+) partitioned by (
+  year string,
+  month string
+)
+stored as avro
+location '/test-warehouse/alltypes_avro_snap';
+alter table avro_hive_alltypes_str_part recover partitions;
+compute stats avro_hive_alltypes_str_part;
+---- RESULTS
+'Updated 24 partition(s) and 11 column(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+show table stats avro_hive_alltypes_str_part
+---- LABELS
+YEAR, MONTH, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'2009','1',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','2',280,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','3',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','4',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','5',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','6',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','7',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','8',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','9',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','10',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','11',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2009','12',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','1',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','2',280,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','3',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','4',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','5',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','6',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','7',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','8',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','9',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','10',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','11',300,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'2010','12',310,1,regex:.*,'NOT CACHED','NOT CACHED','AVRO','false',regex:.*
+'Total','',7300,24,regex:.*,'0B','','','',''
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+show column stats avro_hive_alltypes_str_part
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES
+---- RESULTS
+'id','INT',7300,0,4,4,-1,-1
+'bool_col','BOOLEAN',2,0,1,1,3650,3650
+'tinyint_col','INT',10,0,4,4,-1,-1
+'smallint_col','INT',10,0,4,4,-1,-1
+'int_col','INT',10,0,4,4,-1,-1
+'bigint_col','BIGINT',10,0,8,8,-1,-1
+'float_col','FLOAT',10,0,4,4,-1,-1
+'double_col','DOUBLE',10,0,8,8,-1,-1
+'date_string_col','STRING',736,0,8,8,-1,-1
+'string_col','STRING',10,0,1,1,-1,-1
+'timestamp_col','STRING',7224,0,22,21.66438293457031,-1,-1
+'year','STRING',2,0,-1,-1,-1,-1
+'month','STRING',12,0,-1,-1,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
 # Avro table with an extra column definition.
 compute stats avro_hive_alltypes_extra_coldef
 ---- RESULTS
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index 002d5d0d9..8dd741f67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -242,3 +242,59 @@ aggregation(SUM, RowsRead): 100
 aggregation(SUM, RowsRead): 0
 aggregation(SUM, RowsReturned): 200
 ====
+---- QUERY
+select year, count(*) from alltypes group by year
+---- RESULTS
+2009,3650
+2010,3650
+---- TYPES
+INT, BIGINT
+====
+---- QUERY
+select month, count(*) from alltypes group by month
+---- RESULTS
+1,620
+2,560
+3,620
+4,600
+5,620
+6,600
+7,620
+8,620
+9,600
+10,620
+11,600
+12,620
+---- TYPES
+INT, BIGINT
+====
+---- QUERY
+select year, month, count(*) from alltypes group by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,280
+2010,3,310
+2010,4,300
+2010,5,310
+2010,6,300
+2010,7,310
+2010,8,310
+2010,9,300
+2010,10,310
+2010,11,300
+2010,12,310
+---- TYPES
+INT, INT, BIGINT
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index ca15d43a2..6aea011a5 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -91,12 +91,14 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
         ImpalaTestDimension('batch_size', *TestScannersAllTableFormats.BATCH_SIZES))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('debug_action', *DEBUG_ACTION_DIMS))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
 
   def test_scanners(self, vector):
     new_vector = deepcopy(vector)
     # Copy over test dimensions to the matching query options.
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     new_vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/scanners', new_vector)
 
   def test_many_nulls(self, vector):
@@ -107,6 +109,7 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     new_vector = deepcopy(vector)
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     new_vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/scanners-many-nulls', new_vector)
 
   def test_hdfs_scanner_profile(self, vector):