You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/04/05 13:44:23 UTC

[impala] branch master updated: IMPALA-11123: Optimize count(star) for ORC scans

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f932d78ad IMPALA-11123: Optimize count(star) for ORC scans
f932d78ad is described below

commit f932d78ad0a30e322d59fc39072f710f889d2135
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Mar 14 18:07:05 2022 -0700

    IMPALA-11123: Optimize count(star) for ORC scans
    
    This patch provides count(star) optimization for ORC scans, similar to
    the work done in IMPALA-5036 for Parquet scans. We use the stripes num
    rows statistics when computing the count star instead of materializing
    empty rows. The aggregate function changed from a count to a special sum
    function initialized to 0.
    
    This count(star) optimization is disabled for the full ACID table
    because the scanner might need to read and validate the
    'currentTransaction' column in table's special schema.
    
    This patch drops 'parquet' from names related to the count star
    optimization. It also improves the count(star) operation in general by
    serving the result just from the file's footer stats for both Parquet
    and ORC. We unify the optimized count star and zero slot scan functions
    into HdfsColumnarScanner.
    
    The following table shows a performance comparison before and after the
    patch. primitive_count_star query target tpch10_parquet.lineitem
    table (10GB scale TPC-H). Meanwhile, count_star_parq and count_star_orc
    query is a modified primitive_count_star query that targets
    tpch_parquet.lineitem and tpch_orc_def.lineitem table accordingly.
    
    +-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    | Workload          | Query                | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
    +-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    | tpch_parquet      | count_star_parq      | parquet / none / none | 0.06   | 0.07        |   -10.45%  |   2.87%    | * 25.51% *     | 9     |   -1.47%       | -1.26   | -1.22 |
    | tpch_orc_def      | count_star_orc       | orc / def / none      | 0.06   | 0.08        |   -22.37%  |   6.22%    | * 30.95% *     | 9     |   -1.85%       | -1.16   | -2.14 |
    | TARGETED-PERF(10) | primitive_count_star | parquet / none / none | 0.06   | 0.08        | I -30.40%  |   2.68%    | * 29.63% *     | 9     | I -7.20%       | -2.42   | -3.07 |
    +-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    
    Testing:
    - Add PlannerTest.testOrcStatsAgg
    - Add TestAggregationQueries::test_orc_count_star_optimization
    - Exercise count(star) in TestOrc::test_misaligned_orc_stripes
    - Pass core tests
    
    Change-Id: I0fafa1182f97323aeb9ee39dd4e8ecd418fa6091
    Reviewed-on: http://gerrit.cloudera.org:8080/18327
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-columnar-scanner.cc               |  72 +++-
 be/src/exec/hdfs-columnar-scanner.h                |  37 ++
 be/src/exec/hdfs-orc-scanner.cc                    |  64 ++--
 be/src/exec/hdfs-orc-scanner.h                     |  21 +-
 be/src/exec/hdfs-scan-node-base.cc                 |   5 +-
 be/src/exec/hdfs-scan-node-base.h                  |  41 +-
 be/src/exec/hdfs-scanner.cc                        |   5 +-
 be/src/exec/orc-column-readers.h                   |   4 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 101 ++---
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  32 +-
 be/src/exec/parquet/parquet-column-readers.h       |   2 +-
 common/thrift/PlanNodes.thrift                     |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  20 +-
 .../org/apache/impala/planner/PlannerTest.java     |   4 +
 .../queries/PlannerTest/orc-stats-agg.test         | 426 +++++++++++++++++++++
 .../queries/PlannerTest/parquet-stats-agg.test     |  31 +-
 .../queries/PlannerTest/resource-requirements.test |   9 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   3 +-
 .../iceberg-upper-lower-bound-metrics.test         |   9 +-
 .../queries/QueryTest/mixed-format.test            |  13 +-
 .../queries/QueryTest/orc-stats-agg.test           | 152 ++++++++
 .../queries/QueryTest/parquet-stats-agg.test       |  60 +++
 .../queries/QueryTest/partition-key-scans.test     |  28 ++
 .../queries/QueryTest/runtime_filters.test         |   8 +
 .../queries/QueryTest/runtime_filters_mt_dop.test  |   8 +
 .../queries/QueryTest/scanners.test                |   3 +
 tests/custom_cluster/test_executor_groups.py       |   2 +-
 tests/custom_cluster/test_query_retries.py         |  33 +-
 tests/query_test/test_aggregation.py               |  72 ++--
 tests/query_test/test_scanners.py                  |  12 +-
 tests/util/test_file_parser.py                     |  20 +-
 31 files changed, 1046 insertions(+), 256 deletions(-)

diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc
index 913142936..28da3c40a 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -64,20 +64,27 @@ PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, TUnit::BYTES,
     "The total number of bytes read from streams.");
 PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES,
     "The total number of bytes skipped from streams.");
+PROFILE_DEFINE_COUNTER(NumFileMetadataRead, DEBUG, TUnit::UNIT,
+    "The total number of file metadata reads done in place of rows or row groups / "
+    "stripe iteration.");
 
 const char* HdfsColumnarScanner::LLVM_CLASS_NAME = "class.impala::HdfsColumnarScanner";
 
-HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
-    RuntimeState* state) :
-    HdfsScanner(scan_node, state),
+HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+  : HdfsScanner(scan_node, state),
     scratch_batch_(new ScratchTupleBatch(
-        *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())) {
-}
+        *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
+    assemble_rows_timer_(scan_node->materialize_tuple_timer()) {}
 
 HdfsColumnarScanner::~HdfsColumnarScanner() {}
 
 Status HdfsColumnarScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
+  // Memorize 'is_footer_scanner_' here since 'stream_' can be released early.
+  const io::ScanRange* range = stream_->scan_range();
+  is_footer_scanner_ =
+      range->offset() + range->bytes_to_read() >= stream_->file_desc()->file_length;
+
   RuntimeProfile* profile = scan_node_->runtime_profile();
   num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile);
   num_scanners_with_no_reads_counter_ =
@@ -95,6 +102,7 @@ Status HdfsColumnarScanner::Open(ScannerContext* context) {
   io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile);
   io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile);
   io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile);
+  num_file_metadata_read_ = PROFILE_NumFileMetadataRead.Instantiate(profile);
   return Status::OK();
 }
 
@@ -290,6 +298,60 @@ Status HdfsColumnarScanner::DivideReservationBetweenColumns(
   return Status::OK();
 }
 
+Status HdfsColumnarScanner::GetNextWithCountStarOptimization(RowBatch* row_batch) {
+  // There are no materialized slots, e.g. count(*) over the table.  We can serve
+  // this query from just the file metadata.  We don't need to read the column data.
+  // Only scanner of the footer split will run in this case. See the logic in
+  // HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly().
+  DCHECK(is_footer_scanner_);
+  int64_t tuple_buffer_size;
+  uint8_t* tuple_buffer;
+  int capacity = 1;
+  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_,
+      row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity,
+      &tuple_buffer_size, &tuple_buffer));
+  int64_t num_rows = GetNumberOfRowsInFile();
+  COUNTER_ADD(num_file_metadata_read_, 1);
+  DCHECK_LE(rows_read_in_group_, num_rows);
+  Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+  InitTuple(template_tuple_, dst_tuple);
+  int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
+  *dst_slot = num_rows;
+  TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+  dst_row->SetTuple(0, dst_tuple);
+  row_batch->CommitLastRow();
+  rows_read_in_group_ += num_rows;
+  eos_ = true;
+  return Status::OK();
+}
+
+Status HdfsColumnarScanner::GetNextWithTemplateTuple(RowBatch* row_batch) {
+  // There are no materialized slots, e.g. "select 1" over the table.  We can serve
+  // this query from just the file metadata.  We don't need to read the column data.
+  // Only scanner of the footer split will run in this case. See the logic in
+  // HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly().
+  // We might also get here for count(*) query against full acid table such as:
+  // "select count(*) from functional_orc_def.alltypes;"
+  DCHECK(is_footer_scanner_);
+  int64_t file_rows = GetNumberOfRowsInFile();
+  COUNTER_ADD(num_file_metadata_read_, 1);
+  if (rows_read_in_group_ == file_rows) {
+    eos_ = true;
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  DCHECK_LT(rows_read_in_group_, file_rows);
+  int64_t rows_remaining = file_rows - rows_read_in_group_;
+  int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+  TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+  int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+  Status status = CommitRows(num_to_commit, row_batch);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  rows_read_in_group_ += max_tuples;
+  return Status::OK();
+}
+
 void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) {
   io_sync_request_->Add(1);
   io_total_request_->Add(1);
diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h
index 472863bf2..af423cff8 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -60,6 +60,25 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// top-level tuples. See AssembleRows() in the derived classes.
   boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
 
+  /// Timer for materializing rows.  This ignores time getting the next buffer.
+  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
+
+  /// Index of the current row group / stripe being processed. Initialized to -1 which
+  /// indicates that we have not started processing the first group yet (GetNext() has
+  /// not yet been called).
+  int32_t group_idx_ = -1;
+
+  /// Counts the number of rows processed for the current row group / stripe.
+  int64_t rows_read_in_group_ = 0;
+
+  /// Indicates whether we should advance to the next row group / stripe in the next
+  /// GetNext(). Starts out as true to move to the very first row group.
+  bool advance_group_ = true;
+
+  /// Indicate whether this is a footer scanner or not.
+  /// Assigned in HdfsColumnarScanner::Open().
+  bool is_footer_scanner_ = false;
+
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_ = nullptr;
 
@@ -100,6 +119,9 @@ class HdfsColumnarScanner : public HdfsScanner {
   Status DivideReservationBetweenColumns(const ColumnRangeLengths& col_range_lengths,
       ColumnReservations& reservation_per_column);
 
+  /// Get the number of rows in file.
+  virtual int64_t GetNumberOfRowsInFile() const = 0;
+
   /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
   /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
   /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
@@ -114,6 +136,16 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// in ExecEnv.
   static int64_t ComputeIdealReservation(const ColumnRangeLengths& col_range_lengths);
 
+  /// Handle count(*) queries by reading the row count from the footer statistics.
+  /// The optimization is possible only in simpler cases e.g. when there are no conjucts.
+  /// Check ScanNode.java#canApplyCountStarOptimization for full detail.
+  Status GetNextWithCountStarOptimization(RowBatch* row_batch);
+
+  /// Handle zero slot scan queries by reading the row count from the footer statistics.
+  /// Possible queries include "select 1" or "select count(*)" over full acid table that
+  /// does not require row validation.
+  Status GetNextWithTemplateTuple(RowBatch* row_batch);
+
   /// Number of columns that need to be read.
   RuntimeProfile::Counter* num_cols_counter_;
 
@@ -142,6 +174,11 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// Total number of bytes skipped during stream reading.
   RuntimeProfile::Counter* io_skipped_bytes_;
 
+  /// Total file metadata reads done.
+  /// Incremented when serving query from metadata instead of iterating rows or
+  /// row groups / stripes.
+  RuntimeProfile::Counter* num_file_metadata_read_;
+
  private:
   int ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch);
 };
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index bc81fc4f2..da55f7782 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -307,8 +307,7 @@ HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsColumnarScanner(scan_node, state),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     data_batch_pool_(new MemPool(scan_node->mem_tracker())),
-    search_args_pool_(new MemPool(scan_node->mem_tracker())),
-    assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
+    search_args_pool_(new MemPool(scan_node->mem_tracker())) {
   assemble_rows_timer_.Stop();
 }
 
@@ -401,9 +400,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
     row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
   }
 
-  if (UNLIKELY(scan_node_->optimize_parquet_count_star())) {
-    DCHECK(false);
-    return Status("Internal ERROR: ORC scanner cannot optimize count star slot.");
+  if (scan_node_->optimize_count_star() && !row_batches_need_validation_) {
+    template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+    return Status::OK();
   }
 
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
@@ -766,29 +765,20 @@ Status HdfsOrcScanner::ProcessSplit() {
 }
 
 Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
-  // In case 'row_batches_need_validation_' is true, we need to look at the row
-  // batches and check their validity. In that case 'currentTransaction' is the only
-  // selected field from the file (in case of zero slot scans).
-  if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) {
-    uint64_t file_rows = reader_->getNumberOfRows();
-    // There are no materialized slots, e.g. count(*) over the table.  We can serve
+  if (row_batches_need_validation_) {
+    // In case 'row_batches_need_validation_' is true, we need to look at the row
+    // batches and check their validity. This might be a zero slot scan, which
+    // 'currentTransaction' is the only selected field from the file. And this should
+    // not be an optimized count(*) because it is disabled for full acid table.
+    DCHECK(!scan_node_->optimize_count_star());
+  } else if (scan_node_->optimize_count_star()) {
+    // This is an optimized count(*) case.
+    // For each file, populate one slot with the footer's numberOfRows statistic.
+    return GetNextWithCountStarOptimization(row_batch);
+  } else if (scan_node_->IsZeroSlotTableScan()) {
+    // There are no materialized slots, e.g. "select 1" over the table.  We can serve
     // this query from just the file metadata.  We don't need to read the column data.
-    if (stripe_rows_read_ == file_rows) {
-      eos_ = true;
-      return Status::OK();
-    }
-    assemble_rows_timer_.Start();
-    DCHECK_LT(stripe_rows_read_, file_rows);
-    int64_t rows_remaining = file_rows - stripe_rows_read_;
-    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
-    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
-    Status status = CommitRows(num_to_commit, row_batch);
-    assemble_rows_timer_.Stop();
-    RETURN_IF_ERROR(status);
-    stripe_rows_read_ += max_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit);
-    return Status::OK();
+    return GetNextWithTemplateTuple(row_batch);
   }
 
   if (!scratch_batch_->AtEnd()) {
@@ -820,7 +810,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe we advance
   // to can be skip. 'end_of_stripe_' marks whether current stripe is drained. It's only
   // set to true in 'AssembleRows'.
-  while (advance_stripe_ || end_of_stripe_) {
+  while (advance_group_ || end_of_stripe_) {
     // The next stripe will use a new dictionary blob so transfer the memory to row_batch.
     row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
     context_->ReleaseCompletedResources(/* done */ true);
@@ -828,8 +818,8 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
     RETURN_IF_ERROR(CommitRows(0, row_batch));
 
     RETURN_IF_ERROR(NextStripe());
-    DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes());
-    if (stripe_idx_ == reader_->getNumberOfStripes()) {
+    DCHECK_LE(group_idx_, reader_->getNumberOfStripes());
+    if (group_idx_ == reader_->getNumberOfStripes()) {
       eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
@@ -869,19 +859,19 @@ Status HdfsOrcScanner::NextStripe() {
   int64_t split_offset = split_range->offset();
   int64_t split_length = split_range->len();
 
-  bool start_with_first_stripe = stripe_idx_ == -1;
+  bool start_with_first_stripe = group_idx_ == -1;
   bool misaligned_stripe_skipped = false;
 
-  advance_stripe_ = false;
-  stripe_rows_read_ = 0;
+  advance_group_ = false;
+  rows_read_in_group_ = 0;
 
   // Loop until we have found a non-empty stripe.
   while (true) {
     // Reset the parse status for the next stripe.
     parse_status_ = Status::OK();
 
-    ++stripe_idx_;
-    if (stripe_idx_ >= reader_->getNumberOfStripes()) {
+    ++group_idx_;
+    if (group_idx_ >= reader_->getNumberOfStripes()) {
       if (start_with_first_stripe && misaligned_stripe_skipped) {
         // We started with the first stripe and skipped all the stripes because they were
         // misaligned. The execution flow won't reach this point if there is at least one
@@ -890,7 +880,7 @@ Status HdfsOrcScanner::NextStripe() {
       }
       break;
     }
-    unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(stripe_idx_);
+    unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(group_idx_);
     // Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 'select *'
     // behave consistently for corrupt files that have 'footer_.numberOfRows == 0'
     // but some data in stripe.
@@ -963,7 +953,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
     if (row_batch->AtCapacity()) break;
     continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
   }
-  stripe_rows_read_ += num_rows_read;
+  rows_read_in_group_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index f68002e96..bdd40a933 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -183,6 +183,11 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch) override;
 
+ protected:
+  virtual int64_t GetNumberOfRowsInFile() const override {
+    return static_cast<int64_t>(reader_->getNumberOfRows());
+  }
+
  private:
   friend class OrcColumnReader;
   friend class OrcDateColumnReader;
@@ -193,23 +198,10 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   friend class OrcStructReader;
   friend class OrcListReader;
   friend class OrcMapReader;
-  friend class HdfsOrcScannerTest;
 
   /// Memory guard of the tuple_mem_
   uint8_t* tuple_mem_end_ = nullptr;
 
-  /// Index of the current stripe being processed. Stripe in ORC is equivalent to
-  /// RowGroup in Parquet. Initialized to -1 which indicates that we have not started
-  /// processing the first stripe yet (GetNext() has not yet been called).
-  int32_t stripe_idx_ = -1;
-
-  /// Counts the number of rows processed for the current stripe.
-  int64_t stripe_rows_read_ = 0;
-
-  /// Indicates whether we should advance to the next stripe in the next GetNext().
-  /// Starts out as true to move to the very first stripe.
-  bool advance_stripe_ = true;
-
   /// Indicates whether we are at the end of a stripe.
   bool end_of_stripe_ = true;
 
@@ -292,9 +284,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// offset, and there are no two overlapping range.
   vector<ColumnRange> columnRanges_;
 
-  /// Timer for materializing rows. This ignores time getting the next buffer.
-  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
-
   /// Number of stripes that need to be read.
   RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0e9c07d05..019467732 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -408,9 +408,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno
             hdfs_scan_node.skip_header_line_count :
             0),
     tuple_id_(pnode.tuple_id_),
-    parquet_count_star_slot_offset_(
-        hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
-            hdfs_scan_node.parquet_count_star_slot_offset :
+    count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ?
+            hdfs_scan_node.count_star_slot_offset :
             -1),
     is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan),
     tuple_desc_(pnode.tuple_desc_),
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e6cd7c6fd..7f5643756 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -30,6 +30,7 @@
 #include <boost/unordered_map.hpp>
 
 #include "codegen/codegen-fn-ptr.h"
+#include "exec/acid-metadata-utils.h"
 #include "exec/filter-context.h"
 #include "exec/scan-node.h"
 #include "runtime/descriptors.h"
@@ -437,10 +438,12 @@ class HdfsScanNodeBase : public ScanNode {
   const AvroSchemaElement& avro_schema() const { return avro_schema_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
   io::RequestContext* reader_context() const { return reader_context_.get(); }
-  bool optimize_parquet_count_star() const {
-    return parquet_count_star_slot_offset_ != -1;
+  bool optimize_count_star() const {
+    bool is_optimized = count_star_slot_offset_ != -1;
+    DCHECK(!hdfs_table_->IsTableFullAcid() || !is_optimized);
+    return is_optimized;
   }
-  int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
+  int count_star_slot_offset() const { return count_star_slot_offset_; }
   bool is_partition_key_scan() const { return is_partition_key_scan_; }
 
   typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
@@ -573,6 +576,32 @@ class HdfsScanNodeBase : public ScanNode {
     return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
   }
 
+  /// Return true if scan over 'filename' require row validation.
+  /// Hive Streaming Ingestion allocates multiple write ids, hence create delta
+  /// directories like delta_5_10. Then it continuously appends new stripes (and footers)
+  /// to the ORC files of the delte dir. So it's possible that the file has rows that
+  /// Impala is not allowed to see based on its valid write id list. In such cases we need
+  /// to validate the write ids of the row batches.
+  inline bool RequireRowValidation(std::string filename) const {
+    if (!hdfs_table()->IsTableFullAcid()) return false;
+    if (ValidWriteIdList::IsCompacted(filename)) return false;
+    ValidWriteIdList valid_write_ids;
+    std::pair<int64_t, int64_t> acid_write_id_range =
+        valid_write_ids.GetWriteIdRange(filename);
+    valid_write_ids.InitFrom(hdfs_table()->ValidWriteIdList());
+    ValidWriteIdList::RangeResponse rows_valid = valid_write_ids.IsWriteIdRangeValid(
+        acid_write_id_range.first, acid_write_id_range.second);
+    DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
+    return rows_valid == ValidWriteIdList::SOME;
+  }
+
+  /// Return true if scan over 'filename 'can be served only by reading the file metadata,
+  /// such as a count(*) over the table.
+  inline bool ReadsFileMetadataOnly(std::string filename) const {
+    return !RequireRowValidation(filename)
+        && (IsZeroSlotTableScan() || optimize_count_star());
+  }
+
   /// Transfers all memory from 'pool' to 'scan_node_pool_'.
   virtual void TransferToScanNodePool(MemPool* pool);
 
@@ -649,11 +678,11 @@ class HdfsScanNodeBase : public ScanNode {
   /// Tuple id of the tuple descriptor to be used.
   const int tuple_id_;
 
-  /// The byte offset of the slot for Parquet metadata if Parquet count star optimization
+  /// The byte offset of the slot for Parquet/ORC metadata if count star optimization
   /// is enabled. When set, this scan node can optimize a count(*) query by populating
   /// the tuple with data from the Parquet num rows statistic. See
-  /// applyParquetCountStartOptimization() in HdfsScanNode.java.
-  const int parquet_count_star_slot_offset_;
+  /// applyCountStarOptimization() in ScanNode.java.
+  const int count_star_slot_offset_;
 
   // True if this is a partition key scan that needs only to return at least one row from
   // each scan range. If true, the scan node and scanner implementations should attempt
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index f4ff16627..fad812d26 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -838,12 +838,13 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
       ScanRange* split = files[i]->splits[j];
 
       DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
-      // If there are no materialized slots (such as count(*) over the table), we can
+      // If scan only reads file metadata (such as count(*) over the table), we can
       // get the result with the file metadata alone and don't need to read any row
       // groups. We only want a single node to process the file footer in this case,
       // which is the node with the footer split.  If it's not a count(*), we create a
       // footer range for the split always.
-      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
+      if (!scan_node->ReadsFileMetadataOnly(files[i]->filename)
+          || footer_split == split) {
         ScanRangeMetadata* split_metadata =
             static_cast<ScanRangeMetadata*>(split->meta_data());
         // Each split is processed by first issuing a scan range for the file footer, which
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 555607def..17195c99d 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -337,8 +337,8 @@ class OrcStringColumnReader : public OrcPrimitiveColumnReader<OrcStringColumnRea
     }
     DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) ==
         dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch));
-    if (last_stripe_idx_ != scanner_->stripe_idx_) {
-      last_stripe_idx_ = scanner_->stripe_idx_;
+    if (last_stripe_idx_ != scanner_->group_idx_) {
+      last_stripe_idx_ = scanner_->group_idx_;
       auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_);
       return InitBlob(&current_batch->dictionary->dictionaryBlob,
           scanner_->dictionary_pool_.get());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index fcc4c852d..f79e02b91 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -85,14 +85,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
 
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsColumnarScanner(scan_node, state),
-    row_group_idx_(-1),
-    row_group_rows_read_(0),
-    advance_row_group_(true),
     min_max_tuple_(nullptr),
     row_batches_produced_(0),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     stats_batch_read_pool_(new MemPool(scan_node->mem_tracker())),
-    assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
     num_stats_filtered_row_groups_counter_(nullptr),
     num_minmax_filtered_row_groups_counter_(nullptr),
     num_bloom_filtered_row_groups_counter_(nullptr),
@@ -382,7 +378,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
 
 int HdfsParquetScanner::CountScalarColumns(
     const vector<ParquetColumnReader*>& column_readers) {
-  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
+  DCHECK(!column_readers.empty() || scan_node_->optimize_count_star());
   int num_columns = 0;
   stack<ParquetColumnReader*> readers;
   for (ParquetColumnReader* r: column_readers_) readers.push(r);
@@ -432,55 +428,15 @@ Status HdfsParquetScanner::ProcessSplit() {
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
-  if (scan_node_->optimize_parquet_count_star()) {
+  if (scan_node_->optimize_count_star()) {
+    // This is an optimized count(*) case.
     // Populate the single slot with the Parquet num rows statistic.
-    int64_t tuple_buf_size;
-    uint8_t* tuple_buf;
-    // We try to allocate a smaller row batch here because in most cases the number row
-    // groups in a file is much lower than the default row batch capacity.
-    int capacity = min(
-        static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
-    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
-        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
-        &capacity, &tuple_buf_size, &tuple_buf));
-    while (!row_batch->AtCapacity()) {
-      RETURN_IF_ERROR(NextRowGroup());
-      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
-      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
-      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
-      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
-      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
-      InitTuple(template_tuple_, dst_tuple);
-      int64_t* dst_slot =
-          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
-      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
-      row_group_rows_read_ += *dst_slot;
-      dst_row->SetTuple(0, dst_tuple);
-      row_batch->CommitLastRow();
-      tuple_buf += scan_node_->tuple_desc()->byte_size();
-    }
-    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
-    return Status::OK();
+    return GetNextWithCountStarOptimization(row_batch);
   } else if (scan_node_->IsZeroSlotTableScan()) {
     // There are no materialized slots and we are not optimizing count(*), e.g.
     // "select 1 from alltypes". We can serve this query from just the file metadata.
     // We don't need to read the column data.
-    if (row_group_rows_read_ == file_metadata_.num_rows) {
-      eos_ = true;
-      return Status::OK();
-    }
-    assemble_rows_timer_.Start();
-    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
-    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
-    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
-    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
-    Status status = CommitRows(row_batch, num_to_commit);
-    assemble_rows_timer_.Stop();
-    RETURN_IF_ERROR(status);
-    row_group_rows_read_ += max_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
-    return Status::OK();
+    return GetNextWithTemplateTuple(row_batch);
   }
 
   // Transfer remaining tuples from the scratch batch.
@@ -492,18 +448,18 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     if (row_batch->AtCapacity()) return Status::OK();
   }
 
-  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+  while (advance_group_ || column_readers_[0]->RowGroupAtEnd()) {
     // Transfer resources and clear streams if there is any leftover from the previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
-    if (!advance_row_group_) {
+    if (!advance_group_) {
       Status status =
-          ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
+          ValidateEndOfRowGroup(column_readers_, group_idx_, rows_read_in_group_);
       if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
     }
     RETURN_IF_ERROR(NextRowGroup());
-    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
-    if (row_group_idx_ == file_metadata_.row_groups.size()) {
+    DCHECK_LE(group_idx_, file_metadata_.row_groups.size());
+    if (group_idx_ == file_metadata_.row_groups.size()) {
       eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
@@ -522,9 +478,9 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   assemble_rows_timer_.Start();
   Status status;
   if (filter_pages_) {
-    status = AssembleRows<true>(row_batch, &advance_row_group_);
+    status = AssembleRows<true>(row_batch, &advance_group_);
   } else {
-    status = AssembleRows<false>(row_batch, &advance_row_group_);
+    status = AssembleRows<false>(row_batch, &advance_group_);
   }
   assemble_rows_timer_.Stop();
   RETURN_IF_ERROR(status);
@@ -858,11 +814,11 @@ Status HdfsParquetScanner::NextRowGroup() {
   const HdfsFileDesc* file_desc =
       scan_node_->GetFileDesc(context_->partition_descriptor()->id(), filename());
 
-  bool start_with_first_row_group = row_group_idx_ == -1;
+  bool start_with_first_row_group = group_idx_ == -1;
   bool misaligned_row_group_skipped = false;
 
-  advance_row_group_ = false;
-  row_group_rows_read_ = 0;
+  advance_group_ = false;
+  rows_read_in_group_ = 0;
 
   // Loop until we have found a non-empty row group, and successfully initialized and
   // seeded the column readers. Return a non-OK status from within loop only if the error
@@ -874,8 +830,8 @@ Status HdfsParquetScanner::NextRowGroup() {
     // or previous row groups.
     DCHECK_EQ(0, context_->NumStreams());
 
-    ++row_group_idx_;
-    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+    ++group_idx_;
+    if (group_idx_ >= file_metadata_.row_groups.size()) {
       if (start_with_first_row_group && misaligned_row_group_skipped) {
         // We started with the first row group and skipped all the row groups because
         // they were misaligned. The execution flow won't reach this point if there is at
@@ -884,7 +840,7 @@ Status HdfsParquetScanner::NextRowGroup() {
       }
       break;
     }
-    const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+    const parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
     // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
     // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
     // but some data in row groups.
@@ -1311,7 +1267,7 @@ Status HdfsParquetScanner::ProcessPageIndex() {
   MonotonicStopWatch single_process_page_index_timer;
   single_process_page_index_timer.Start();
   ResetPageFiltering();
-  RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_));
+  RETURN_IF_ERROR(page_index_.ReadAll(group_idx_));
   if (page_index_.IsEmpty()) return Status::OK();
   // We can release the raw page index buffer when we exit this function.
   const auto scope_exit = MakeScopeExitTrigger([this](){page_index_.Release();});
@@ -1435,7 +1391,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
   }
 
   min_max_tuple_->Init(min_max_tuple_desc->byte_size());
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
 
   int filtered_pages = 0;
 
@@ -1538,7 +1494,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
 }
 
 Status HdfsParquetScanner::EvaluatePageIndex() {
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
   vector<RowRange> skip_ranges;
 
   for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
@@ -1628,7 +1584,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
 Status HdfsParquetScanner::ComputeCandidatePagesForColumns() {
   if (candidate_ranges_.empty()) return Status::OK();
 
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
   for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
     const auto& page_locations = scalar_reader->offset_index_.page_locations;
     if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows,
@@ -2157,7 +2113,8 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup&
       if (!bloom_filter.Find(hash)) {
         *skip_row_group = true;
         VLOG(3) << Substitute("Row group with idx $0 filtered by Parquet Bloom filter on "
-            "column with idx $1 in file $2.", row_group_idx_, col_idx, filename());
+                              "column with idx $1 in file $2.",
+            group_idx_, col_idx, filename());
         return Status::OK();
       }
     }
@@ -2231,7 +2188,7 @@ Status HdfsParquetScanner::AssembleRowsWithoutLateMaterialization(
     RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
     if (row_batch->AtCapacity()) break;
   }
-  row_group_rows_read_ += num_rows_read;
+  rows_read_in_group_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
@@ -2357,7 +2314,7 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou
       break;
     }
   }
-  row_group_rows_read_ += num_rows_read;
+  rows_read_in_group_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
@@ -2749,7 +2706,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
   DCHECK(column_readers != nullptr);
   DCHECK(column_readers->empty());
 
-  if (scan_node_->optimize_parquet_count_star()) {
+  if (scan_node_->optimize_count_star()) {
     // Column readers are not needed because we are not reading from any columns if this
     // optimization is enabled.
     return Status::OK();
@@ -2899,7 +2856,7 @@ Status HdfsParquetScanner::InitScalarColumns() {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
 
   // Used to validate that the number of values in each reader in column_readers_ at the
   // same SchemaElement is the same.
@@ -2919,7 +2876,7 @@ Status HdfsParquetScanner::InitScalarColumns() {
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
-    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_));
   }
 
   ColumnRangeLengths col_range_lengths(scalar_readers_.size());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 122a976a0..df58c4985 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -48,7 +48,6 @@ class ColumnStatsReader;
 class BaseScalarColumnReader;
 template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader;
-class BoolColumnReader;
 class ParquetPageReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
@@ -346,9 +345,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;
 
-  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
-  virtual Status ProcessSplit() WARN_UNUSED_RESULT;
-  virtual void Close(RowBatch* row_batch);
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+  virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
+  virtual void Close(RowBatch* row_batch) override;
 
   /// Helper function to create ColumnStatsReader object. 'col_order' might be NULL.
   ColumnStatsReader CreateColumnStatsReader(
@@ -389,30 +388,22 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
       "You can increase PARQUET_FOOTER_SIZE if you want, "
       "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
 
+ protected:
+  virtual int64_t GetNumberOfRowsInFile() const override {
+    return file_metadata_.num_rows;
+  }
+
  private:
   friend class ParquetColumnReader;
   friend class CollectionColumnReader;
   friend class BaseScalarColumnReader;
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
   friend class ScalarColumnReader;
-  friend class BoolColumnReader;
   friend class HdfsParquetScannerTest;
   friend class ParquetPageIndex;
   friend class ParquetColumnChunkReader;
   friend class ParquetPageReader;
 
-  /// Index of the current row group being processed. Initialized to -1 which indicates
-  /// that we have not started processing the first row group yet (GetNext() has not yet
-  /// been called).
-  int32_t row_group_idx_;
-
-  /// Counts the number of rows processed for the current row group.
-  int64_t row_group_rows_read_;
-
-  /// Indicates whether we should advance to the next row group in the next GetNext().
-  /// Starts out as true to move to the very first row group.
-  bool advance_row_group_;
-
   boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
 
   /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
@@ -495,9 +486,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// perm_pool_.
   std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
 
-  /// Timer for materializing rows.  This ignores time getting the next buffer.
-  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
-
   /// Average and min/max time spent processing the page index for each row group.
   RuntimeProfile::SummaryStatsCounter* process_page_index_stats_;
 
@@ -573,7 +561,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// that spans entire batch of length 'scratch_batch_->capacity'.
   ScratchMicroBatch complete_micro_batch_;
 
-  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
+  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
 
   /// Return true if we can evaluate this type of predicate on parquet statistic.
   /// FE could populate stats-predicates that can't be evaluated here if the table
@@ -833,7 +821,7 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
       int row_group_idx, int64_t rows_read) WARN_UNUSED_RESULT;
 
   /// Part of the HdfsScanner interface, not used in Parquet.
-  Status InitNewRange() WARN_UNUSED_RESULT { return Status::OK(); }
+  Status InitNewRange() override WARN_UNUSED_RESULT { return Status::OK(); }
 
   /// Transfers the remaining resources backing tuples such as IO buffers and memory
   /// from mem pools to the given row batch. Closes all column readers.
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 51c0b50af..cacda51ed 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -548,7 +548,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   int64_t LastRowIdxInCurrentPage() const {
     DCHECK(!candidate_data_pages_.empty());
     int64_t num_rows =
-        parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows;
+        parent_->file_metadata_.row_groups[parent_->group_idx_].num_rows;
     // Find the next valid page.
     int page_idx = candidate_data_pages_[candidate_page_idx_] + 1;
     while (page_idx < offset_index_.page_locations.size()) {
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index b608c692c..19074de6e 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -313,9 +313,8 @@ struct THdfsScanNode {
   // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 
-  // The byte offset of the slot for Parquet metadata if Parquet count star optimization
-  // is enabled.
-  10: optional i32 parquet_count_star_slot_offset
+  // The byte offset of the slot for counter if count star optimization is enabled.
+  10: optional i32 count_star_slot_offset
 
   // If true, the backend only needs to return one row per partition.
   11: optional bool is_partition_key_scan
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index a5dba3831..78577769d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -89,6 +89,7 @@ import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.MathUtil;
@@ -330,6 +331,8 @@ public class HdfsScanNode extends ScanNode {
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
 
+  private boolean isFullAcidTable_ = false;
+
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -349,6 +352,8 @@ public class HdfsScanNode extends ScanNode {
     randomReplica_ = hdfsTblRef.getRandomReplica();
     FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable();
     Preconditions.checkState(tbl_ == hdfsTable);
+    isFullAcidTable_ =
+        AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters());
     StringBuilder error = new StringBuilder();
     aggInfo_ = aggInfo;
     skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
@@ -392,13 +397,14 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Returns true if the Parquet count(*) optimization can be applied to the query block
+   * Returns true if the count(*) optimization can be applied to the query block
    * of this scan node.
    */
   protected boolean canApplyCountStarOptimization(Analyzer analyzer,
       Set<HdfsFileFormat> fileFormats) {
     if (fileFormats.size() != 1) return false;
-    if (!hasParquet(fileFormats)) return false;
+    if (isFullAcidTable_) return false;
+    if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false;
     return canApplyCountStarOptimization(analyzer);
   }
 
@@ -1482,6 +1488,13 @@ public class HdfsScanNode extends ScanNode {
           numRangesAdjusted :
           Math.min(inputCardinality_, numRangesAdjusted);
     }
+
+    if (countStarSlot_ != null) {
+      // We are doing optimized count star. Override cardinality with total num files.
+      long totalFiles = sumValues(totalFilesPerFs_);
+      inputCardinality_ = totalFiles;
+      cardinality_ = totalFiles;
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
     }
@@ -1775,8 +1788,7 @@ public class HdfsScanNode extends ScanNode {
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
     Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null));
     if (countStarSlot_ != null) {
-      msg.hdfs_scan_node.setParquet_count_star_slot_offset(
-          countStarSlot_.getByteOffset());
+      msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
     }
     if (!statsConjuncts_.isEmpty()) {
       for (Expr e: statsConjuncts_) {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index fb395ea61..acc3ab7a4 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1202,4 +1202,8 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("tpcds-dist-method", "tpcds");
   }
 
+  @Test
+  public void testOrcStatsAgg() {
+    runPlannerTestFile("orc-stats-agg");
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
new file mode 100644
index 000000000..00d4caca8
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
@@ -0,0 +1,426 @@
+# Verify that that the ORC count(*) optimization is applied in all count(*) or
+# count(<literal>) cases when scanning a ORC table. In the last case, we are scanning
+# a text table, so the optimization is not applied. The optimization is observed when
+# the cardinality of the ORC scan (24) is the same as # the # of files (24).
+select count(*) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(1) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(123) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(*) from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--08:AGGREGATE [FINALIZE]
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+02:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--16:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  15:EXCHANGE [UNPARTITIONED]
+|  |
+|  08:AGGREGATE
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--14:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  13:EXCHANGE [UNPARTITIONED]
+|  |
+|  06:AGGREGATE
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+|--12:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  11:EXCHANGE [UNPARTITIONED]
+|  |
+|  04:AGGREGATE
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Verify that the ORC count(*) optimization is applied even if there is more than
+# one item in the select list.
+select count(*), count(1), count(123) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Select count(<partition col>) - the optimization is disabled because it's not a
+# count(<literal>) or count(*) aggregate function.
+select count(year) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(`year`)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=13.07K
+====
+# Group by partition columns.
+select month, count(*) from functional_orc_def.uncomp_src_alltypes group by month, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  group by: `month`, `year`
+|  row-size=16B cardinality=24
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=16B cardinality=24
+====
+# The optimization is disabled because tinyint_col is not a partition col.
+select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group by tinyint_col, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tinyint_col, `year`
+|  row-size=13B cardinality=13.07K
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=5B cardinality=13.07K
+====
+# The optimization is disabled because it can not be applied to the 1st aggregate
+# function.
+select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(`year`), count(*)
+|  row-size=16B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=13.07K
+====
+# Optimization is not applied because the inner count(*) is not materialized. The outer
+# count(*) does not reference a base table.
+select count(*) from (select count(*) from functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  row-size=0B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   partition key scan
+   row-size=0B cardinality=24
+====
+# The optimization is applied if count(*) is in the having clause.
+select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  having: count(*) > 1
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# The count(*) optimization is applied in the inline view.
+select count(*), count(a) from (select count(1) as a from functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*), count(count(*))
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# The count(*) optimization is applied to the inline view even if there is a join.
+select *
+from functional.alltypes x inner join (
+  select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year
+) t on x.id = t.a;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: x.id = count(*)
+|  runtime filters: RF000 <- count(*)
+|  row-size=101B cardinality=2
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  |  group by: `year`
+|  |  row-size=12B cardinality=2
+|  |
+|  01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=12B cardinality=24
+|
+00:SCAN HDFS [functional.alltypes x]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> x.id
+   row-size=89B cardinality=7.30K
+====
+# The count(*) optimization is not applied if there is more than 1 table ref.
+select count(*) from functional_orc_def.uncomp_src_alltypes a, functional_orc_def.uncomp_src_alltypes b
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=0B cardinality=170.85M
+|
+|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=0B cardinality=13.07K
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# The count(*) optimization is applied if all predicates are on partition columns only.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` < 2010, `month` > 8
+   HDFS partitions=4/24 files=4 size=33.53KB
+   row-size=8B cardinality=4
+====
+# tinyint_col is not a partition column so the optimization is disabled.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and tinyint_col > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` < 2010
+   HDFS partitions=12/24 files=12 size=102.74KB
+   predicates: tinyint_col > 8
+   row-size=1B cardinality=654
+====
+# Optimization is applied after constant folding.
+select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Optimization is not applied to count(null).
+select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(null) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=2
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: count(NULL)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=0B cardinality=13.07K
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(NULL + 3)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# Optimization is not applied when selecting from an empty table.
+select count(*) from functional_orc_def.emptytable
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.emptytable]
+   partitions=0/0 files=0 size=0B
+   row-size=0B cardinality=0
+====
+# Optimization is not applied when all partitions are pruned.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` = -1
+   partitions=0/24 files=0 size=0B
+   row-size=0B cardinality=0
+====
+# Optimization is not applied across query blocks, even though it would be correct here.
+select count(*) from (select int_col from functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# In general, optimization is not applied when there is a distinct agg.
+select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(1), count:merge(*)
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE
+|  output: count(*)
+|  group by: 1
+|  row-size=9B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# The optimization is applied here because only the count(*) and a partition column are
+# materialized. Non-materialized agg exprs are ignored.
+select year, cnt from (
+  select year, count(bigint_col), count(*) cnt, avg(int_col)
+  from functional_orc_def.uncomp_src_alltypes
+  where month=1
+  group by year
+) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
+|  group by: `year`
+|  row-size=12B cardinality=2
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `month` = 1
+   HDFS partitions=2/24 files=2 size=17.07KB
+   row-size=12B cardinality=2
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
index 41a10602f..7ec11f482 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
@@ -1,6 +1,7 @@
 # Verify that that the parquet count(*) optimization is applied in all count(*) or
 # count(<literal>) cases when scanning a Parquet table. In the last case, we are scanning
-# a text table, so the optimization is not applied.
+# a text table, so the optimization is not applied. The optimization is observed when
+# the cardinality of the Parquet scan (24) is the same as # the # of files (24).
 select count(*) from functional_parquet.alltypes
 union all
 select count(1) from functional_parquet.alltypes
@@ -29,7 +30,7 @@ PLAN-ROOT SINK
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=12.75K
+|     row-size=8B cardinality=24
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -37,7 +38,7 @@ PLAN-ROOT SINK
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=12.75K
+|     row-size=8B cardinality=24
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -45,7 +46,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -79,7 +80,7 @@ PLAN-ROOT SINK
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=12.75K
+|     row-size=8B cardinality=24
 |
 |--12:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
@@ -93,7 +94,7 @@ PLAN-ROOT SINK
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=12.75K
+|     row-size=8B cardinality=24
 |
 10:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -107,7 +108,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ====
 # Verify that the parquet count(*) optimization is applied even if there is more than
 # one item in the select list.
@@ -121,7 +122,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ====
 # Select count(<partition col>) - the optimization should be disabled because it's not a
 # count(<literal>) or count(*) aggregate function.
@@ -149,7 +150,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=16B cardinality=12.75K
+   row-size=16B cardinality=24
 ====
 # The optimization is disabled because tinyint_col is not a partition col.
 select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year
@@ -208,7 +209,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ====
 # The count(*) optimization is applied in the inline view.
 select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t
@@ -225,7 +226,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ====
 # The count(*) optimization is applied to the inline view even if there is a join.
 select *
@@ -247,7 +248,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=12B cardinality=12.75K
+|     row-size=12B cardinality=24
 |
 00:SCAN HDFS [functional.alltypes x]
    HDFS partitions=24/24 files=24 size=478.45KB
@@ -286,7 +287,7 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional_parquet.alltypes]
    partition predicates: `year` < 2010, `month` > 8
    HDFS partitions=4/24 files=4 size=33.53KB
-   row-size=8B cardinality=2.13K
+   row-size=8B cardinality=4
 ====
 # tinyint_col is not a partition column so the optimization is disabled.
 select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8;
@@ -314,7 +315,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=12.75K
+   row-size=8B cardinality=24
 ====
 # Optimization is not applied to count(null).
 select count(1 + null + 3) from functional_parquet.alltypes
@@ -420,5 +421,5 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional_parquet.alltypes]
    partition predicates: `month` = 1
    HDFS partitions=2/24 files=2 size=17.07KB
-   row-size=12B cardinality=1.09K
+   row-size=12B cardinality=2
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 64373845a..5fc965b60 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1869,6 +1869,7 @@ select count(*) from tpch_parquet.lineitem
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
 Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1890,11 +1891,12 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=4.12MB Threads=3
 Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1929,11 +1931,12 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.12MB Threads=2
 Per-Host Resource Estimates: Memory=84MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1968,7 +1971,7 @@ Per-Instance Resources: mem-estimate=80.02MB mem-reservation=128.00KB thread-res
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sort
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 36c329ec6..8ea56653c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -88,7 +88,8 @@ select count(*) from ice_bigints;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 6
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 6
 ====
 ---- QUERY
 select count(*) from ice_bigints
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
index d20fef803..430dddc0a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -19,7 +19,8 @@ select count(*) from ice_types1;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
 ====
 ---- QUERY
 show files in ice_types1;
@@ -250,7 +251,8 @@ select count(*) from ice_types2;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
 ====
 ---- QUERY
 show files in ice_types2;
@@ -367,7 +369,8 @@ select count(*) from ice_types3;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
 ====
 ---- QUERY
 show files in ice_types3;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
index 2d5bf9e80..f4b7730a4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
@@ -25,16 +25,17 @@ bigint, bigint
 280,1260
 ====
 ---- QUERY
-# IMPALA-5861: RowsRead counter should be accurate for table scan that returns
-# zero slots. This test is run with various batch_size values, which helps
-# reproduce the bug. Scanning multiple file formats triggers the bug because
-# the Parquet count(*) rewrite is disabled when non-Parquet file formats are
-# present.
+# IMPALA-11123: IMPALA-5861 add this test to verify that 'RowRead' counter is not double
+# counted for zero slot scan. IMPALA-11123 remove incerement of 'RowRead' counter
+# in case of optimized count(star) and zero slot scan query. This cause reduction of
+# 'RowsRead' value from 1200 to 900 since the other 300 are served through
+# zero slot scan. We do not verify 'NumFileMetadataRead' since it does not stay the same
+# over different test vector permutation.
 select count(*) from functional.alltypesmixedformat
 ---- TYPES
 bigint
 ---- RESULTS
 1200
 ---- RUNTIME_PROFILE
-aggregation(SUM, RowsRead): 1200
+aggregation(SUM, RowsRead): 900
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
new file mode 100644
index 000000000..4c9bdfb50
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
@@ -0,0 +1,152 @@
+====
+---- QUERY
+# Tests the correctness of the ORC count(*) optimization.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes
+---- RESULTS
+7300
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# Tests the correctness of zero slot scan over ORC.
+# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield
+# different number.
+select 1 from functional_orc_def.alltypestiny
+---- RESULTS
+1
+1
+1
+1
+1
+1
+1
+1
+---- TYPES
+tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# ORC count(*) optimization with predicates on the partition columns.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
+---- RESULTS
+1220
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# ORC count(*) optimization with group by partition columns.
+select year, month, count(1)
+from functional_orc_def.uncomp_src_alltypes group by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,280
+2010,3,310
+2010,4,300
+2010,5,310
+2010,6,300
+2010,7,310
+2010,8,310
+2010,9,300
+2010,10,310
+2010,11,300
+2010,12,310
+---- TYPES
+int, int, bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# ORC count(*) optimization with both group by and predicates on partition columns.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
+group by month
+---- RESULTS
+310
+300
+310
+300
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# ORC count(*) optimization with the result going into a join.
+select x.bigint_col from functional_orc_def.uncomp_src_alltypes x
+  inner join (
+    select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year
+  ) t on x.id = t.a;
+---- RESULTS
+0
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 24
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 7300
+=====
+---- QUERY
+# ORC count(*) optimization with the agg function in the having clause.
+select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
+---- RESULTS
+1
+---- TYPES
+tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Verify that 0 is returned for count(*) on an empty table.
+select count(1) from functional_orc_def.emptytable
+---- RESULTS
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# Verify that 0 is returned when all partitions are pruned.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
+---- RESULTS
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
+=====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
index 620c50bef..1473a1b8f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
@@ -7,6 +7,30 @@ from functional_parquet.alltypes
 7300
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# Tests the correctness of zero slot scan over Parquet.
+# Not checking 'NumFileMetadataRead' here since codegen vs non-codegen yield
+# different number.
+select 1 from functional_orc_def.alltypestiny
+---- RESULTS
+1
+1
+1
+1
+1
+1
+1
+1
+---- TYPES
+tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with predicates on the partition columns.
@@ -16,6 +40,10 @@ from functional_parquet.alltypes where year < 2010 and month > 8
 1220
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with group by partition columns.
@@ -48,6 +76,10 @@ from functional_parquet.alltypes group by year, month
 2010,12,310
 ---- TYPES
 int, int, bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with both group by and predicates on partition columns.
@@ -61,6 +93,10 @@ group by month
 300
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with the result going into a join.
@@ -73,6 +109,10 @@ select x.bigint_col from functional.alltypes x
 0
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 7300
 =====
 ---- QUERY
 # Parquet count(*) optimization with the agg function in the having clause.
@@ -81,6 +121,10 @@ select 1 from functional_parquet.alltypes having count(*) > 1
 1
 ---- TYPES
 tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
 # Verify that 0 is returned for count(*) on an empty table.
@@ -89,6 +133,10 @@ select count(1) from functional_parquet.emptytable
 0
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Verify that 0 is returned when all partitions are pruned.
@@ -97,6 +145,10 @@ select count(1) from functional_parquet.alltypes where year = -1
 0
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Test different row group size combinations.
@@ -114,6 +166,10 @@ select count(*) from tpch_parquet.lineitem
 6001215
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 6
+aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # IMPALA-5679: Count(*) with group by on a string partition column.
@@ -136,4 +192,8 @@ select string_col, count(*) from $DATABASE.string_partitioned_table group by str
 '9',730
 ---- TYPES
 string, bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 10
+aggregation(SUM, RowsRead): 0
 =====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
index 0d9e17330..3938ff3ec 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
@@ -11,6 +11,10 @@ INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Test with more complex multiple distinct aggregation.
@@ -23,6 +27,10 @@ BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Distinct aggregation with multiple columns.
@@ -58,6 +66,10 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with analytic function.
@@ -71,6 +83,10 @@ INT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition scan combined with sort.
@@ -107,6 +123,10 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with predicate on partition columns
@@ -121,6 +141,10 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 2
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 2
 ====
 ---- QUERY
 # Partition key scan combined with having predicate.
@@ -136,6 +160,10 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+# Confirm that only one metadata per file is read.
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Empty table should not return any rows
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 13057c0d2..698e66da7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -15,6 +15,10 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
+---- RUNTIME_PROFILE: table_format=parquet,orc
+row_regex: .*RowsReturned: 2.43K .*
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -49,6 +53,10 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
+---- RUNTIME_PROFILE: table_format=parquet,orc
+row_regex: .*RowsReturned: 2.43K .*
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Shuffle join, global mode. Expect filters to be propagated.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
index b6fab637d..1a6c30cdd 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
@@ -15,6 +15,10 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
+---- RUNTIME_PROFILE: table_format=parquet,orc
+row_regex: .*RowsReturned: 2.43K .*
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -49,6 +53,10 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
+---- RUNTIME_PROFILE: table_format=parquet,orc
+row_regex: .*RowsReturned: 2.43K .*
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Shuffle join, global mode. Expect filters to be propagated.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index 72d650537..002d5d0d9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -238,4 +238,7 @@ tinyint
 1
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 100
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, RowsReturned): 200
 ====
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 918ba826f..995d1ed15 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -686,7 +686,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     different number of executors and memory limit in each."""
     # A small query with estimated memory per host of 10MB that can run on the small
     # executor group
-    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;"
+    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where d_year=2022;"
     # A large query with estimated memory per host of 132MB that can only run on
     # the large executor group.
     LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 2b3d9aa17..2d7efe914 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -76,6 +76,11 @@ class TestQueryRetries(CustomClusterTestSuite):
         union all
         select count(*) from functional.alltypes where bool_col = sleep(50)"""
 
+  # A simple count query with predicate. The predicate is needed so that the planner does
+  # not create the optimized count(star) query plan.
+  _count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
+  _count_query_result = "55"
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -246,7 +251,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     killed_impalad = self.__kill_random_impalad()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -258,7 +263,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert "6001215" in results.data[0]
+    assert self._count_query_result in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -306,7 +311,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # and the query should be retried. Add delay before admission so that the 2nd node
     # is removed from the blacklist before scheduler makes schedule for the retried
     # query.
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -319,7 +324,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert "6001215" in results.data[0]
+    assert self._count_query_result in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -370,7 +375,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     rpc_not_accessible_impalad = self.cluster.impalads[1]
     assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
 
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -692,7 +697,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -730,7 +735,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.__wait_until_retry_state(handle, 'RETRYING')
@@ -759,7 +764,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
 
@@ -782,7 +787,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
     self.hs2_client.execute_async(query)
@@ -808,7 +813,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
     # The number of in-flight queries is 0 at the beginning, then 1 when the original
     # query is submitted. It's 2 when the retried query is registered. Although the retry
@@ -837,7 +842,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
     self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -876,7 +881,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     client = self.cluster.get_first_impalad().service.create_beeswax_client()
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
@@ -906,7 +911,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Test query retries with the HS2 protocol. Enable the results set cache as well and
     test that query retries work with the results cache."""
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
     handle = self.hs2_client.execute_async(query)
@@ -915,7 +920,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.hs2_client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert int(results.data[0]) == 6001215
+    assert results.data[0] == self._count_query_result
 
     # Validate the live exec summary.
     retried_query_id = \
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 14f57dc33..5e450485e 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -257,24 +257,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       # Verify codegen was enabled for all four stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
 
-  def test_parquet_count_star_optimization(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'text' or
-        vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-
-  def test_kudu_count_star_optimization(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'text' or
-       vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
-    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
-
   def test_ndv(self):
     """Test the version of NDV() that accepts a scale value argument against
     different column data types. The scale argument is an integer in range
@@ -318,17 +300,55 @@ class TestAggregationQueries(ImpalaTestSuite):
       for j in xrange(0, 11):
         assert(ndv_results[i - 1][j] == int(ndv_vals[j]))
 
-  def test_sampled_ndv(self, vector, unique_database):
+  def test_grouping_sets(self, vector):
+    """Tests for ROLLUP, CUBE and GROUPING SETS."""
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
+    self.run_test_case('QueryTest/grouping-sets', vector)
+
+
+class TestAggregationQueriesRunOnce(ImpalaTestSuite):
+  """Run the aggregation test suite similarly as TestAggregationQueries, but with stricter
+  constraint. Each test in this class only run once by setting uncompressed text dimension
+  for all exploration strategy. However, they may not necessarily target uncompressed text
+  table format. This also run with codegen enabled and disabled to exercise our
+  non-codegen code"""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAggregationQueriesRunOnce, cls).add_test_dimensions()
+
+    cls.ImpalaTestMatrix.add_dimension(
+      create_exec_option_dimension(disable_codegen_options=[False, True]))
+
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_parquet_count_star_optimization(self, vector, unique_database):
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+
+  def test_kudu_count_star_optimization(self, vector):
+    self.run_test_case('QueryTest/kudu-stats-agg', vector)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/kudu-stats-agg', vector)
+
+  def test_orc_count_star_optimization(self, vector):
+    self.run_test_case('QueryTest/orc-stats-agg', vector)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/orc-stats-agg', vector)
+
+  def test_sampled_ndv(self, vector):
     """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
     reasonably made deterministic with existing options so we test it separately.
     The goal of this test is to ensure that SAMPLED_NDV() works on all data types
     and returns approximately sensible estimates. It is not the goal of this test
     to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is expected
     be inaccurate on small data sets like the ones we use in this test."""
-    if (vector.get_value('table_format').file_format != 'text' or
-        vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
 
     # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and SAMPLED_NDV()
     # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do.
@@ -382,12 +402,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       for i in xrange(14, 16):
         self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
 
-  def test_grouping_sets(self, vector):
-    """Tests for ROLLUP, CUBE and GROUPING SETS."""
-    if vector.get_value('table_format').file_format == 'hbase':
-      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
-    self.run_test_case('QueryTest/grouping-sets', vector)
-
 
 class TestDistinctAggregation(ImpalaTestSuite):
   """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 798f4d690..9a8553349 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1515,7 +1515,8 @@ class TestOrc(ImpalaTestSuite):
   def _misaligned_orc_stripes_helper(
           self, table_name, rows_in_table, num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners
-    that don't read anything because the underlying file is poorly formatted
+    that don't read anything because the underlying file is poorly formatted.
+    Additionally, test that select count(star) match with expected number of rows.
     """
     query = 'select * from %s' % table_name
     result = self.client.execute(query)
@@ -1536,6 +1537,11 @@ class TestOrc(ImpalaTestSuite):
       total += int(n)
     assert total == num_scanners_with_no_reads
 
+    # Test that select count(star) match with expected number of rows.
+    query = 'select count(*) from %s' % table_name
+    result = self.client.execute(query)
+    assert int(result.data[0]) == rows_in_table
+
   # Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive
   # queries that hang in some cases (IMPALA-9345). It would be possible to separate
   # the tests that use Hive and run most tests on S3, but I think that running these on
@@ -1681,13 +1687,13 @@ class TestOrc(ImpalaTestSuite):
         "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
         unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(*) from {0}.{1}".format(unique_database, test_name))
+        "select count(id) from {0}.{1}".format(unique_database, test_name))
     assert expected_error in str(err)
 
   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
     self._run_invalid_schema_test(unique_database, "corrupt_schema",
-        "Encountered parse error during schema selection")
+        "Encountered parse error in tail of ORC file")
     self._run_invalid_schema_test(unique_database, "corrupt_root_type",
         "Root of the selected type returned by the ORC lib is not STRUCT: boolean.")
 
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 135a024f9..dca6deaaa 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -254,15 +254,19 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True):
       # evaluated for all formats that don't have a commented section for this query.
       if subsection_name == 'RUNTIME_PROFILE':
         if subsection_comment is not None and subsection_comment is not "":
-          allowed_formats = ['kudu']
+          allowed_formats = ['kudu', 'parquet', 'orc']
           if not subsection_comment.startswith("table_format="):
-            raise RuntimeError, 'RUNTIME_PROFILE comment (%s) must be of the form ' \
-              '"table_format=FORMAT"' % subsection_comment
-          table_format = subsection_comment[13:]
-          if table_format not in allowed_formats:
-            raise RuntimeError, 'RUNTIME_PROFILE table format (%s) must be in: %s' % \
-                (table_format, allowed_formats)
-          subsection_name = 'RUNTIME_PROFILE_%s' % table_format
+            raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form '
+              '"table_format=FORMAT[,FORMAT2,...]"' % subsection_comment)
+          parsed_formats = subsection_comment[13:].split(',')
+          for table_format in parsed_formats:
+            if table_format not in allowed_formats:
+              raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' %
+                  (table_format, allowed_formats))
+            else:
+              subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format
+              parsed_sections[subsection_name_for_format] = subsection_str
+          continue
 
       parsed_sections[subsection_name] = subsection_str