You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2022/11/10 00:36:54 UTC

[impala] branch master updated (4bc86ac63 -> efa426453)

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 4bc86ac63 IMPALA-11714: Fix resolve_minidumps.py on Ubuntu 16
     new 4e87a80fa IMPALA-9488: Add metrics for EC reads
     new efa426453 IMPALA-11692: Struct slot memory sharing involving select * not working properly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/base-sequence-scanner.cc               |   7 +-
 be/src/exec/hdfs-scan-node-base.cc                 |  42 ++--
 be/src/exec/hdfs-scan-node-base.h                  |  22 +-
 be/src/exec/hdfs-scanner.cc                        |  13 +-
 be/src/exec/orc/hdfs-orc-scanner.cc                |  13 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   4 +-
 be/src/exec/parquet/parquet-page-index.cc          |  10 +-
 be/src/exec/parquet/parquet-page-reader.cc         |   9 +-
 be/src/exec/scanner-context.cc                     |   5 +-
 be/src/exec/text/hdfs-text-scanner.cc              |   7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc            |   4 +-
 be/src/runtime/io/disk-io-mgr-test.cc              |  36 +--
 be/src/runtime/io/hdfs-file-reader.cc              |   3 +
 be/src/runtime/io/request-context.h                |   4 +
 be/src/runtime/io/request-ranges.h                 |  34 ++-
 be/src/runtime/io/scan-range.cc                    |  38 +--
 be/src/runtime/tmp-file-mgr.cc                     |  10 +-
 be/src/scheduling/scheduler.cc                     |   2 +
 be/src/util/impalad-metrics.cc                     |   5 +
 be/src/util/impalad-metrics.h                      |   4 +
 common/thrift/PlanNodes.thrift                     |   3 +
 common/thrift/metrics.json                         |  10 +
 .../org/apache/impala/analysis/SelectStmt.java     | 269 ++++++++++++++-------
 .../org/apache/impala/planner/HdfsScanNode.java    |   1 +
 .../org/apache/impala/planner/PlannerTest.java     |  34 +++
 tests/query_test/test_io_metrics.py                |  79 ++++++
 26 files changed, 476 insertions(+), 192 deletions(-)
 create mode 100644 tests/query_test/test_io_metrics.py


[impala] 01/02: IMPALA-9488: Add metrics for EC reads

Posted by db...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4e87a80fae86c5775a5f35607806fee287b3797a
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Fri Oct 28 10:57:48 2022 -0700

    IMPALA-9488: Add metrics for EC reads
    
    Adds a metric tracking erasure-coded bytes read. Adds
    ScanRange::FileInfo to pass file info through calls of AllocateScanRange
    so it's easier to add erasure coding as another piece of file info.
    
    Adds a test to verify that the expected number of bytes is read for
    existing read metrics and the new `erasure-coded-bytes-read` metric when
    doing a select.
    
    Change-Id: Ieb06bac9dea4b632621653d2935e9a7b2dc81341
    Reviewed-on: http://gerrit.cloudera.org:8080/19178
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc               |  7 +-
 be/src/exec/hdfs-scan-node-base.cc                 | 42 +++++++-----
 be/src/exec/hdfs-scan-node-base.h                  | 22 +++---
 be/src/exec/hdfs-scanner.cc                        | 13 ++--
 be/src/exec/orc/hdfs-orc-scanner.cc                | 13 ++--
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  4 +-
 be/src/exec/parquet/parquet-page-index.cc          | 10 ++-
 be/src/exec/parquet/parquet-page-reader.cc         |  9 +--
 be/src/exec/scanner-context.cc                     |  5 +-
 be/src/exec/text/hdfs-text-scanner.cc              |  7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc            |  4 +-
 be/src/runtime/io/disk-io-mgr-test.cc              | 36 +++++-----
 be/src/runtime/io/hdfs-file-reader.cc              |  3 +
 be/src/runtime/io/request-context.h                |  4 ++
 be/src/runtime/io/request-ranges.h                 | 34 +++++++---
 be/src/runtime/io/scan-range.cc                    | 38 +++++------
 be/src/runtime/tmp-file-mgr.cc                     | 10 +--
 be/src/scheduling/scheduler.cc                     |  2 +
 be/src/util/impalad-metrics.cc                     |  5 ++
 be/src/util/impalad-metrics.h                      |  4 ++
 common/thrift/PlanNodes.thrift                     |  3 +
 common/thrift/metrics.json                         | 10 +++
 .../org/apache/impala/planner/HdfsScanNode.java    |  1 +
 tests/query_test/test_io_metrics.py                | 79 ++++++++++++++++++++++
 24 files changed, 254 insertions(+), 111 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ae66d0e01..83e1221b1 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -62,10 +62,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     bool expected_local = false;
     int cache_options = !scan_node->IsDataCacheDisabled() ? BufferOpts::USE_DATA_CACHE :
         BufferOpts::NO_CACHING;
-    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
-        expected_local, files[i]->mtime, BufferOpts(cache_options),
-        metadata->original_split);
+    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+        header_size, 0, metadata->partition_id, -1, expected_local,
+        BufferOpts(cache_options), metadata->original_split);
     ScanRangeMetadata* header_metadata =
             static_cast<ScanRangeMetadata*>(header_range->meta_data());
     header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5bab2c13..f6f1f008d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -111,6 +111,8 @@ PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES,
     "The total number of bytes read via short circuit read");
 PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES,
     "The total number of bytes read from data node cache");
+PROFILE_DEFINE_COUNTER(BytesReadErasureCoded, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read from erasure-coded data");
 PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT,
     "The total number of remote scan ranges");
 PROFILE_DEFINE_COUNTER(BytesReadRemoteUnexpected, STABLE_LOW, TUnit::BYTES,
@@ -298,6 +300,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
         file_desc->file_length = split.file_length();
         file_desc->mtime = split.mtime();
         file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
+        file_desc->is_erasure_coded = split.is_erasure_coded();
         file_desc->file_format = partition_desc->file_format();
         file_desc->file_metadata = file_metadata;
         RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
@@ -328,10 +331,9 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
       }
       ScanRangeMetadata* metadata =
           obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
-      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
-          file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
-          params.volume_id(), expected_local, file_desc->mtime,
-          BufferOpts(cache_options)));
+      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
+          file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
+          params.volume_id(), expected_local, BufferOpts(cache_options)));
       total_splits++;
     }
     // Update server wide metrics for number of scan ranges and ranges that have
@@ -608,6 +610,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   bytes_read_short_circuit_ =
       PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile());
   bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile());
+  bytes_read_ec_ = PROFILE_BytesReadErasureCoded.Instantiate(runtime_profile());
   num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile());
   unexpected_remote_bytes_ =
       PROFILE_BytesReadRemoteUnexpected.Instantiate(runtime_profile());
@@ -822,37 +825,37 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
   return curr_reservation;
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    int64_t mtime,  const BufferOpts& buffer_opts, const ScanRange* original_split) {
+    const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
-      mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, {}, metadata, disk_id, expected_local,
+      buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
-    int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+    int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
-      disk_id, expected_local, mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, move(sub_ranges), metadata,
+      disk_id, expected_local, buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
-    int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
-    int disk_id, bool expected_local, int64_t mtime,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
+    int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
+    ScanRangeMetadata* metadata, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts) {
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
   // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
   // beyond the end of the file).
-  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
+  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, fi.filename)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
-      move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
+  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fi, len, offset,
+      move(sub_ranges), metadata, disk_id, expected_local, buffer_opts);
 }
 
 const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
@@ -1213,6 +1216,7 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
     bytes_read_local_->Set(reader_context_->bytes_read_local());
     bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
     bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    bytes_read_ec_->Set(reader_context_->bytes_read_ec());
     num_remote_ranges_->Set(reader_context_->num_remote_ranges());
     unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
     cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
@@ -1237,6 +1241,8 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
         bytes_read_short_circuit_->value());
     ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
         bytes_read_dn_cache_->value());
+    ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment(
+        bytes_read_ec_->value());
   }
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 21d18f25c..053e61c70 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -71,6 +71,10 @@ struct HdfsFileDesc {
       file_compression(THdfsCompression::NONE),
       file_format(THdfsFileFormat::TEXT) {}
 
+  io::ScanRange::FileInfo GetFileInfo() const {
+    return io::ScanRange::FileInfo{filename.c_str(), fs, mtime, is_erasure_coded};
+  }
+
   /// Connection to the filesystem containing the file.
   hdfsFS fs;
 
@@ -93,6 +97,9 @@ struct HdfsFileDesc {
   /// Extra file metadata, e.g. Iceberg-related file-level info.
   const ::org::apache::impala::fb::FbFileMetadata* file_metadata;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded = false;
+
   /// Some useful typedefs for creating HdfsFileDesc related data structures.
   /// This is a pair for partition ID and filename which uniquely identifies a file.
   typedef pair<int64_t, std::string> PartitionFileKey;
@@ -506,23 +513,21 @@ class HdfsScanNodeBase : public ScanNode {
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as the first overload, but it takes sub-ranges as well.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as above, but it takes both sub-ranges and metadata.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts);
+      const io::BufferOpts& buffer_opts);
 
   /// Adds ranges to be read later by scanners. Must not be called once
   /// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
@@ -781,6 +786,7 @@ class HdfsScanNodeBase : public ScanNode {
   RuntimeProfile::Counter* bytes_read_local_ = nullptr;
   RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
   RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
+  RuntimeProfile::Counter* bytes_read_ec_ = nullptr;
   RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
   RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
   RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7795a8c09..49f360ed1 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -888,10 +888,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
         // metadata associated with the footer range.
         ScanRange* footer_range;
         if (footer_split != nullptr) {
-          footer_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), footer_size, footer_start,
-              split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(), files[i]->mtime,
+          footer_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+              footer_size, footer_start, split_metadata->partition_id,
+              footer_split->disk_id(), footer_split->expected_local(),
               BufferOpts(footer_split->cache_options()), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
@@ -899,9 +898,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           int cache_options = !scan_node->IsDataCacheDisabled() ?
               BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
           footer_range =
-              scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
-                   footer_size, footer_start, split_metadata->partition_id, -1,
-                   expected_local, files[i]->mtime, BufferOpts(cache_options), split);
+              scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+                  footer_size, footer_start, split_metadata->partition_id, -1,
+                  expected_local, BufferOpts(cache_options), split);
         }
         footer_ranges.push_back(footer_range);
       } else {
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index ce04bf178..12185af5b 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -155,8 +155,9 @@ Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
   bool expected_local = split_range->ExpectedLocalRead(offset, length);
   int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
-      metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
-      split_range->disk_id(), expected_local, split_range->mtime(),
+      ScanRange::FileInfo{scanner_->filename(), metadata_range->fs(),
+          split_range->mtime(), split_range->is_erasure_coded()},
+      length, offset, partition_id, split_range->disk_id(), expected_local,
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   Status status;
@@ -255,9 +256,11 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
       string msg = Substitute("Invalid read len.");
       return Status(msg);
     }
-    ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
-        col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
+    ScanRange* scan_range = scan_node_->AllocateScanRange(
+        ScanRange::FileInfo{filename(), metadata_range_->fs(), split_range->mtime(),
+            split_range->is_erasure_coded()},
+        range.length_, range.offset_, partition_id, split_range->disk_id(),
+        col_range_local, BufferOpts(split_range->cache_options()));
     RETURN_IF_ERROR(
         context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
   }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 64e4f546e..df70ceb2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1913,10 +1913,8 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
   const int cache_options =
       metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scan_node_->AllocateScanRange(
-      metadata_range_->fs(), filename(), size,
-      offset, partition_id,
+      metadata_range_->GetFileInfo(), size, offset, partition_id,
       metadata_range_->disk_id(), metadata_range_->expected_local(),
-      metadata_range_->mtime(),
       BufferOpts::ReadInto(buffer, size, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index e46edfa65..9925714ac 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -96,12 +96,10 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
   int cache_options =
       scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scanner_->scan_node_->AllocateScanRange(
-      scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
-      scan_range_start, move(sub_ranges), partition_id,
-      scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
-      scanner_->metadata_range_->mtime(),
-      BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
-          cache_options));
+      scanner_->metadata_range_->GetFileInfo(), scan_range_size, scan_range_start,
+      move(sub_ranges), partition_id, scanner_->metadata_range_->disk_id(),
+      scanner_->metadata_range_->expected_local(), BufferOpts::ReadInto(
+          page_index_buffer_.buffer(), page_index_buffer_.Size(), cache_options));
 
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 972b326aa..3caa3ebda 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -87,10 +87,11 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
       static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
   // Determine if the column is completely contained within a local split.
   bool col_range_local = split_range->ExpectedLocalRead(col_start, col_len);
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, move(sub_ranges),
-      partition_id, split_range->disk_id(),
-      col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
+  ScanRange::FileInfo fi = metadata_range->GetFileInfo();
+  fi.mtime = file_desc.mtime;
+  scan_range_ = parent_->scan_node_->AllocateScanRange(fi,
+      col_len, col_start, move(sub_ranges), partition_id, split_range->disk_id(),
+      col_range_local, BufferOpts(split_range->cache_options()));
   page_headers_read_ = 0;
   dictionary_header_encountered_ = false;
   state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a9ac2061d..2e453dc96 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -172,9 +172,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     // Disable HDFS caching as we are reading past the end.
     int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
-        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), expected_local, scan_range_->mtime(),
-        BufferOpts(cache_options));
+        scan_range_->GetFileInfo(), read_past_buffer_size, offset, partition_id,
+        scan_range_->disk_id(), expected_local, BufferOpts(cache_options));
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index f36b21c67..0aae4faaf 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -145,10 +145,9 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           DCHECK_GT(files[i]->file_length, 0);
           ScanRangeMetadata* metadata =
               static_cast<ScanRangeMetadata*>(split->meta_data());
-          ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), files[i]->file_length, 0,
-              metadata->partition_id, split->disk_id(), split->expected_local(),
-              files[i]->mtime, BufferOpts(split->cache_options()));
+          ScanRange* file_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+              files[i]->file_length, 0, metadata->partition_id, split->disk_id(),
+              split->expected_local(), BufferOpts(split->cache_options()));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index afe65e39d..173899edc 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -261,8 +261,8 @@ void DiskIoMgrStress::NewClient(int i) {
     range_len = min(range_len, file_len - assigned_len);
 
     ScanRange* range = client.obj_pool.Add(new ScanRange);
-    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
-        0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+    range->Reset(ScanRange::FileInfo{files_[client.file_idx].filename.c_str()},
+        range_len, assigned_len, 0, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 52741f0f8..1e5f9240b 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -131,8 +131,8 @@ class DiskIoMgrTest : public testing::Test {
     }
     if (status.ok()) {
       ScanRange* scan_range = pool_.Add(new ScanRange());
-      scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
+      scan_range->Reset(ScanRange::FileInfo{(*written_range)->file()},
+          (*written_range)->len(), (*written_range)->offset(), 0, false,
           BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, client, scan_range,
           reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -300,8 +300,8 @@ class DiskIoMgrTest : public testing::Test {
     ScanRange* range = pool->Add(new ScanRange);
     int cache_options =
         is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
-        BufferOpts(cache_options), move(sub_ranges), meta_data);
+    range->Reset(ScanRange::FileInfo{file_path, nullptr, mtime}, len, offset, disk_id,
+        true, BufferOpts(cache_options), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -1591,7 +1591,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
+    range->Reset(ScanRange::FileInfo{tmp_file}, scan_len, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1638,8 +1638,8 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
     int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
-        BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
+    range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime}, data_len, 0,
+        0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
         move(sub_ranges));
     if (fake_cache) {
       SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
@@ -1688,7 +1688,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
         LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
     unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
+    range->Reset(ScanRange::FileInfo{tmp_file}, SCAN_LEN, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -2059,8 +2059,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2085,8 +2086,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2169,8 +2171,9 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
   ScanRange* scan_range = tmp_pool.Add(new ScanRange);
   size_t buffer_len = sizeof(int32_t);
   vector<uint8_t> client_buffer(buffer_len);
-  scan_range->Reset(hdfsConnect("default", 0), (*new_range)->file(), (*new_range)->len(),
-      (*new_range)->offset(), 0, false, mtime,
+  scan_range->Reset(
+      ScanRange::FileInfo{(*new_range)->file(), hdfsConnect("default", 0), mtime},
+      (*new_range)->len(), (*new_range)->offset(), 0, false,
       BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
       nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
   bool needs_buffers;
@@ -2588,8 +2591,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
     auto range = ranges.at(0);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, 1000000,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), 1000000},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
     bool needs_buffers;
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 0b98441a2..9dfef6aae 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -412,6 +412,9 @@ void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats) {
       scan_range_->reader_->bytes_read_short_circuit_.Add(
           stats->totalShortCircuitBytesRead);
       scan_range_->reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (scan_range_->is_erasure_coded()) {
+        scan_range_->reader_->bytes_read_ec_.Add(stats->totalBytesRead);
+      }
       if (stats->totalLocalBytesRead != stats->totalBytesRead) {
         num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
       }
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 96d609d93..646908374 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,6 +158,7 @@ class RequestContext {
   int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
   int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
   int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+  int64_t bytes_read_ec() const { return bytes_read_ec_.Load(); }
   int num_remote_ranges() const { return num_remote_ranges_.Load(); }
   int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
 
@@ -399,6 +400,9 @@ class RequestContext {
   /// Total number of bytes read from date node cache, updated at end of each range scan
   AtomicInt64 bytes_read_dn_cache_{0};
 
+  /// Total number of erasure-coded bytes read, updated at end of each range scan
+  AtomicInt64 bytes_read_ec_{0};
+
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ff3970e49..5121d6acc 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -146,6 +146,7 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   int64_t offset() const { return offset_; }
   int64_t len() const { return len_; }
   int disk_id() const { return disk_id_; }
+  bool is_erasure_coded() const { return is_erasure_coded_; }
   RequestType::type request_type() const { return request_type_; }
 
  protected:
@@ -171,6 +172,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   /// Id of disk queue containing byte range.
   int disk_id_;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded_;
+
   /// The type of IO request, READ or WRITE.
   RequestType::type request_type_;
 };
@@ -261,11 +265,26 @@ class ScanRange : public RequestRange {
     int64_t length;
   };
 
+  /// Struct for passing file info for constructing ScanRanges. Only contains details
+  /// consistent across all ranges for a given file. Filename is only used for the
+  /// duration of calls accepting FileInfo.
+  struct FileInfo {
+    const char *filename;
+    hdfsFS fs = nullptr;
+    int64_t mtime = ScanRange::INVALID_MTIME;
+    bool is_erasure_coded = false;
+  };
+
   /// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it
   /// with the rest of the input variables.
-  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
       int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-      int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
+      int disk_id, bool expected_local, const BufferOpts& buffer_opts);
+
+  /// Get file info for the current scan range.
+  FileInfo GetFileInfo() const {
+    return FileInfo{file_.c_str(), fs_, mtime_, is_erasure_coded_};
+  }
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -285,15 +304,14 @@ class ScanRange : public RequestRange {
   /// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently
   /// it is not generally safe to do so, but some unit tests reuse ranges after
   /// successfully reading to eos.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-      void* meta_data = nullptr, DiskFile* disk_file = nullptr,
-      DiskFile* disk_buffer_file = nullptr);
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr,
+      DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
 
   /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
   /// in advance, as this method will do the merge.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts,
       std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr,
       DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
 
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1851f3a1b..57d67bbfa 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -466,57 +466,57 @@ ScanRange::~ScanRange() {
   DCHECK(!read_in_flight_);
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
-  Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {}, meta_data,
-      disk_file, disk_buffer_file);
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, void* meta_data,
+    DiskFile* disk_file, DiskFile* disk_buffer_file) {
+  Reset(fi, len, offset, disk_id, expected_local, buffer_opts, {},
+      meta_data, disk_file, disk_buffer_file);
 }
 
-ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
     int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
   disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
-      file, disk_id, expected_local, /* check_default_fs */ true);
+      fi.filename, disk_id, expected_local, /* check_default_fs */ true);
   ScanRange* range = obj_pool->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+  range->Reset(fi, len, offset, disk_id, expected_local, buffer_opts,
       move(sub_ranges), metadata);
   return range;
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    vector<SubRange>&& sub_ranges, void* meta_data, DiskFile* disk_file,
-    DiskFile* disk_buffer_file) {
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges,
+    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
   DCHECK(buffer_manager_->is_readybuffer_empty());
   DCHECK(!read_in_flight_);
-  DCHECK(file != nullptr);
+  DCHECK(fi.filename != nullptr);
   DCHECK_GE(len, 0);
   DCHECK_GE(offset, 0);
   DCHECK(buffer_opts.client_buffer_ == nullptr ||
          buffer_opts.client_buffer_len_ >= len_);
-  fs_ = fs;
-  if (fs != nullptr) {
+  fs_ = fi.fs;
+  if (fs_ != nullptr) {
     file_reader_ = make_unique<HdfsFileReader>(this, fs_, false);
     local_buffer_reader_ = make_unique<LocalFileReader>(this);
   } else {
     file_reader_ = make_unique<LocalFileReader>(this);
   }
-  file_ = file;
+  file_ = fi.filename;
   len_ = len;
   bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
+  is_erasure_coded_ = fi.is_erasure_coded;
   cache_options_ = buffer_opts.cache_options_;
   disk_file_ = disk_file;
   disk_buffer_file_ = disk_buffer_file;
 
   // HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
-  if (fs_) DCHECK_GT(mtime, 0);
-  mtime_ = mtime;
+  mtime_ = fi.mtime;
+  if (fs_) DCHECK_GT(mtime_, 0);
   meta_data_ = meta_data;
   if (buffer_opts.client_buffer_ != nullptr) {
     buffer_manager_->set_client_buffer();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index be84ca420..8a90cf614 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -1629,16 +1629,18 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
     DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
     DiskFile* remote_file = tmp_file->DiskFile();
     // Reset the read_range, use the remote filesystem's disk id.
-    handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
-        handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{
+            remote_file->path().c_str(), tmp_file->hdfs_conn_, tmp_file->mtime_},
+        handle->write_range_->len(), offset, tmp_file->disk_id(), false,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
         nullptr, remote_file, local_read_buffer_file);
   } else {
     // Read from local.
-    handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{handle->write_range_->file()},
         handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
-        ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5afaf73c6..67ea5414c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -134,6 +134,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
       hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
+      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
       if (fb_desc->absolute_path() != nullptr) {
         hdfs_scan_range.__set_absolute_path(fb_desc->absolute_path()->str());
       }
@@ -1123,6 +1124,7 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
     hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
     hdfs_file_split->set_partition_path_hash(
         tscan_range.hdfs_file_split.partition_path_hash);
+    hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
     if (tscan_range.hdfs_file_split.__isset.absolute_path) {
       hdfs_file_split->set_absolute_path(
           tscan_range.hdfs_file_split.absolute_path);
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index e57062fcc..6a2e07402 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -59,6 +59,8 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ =
     "impala-server.io-mgr.short-circuit-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ =
     "impala-server.io-mgr.cached-bytes-read";
+const char* ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ =
+    "impala-server.io-mgr.erasure-coded-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES =
     "impala-server.io-mgr.remote-data-cache-hit-bytes";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT =
@@ -167,6 +169,7 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = nullptr;
@@ -341,6 +344,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
   IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
+  IO_MGR_ERASURE_CODED_BYTES_READ = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ, 0);
   IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
   IO_MGR_BYTES_WRITTEN = IO_MGR_METRICS->AddCounter(
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index a3811f7b3..c43dca69f 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -76,6 +76,9 @@ class ImpaladMetricKeys {
   /// Total number of cached bytes read by the io mgr
   static const char* IO_MGR_CACHED_BYTES_READ;
 
+  /// Total number of erasure-coded bytes read by the io mgr
+  static const char* IO_MGR_ERASURE_CODED_BYTES_READ;
+
   /// Total number of bytes read from the remote data cache.
   static const char* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
 
@@ -258,6 +261,7 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_BYTES_READ;
   static IntCounter* IO_MGR_LOCAL_BYTES_READ;
   static IntCounter* IO_MGR_CACHED_BYTES_READ;
+  static IntCounter* IO_MGR_ERASURE_CODED_BYTES_READ;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 46e8a5a30..d76a25fa6 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -227,6 +227,9 @@ struct THdfsFileSplit {
   // The absolute path of the file, it's used only when data files are outside of
   // the Iceberg table location (IMPALA-11507).
   10: optional string absolute_path
+
+  // Whether the HDFS file is stored with erasure coding.
+  11: optional bool is_erasure_coded
 }
 
 // Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index a5abd6651..6e5bca495 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -409,6 +409,16 @@
     "kind": "COUNTER",
     "key": "impala-server.io-mgr.cached-bytes-read"
   },
+  {
+    "description": "Total number of erasure-coded bytes read by the IO manager.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Erasure Coded Bytes Read",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.erasure-coded-bytes-read"
+  },
   {
     "description": "Total number of local bytes read by the IO manager.",
     "contexts": [
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 1958595ad..35cdddc4c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1424,6 +1424,7 @@ public class HdfsScanNode extends ScanNode {
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
             partition.getLocation().hashCode());
         hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
+        hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
         scanRange.setHdfs_file_split(hdfsFileSplit);
         if (fileDesc.getFbFileMetadata() != null) {
           scanRange.setFile_metadata(fileDesc.getFbFileMetadata().getByteBuffer());
diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py
new file mode 100644
index 000000000..f4e4d1ef4
--- /dev/null
+++ b/tests/query_test/test_io_metrics.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.filesystem_utils import IS_EC, IS_HDFS
+
+
+class TestIOMetrics(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIOMetrics, cls).add_test_dimensions()
+    # Run with num_nodes=1 to make it easy to verify metric changes.
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension(num_nodes=1))
+
+  # Issue a local query and test that read metrics are updated.
+  @pytest.mark.execute_serially
+  def test_local_read(self, vector):
+    # Accumulate metrics that are expected to update from a read, and metrics that are
+    # expected not to change with this configuration. Metrics that shouldn't change for
+    # this test should be 0 throughout the whole test suite so we can just verify they're
+    # 0 after running our query. Omits cached-bytes-read because it has its own test.
+    expect_nonzero_metrics = ["impala-server.io-mgr.bytes-read"]
+    expect_zero_metrics = []
+
+    def append_metric(metric, expect_nonzero):
+      (expect_nonzero_metrics if expect_nonzero else expect_zero_metrics).append(metric)
+
+    append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC)
+    append_metric("impala-server.io-mgr.short-circuit-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+    # TODO: this should be updated for Ozone, but the code that updates it is guarded by
+    #       IsHdfsPath and adding Ozone causes a crash. Plan to debug in IMPALA-11697.
+    append_metric("impala-server.io-mgr.local-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+
+    nonzero_before = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    result = self.execute_query("select count(*) from tpch.nation")
+    assert(len(result.data) == 1)
+    assert(result.data[0] == '25')
+    nation_data_file_length = 2199
+
+    nonzero_after = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    zero_values = self.impalad_test_service.get_metric_values(expect_zero_metrics)
+    assert(len(expect_zero_metrics) == len(zero_values))
+    LOG.info("Verifying %s expect-zero metrics.", len(expect_zero_metrics))
+    for metric, value in zip(expect_zero_metrics, zero_values):
+      LOG.info("%s: %s", metric, value)
+      assert(value == 0)
+
+    assert(len(expect_nonzero_metrics) == len(nonzero_before) == len(nonzero_after))
+    LOG.info("Verifying %s expect-non-zero metrics.", len(expect_nonzero_metrics))
+    for metric, before, after in \
+        zip(expect_nonzero_metrics, nonzero_before, nonzero_after):
+      LOG.info("%s: %s -> %s", metric, before, after)
+      assert(before + nation_data_file_length == after)


[impala] 02/02: IMPALA-11692: Struct slot memory sharing involving select * not working properly

Posted by db...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit efa426453a8af3728bc272b9158f5564ce37e0ea
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Fri Oct 28 16:59:20 2022 +0200

    IMPALA-11692: Struct slot memory sharing involving select * not working properly
    
    With EXPAND_COMPLEX_TYPES=1, if there are structs coming from the star
    expansion and members of the structs are also given explicitly, slot
    memory sharing does not work in some cases:
    
    explain select * from functional_orc_def.complextypes_nested_structs;
    row-size=64B
    
    explain select *, outer_struct.inner_struct1 from
    functional_orc_def.complextypes_nested_structs;
    row-size=80B
    
    The row size should be the same in both cases as
    outer_struct.inner_struct1 is part of outer_struct which is included in
    the star.
    
    This change modifies how star select list items are analysed. First,
    before 'SelectAnalyzer.analyzeSelectClause()' is called, all star items
    are expanded to paths which are stored in the 'starExpandedPaths_' map.
    The function 'SelectAnalyzer.registerStructSlotRefPathsWithAnalyzer()',
    which makes struct slot memory sharing possible, takes these star
    expanded paths into account. Then in
    'SelectAnalyzer.analyzeSelectClause()' the paths expanded from the star
    items are retrieved and normal analysis takes place.
    
    Testing:
     - Added the test function
       PlannerTest.testStructFieldSlotSharedWithStructFromStarExpansion()
       that verifies that struct slot memory sharing takes place.
    
    Change-Id: I346c2808c1aa5e77e3cdf3593f7f48ac96516c00
    Reviewed-on: http://gerrit.cloudera.org:8080/19190
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/SelectStmt.java     | 269 ++++++++++++++-------
 .../org/apache/impala/planner/PlannerTest.java     |  34 +++
 2 files changed, 222 insertions(+), 81 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index b75b86da9..19db3caa0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -21,8 +21,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -283,6 +284,41 @@ public class SelectStmt extends QueryStmt {
     private List<FunctionCallExpr> aggExprs_;
     private ExprSubstitutionMap countAllMap_;
 
+    private class StarExpandedPathInfo {
+      public StarExpandedPathInfo(Path expandedPath, Path originalRootPath) {
+        expandedPath_ = expandedPath;
+        originalRootPath_ = originalRootPath;
+      }
+
+      public Path getExpandedPath() { return expandedPath_; }
+      public boolean shouldRegisterForColumnMasking() {
+        // Empty matched types means this is expanded from star of a catalog table. For
+        // star of complex types, e.g. my_struct.*, my_array.*, my_map.*, the matched
+        // types will have the complex type so it's not empty.
+        // TODO: IMPALA-11712: We should sort out column masking and complex types. The
+        // above comment may not always be true: in the query
+        //   select a.* from mix_struct_array t, t.struct_in_arr a;
+        // getMatchedTypes() returns an empty list for the star path even though it is not
+        // from a catalog table.
+        // We should also find out whether we can determine from the expanded path alone
+        // (and not from the path of the star item) whether we need to register it for
+        // column masking, for example by checking if it is within a complex type.
+        return originalRootPath_.getMatchedTypes().isEmpty();
+      }
+
+      // The path expanded from a star select list item.
+      private final Path expandedPath_;
+
+      // The original path of the star select list item from which 'expandedPath_' was
+      // expanded.
+      // Can be the path of a table, a struct or a collection.
+      private final Path originalRootPath_;
+    }
+
+    // A map from star 'SelectListItem's to the paths to which they are expanded.
+    private final Map<SelectListItem, List<StarExpandedPathInfo>> starExpandedPaths_
+        = new HashMap<>();
+
     private SelectAnalyzer(Analyzer analyzer) {
       this.analyzer_ = analyzer;
     }
@@ -291,6 +327,10 @@ public class SelectStmt extends QueryStmt {
       // Start out with table refs to establish aliases.
       fromClause_.analyze(analyzer_);
 
+      // Register struct paths (including those expanded from star expressions) before
+      // analyzeSelectClause() to guarantee tuple memory sharing between structs and
+      // struct members. See registerStructSlotRefPathsWithAnalyzer().
+      collectStarExpandedPaths();
       registerStructSlotRefPathsWithAnalyzer();
 
       analyzeSelectClause();
@@ -333,14 +373,43 @@ public class SelectStmt extends QueryStmt {
       analyzer_.setSimpleLimitStatus(checkSimpleLimitStmt());
     }
 
-    /* Register paths that point to structs. This is to unify expressions and allow
-     * embedded (struct member) expressions to share the tuple memory of their enclosing
-     * expressions.
+    /** Ensure that embedded (struct member) expressions share the tuple memory of their
+     * enclosing struct expressions by registering struct paths before analysis of select
+     * list items. Struct paths are registered in order of increasing number of path
+     * elements - this guarantees that when a struct member path is registered, its
+     * enclosing struct has already been registered, so Analyzer.registerSlotRef() can
+     * return the SlotDescriptor already created for the struct member within the struct,
+     * instead of creating a new SlotDescriptor for the struct member outside of the
+     * struct.
+     *
+     * Note that struct members can themselves be structs: this is the reason that
+     * ordering by increasing path element number (increasing embedding depth) is
+     * necessary and simply registering struct paths before other paths is not enough.
      */
     private void registerStructSlotRefPathsWithAnalyzer() throws AnalysisException {
+      Stream<Path> nonStarPaths = collectNonStarPaths();
+      Stream<Path> starExpandedPaths = starExpandedPaths_.values().stream()
+          // Get a flat list of paths (and booleans) belonging to all star items.
+          .flatMap(pathList -> pathList.stream())
+          // Discard the booleans and keep only the actual paths.
+          .map((StarExpandedPathInfo pathInfo)  -> pathInfo.getExpandedPath());
+
+      Stream<Path> allPaths = Stream.concat(nonStarPaths, starExpandedPaths);
+      List<Path> structPaths = allPaths
+          .filter(path -> path.destType().isStructType())
+          .collect(Collectors.toList());
+
+      // Sort paths by length in ascending order so that structs that contain other
+      // structs come before their children.
+      Collections.sort(structPaths,
+          Comparator.<Path>comparingInt(path -> path.getMatchedTypes().size()));
+      for (Path p : structPaths) {
+        analyzer_.registerSlotRef(p);
+      }
+    }
+
+    private Stream<Path> collectNonStarPaths() {
       Preconditions.checkNotNull(selectList_);
-      // Note: if we in the future include complex types in star expressions, we will have
-      // to expand star expressions here.
       Stream<Expr> selectListExprs = selectList_.getItems().stream()
         .filter(elem -> !elem.isStar())
         .map(elem -> elem.getExpr());
@@ -348,20 +417,13 @@ public class SelectStmt extends QueryStmt {
 
       Stream<Expr> exprs = Stream.concat(selectListExprs, nonSelectListExprs);
 
-      HashSet<SlotRef> slotRefs = new HashSet<>();
+      // Use a LinkedHashSet for deterministic iteration order.
+      LinkedHashSet<SlotRef> slotRefs = new LinkedHashSet<>();
       exprs.forEach(expr -> expr.collect(SlotRef.class, slotRefs));
 
-      List<Path> paths = slotRefs.stream().map(this::slotRefToResolvedPath)
-          .filter(path -> path != null)
-          .filter(path -> path.destType().isStructType())
-          .collect(Collectors.toList());
-      // Sort paths by length in ascending order so that structs that contain other
-      // structs come before their children.
-      Collections.sort(paths,
-          Comparator.<Path>comparingInt(path -> path.getMatchedTypes().size()));
-      for (Path p : paths) {
-        analyzer_.registerSlotRef(p);
-      }
+      return slotRefs.stream()
+          .map(this::slotRefToResolvedPath)
+          .filter(path -> path != null);
     }
 
     private Stream<Expr> collectExprsOutsideSelectList() {
@@ -419,48 +481,9 @@ public class SelectStmt extends QueryStmt {
       for (int i = 0; i < selectList_.getItems().size(); ++i) {
         SelectListItem item = selectList_.getItems().get(i);
         if (item.isStar()) {
-          if (item.getRawPath() != null) {
-            Path resolvedPath = analyzeStarPath(item.getRawPath(), analyzer_);
-            expandStar(resolvedPath);
-          } else {
-            expandStar();
-          }
+          analyzeStarItem(item);
         } else {
-          // Analyze the resultExpr before generating a label to ensure enforcement
-          // of expr child and depth limits (toColumn() label may call toSql()).
-          item.getExpr().analyze(analyzer_);
-          // Check for scalar subquery types which are not supported
-          List<Subquery> subqueryExprs = new ArrayList<>();
-          item.getExpr().collect(Subquery.class, subqueryExprs);
-          for (Subquery s : subqueryExprs) {
-            Preconditions.checkState(s.getStatement() instanceof SelectStmt);
-            if (!s.returnsScalarColumn()) {
-              throw new AnalysisException("A non-scalar subquery is not supported in "
-                  + "the expression: " + item.getExpr().toSql());
-            }
-            if (s.getStatement().isRuntimeScalar()) {
-              throw new AnalysisException(
-                  "A subquery which may return more than one row is not supported in "
-                  + "the expression: " + item.getExpr().toSql());
-            }
-            if (!((SelectStmt)s.getStatement()).returnsAtMostOneRow()) {
-              throw new AnalysisException("Only subqueries that are guaranteed to return "
-                   + "a single row are supported: " + item.getExpr().toSql());
-            }
-          }
-          resultExprs_.add(item.getExpr());
-          String label = item.toColumnLabel(i, analyzer_.useHiveColLabels());
-          SlotRef aliasRef = new SlotRef(label);
-          Expr existingAliasExpr = existingAliasExprs.get(label);
-          if (existingAliasExpr != null && !existingAliasExpr.equals(item.getExpr())) {
-            // If we have already seen this alias, it refers to more than one column and
-            // therefore is ambiguous.
-            ambiguousAliasList_.add(aliasRef);
-          } else {
-            existingAliasExprs.put(label, item.getExpr());
-          }
-          aliasSmap_.put(aliasRef, item.getExpr().clone());
-          colLabels_.add(label);
+          analyzeNonStarItem(item, existingAliasExprs, i);
         }
       }
       if (LOG.isTraceEnabled()) {
@@ -468,6 +491,62 @@ public class SelectStmt extends QueryStmt {
       }
     }
 
+    private void analyzeStarItem(SelectListItem item) throws AnalysisException {
+      Preconditions.checkState(item.isStar());
+      List<StarExpandedPathInfo> starExpandedPathInfos = starExpandedPaths_.get(item);
+      // If complex types are not expanded, a star item may expand to zero items, in which
+      // case starExpandedPaths_ does not have a value for it.
+      if (starExpandedPathInfos == null) {
+        Preconditions.checkState(
+            !analyzer_.getQueryCtx().client_request.query_options.expand_complex_types);
+        return;
+      }
+
+      for (StarExpandedPathInfo pathInfo : starExpandedPathInfos) {
+        addStarExpandedPathResultExpr(pathInfo);
+      }
+    }
+
+    private void analyzeNonStarItem(SelectListItem item,
+        Map<String, Expr> existingAliasExprs, int selectListPos)
+        throws AnalysisException {
+      // Analyze the resultExpr before generating a label to ensure enforcement
+      // of expr child and depth limits (toColumnLabel() may call toSql()).
+      item.getExpr().analyze(analyzer_);
+      // Check for scalar subquery types which are not supported
+      List<Subquery> subqueryExprs = new ArrayList<>();
+      item.getExpr().collect(Subquery.class, subqueryExprs);
+      for (Subquery s : subqueryExprs) {
+        Preconditions.checkState(s.getStatement() instanceof SelectStmt);
+        if (!s.returnsScalarColumn()) {
+          throw new AnalysisException("A non-scalar subquery is not supported in "
+              + "the expression: " + item.getExpr().toSql());
+        }
+        if (s.getStatement().isRuntimeScalar()) {
+          throw new AnalysisException(
+              "A subquery which may return more than one row is not supported in "
+              + "the expression: " + item.getExpr().toSql());
+        }
+        if (!((SelectStmt)s.getStatement()).returnsAtMostOneRow()) {
+          throw new AnalysisException("Only subqueries that are guaranteed to return "
+              + "a single row are supported: " + item.getExpr().toSql());
+        }
+      }
+      resultExprs_.add(item.getExpr());
+      String label = item.toColumnLabel(selectListPos, analyzer_.useHiveColLabels());
+      SlotRef aliasRef = new SlotRef(label);
+      Expr existingAliasExpr = existingAliasExprs.get(label);
+      if (existingAliasExpr != null && !existingAliasExpr.equals(item.getExpr())) {
+        // If we have already seen this alias, it refers to more than one column and
+        // therefore is ambiguous.
+        ambiguousAliasList_.add(aliasRef);
+      } else {
+        existingAliasExprs.put(label, item.getExpr());
+      }
+      aliasSmap_.put(aliasRef, item.getExpr().clone());
+      colLabels_.add(label);
+    }
+
     private void verifyResultExprs() throws AnalysisException {
       // Star exprs only expand to the scalar-typed columns/fields, so
       // the resultExprs_ could be empty.
@@ -718,7 +797,7 @@ public class SelectStmt extends QueryStmt {
      * complex-typed fields for backwards compatibility unless EXPAND_COMPLEX_TYPES is set
      * to true.
      */
-    private void expandStar() throws AnalysisException {
+    private void expandStar(SelectListItem selectListItem) throws AnalysisException {
       if (fromClause_.isEmpty()) {
         throw new AnalysisException(
             "'*' expression in select list requires FROM clause.");
@@ -730,7 +809,7 @@ public class SelectStmt extends QueryStmt {
         Path resolvedPath = new Path(tableRef.getDesc(),
             Collections.<String>emptyList());
         Preconditions.checkState(resolvedPath.resolve());
-        expandStar(resolvedPath);
+        expandStar(selectListItem, resolvedPath);
       }
     }
 
@@ -738,7 +817,7 @@ public class SelectStmt extends QueryStmt {
      * Expand "path.*" from a resolved path, ignoring complex-typed fields for backwards
      * compatibility unless EXPAND_COMPLEX_TYPES is set to true.
      */
-    private void expandStar(Path resolvedPath)
+    private void expandStar(SelectListItem selectListItem, Path resolvedPath)
         throws AnalysisException {
       Preconditions.checkState(resolvedPath.isResolved());
       if (resolvedPath.destTupleDesc() != null &&
@@ -749,7 +828,7 @@ public class SelectStmt extends QueryStmt {
         TupleDescriptor tupleDesc = resolvedPath.destTupleDesc();
         FeTable table = tupleDesc.getTable();
         for (Column c: table.getColumnsInHiveOrder()) {
-          addStarResultExpr(resolvedPath, c.getName());
+          addStarExpandedPath(selectListItem, resolvedPath, c.getName());
         }
       } else {
         // The resolved path does not target the descriptor of a catalog table.
@@ -767,54 +846,82 @@ public class SelectStmt extends QueryStmt {
         if (structType instanceof CollectionStructType) {
           CollectionStructType cst = (CollectionStructType) structType;
           if (cst.isMapStruct()) {
-            addStarResultExpr(resolvedPath, Path.MAP_KEY_FIELD_NAME);
+            addStarExpandedPath(selectListItem, resolvedPath, Path.MAP_KEY_FIELD_NAME);
           }
           if (cst.getOptionalField().getType().isStructType()) {
             structType = (StructType) cst.getOptionalField().getType();
             for (StructField f: structType.getFields()) {
-              addStarResultExpr(
-                  resolvedPath, cst.getOptionalField().getName(), f.getName());
+              addStarExpandedPath(selectListItem, resolvedPath,
+                  cst.getOptionalField().getName(), f.getName());
             }
           } else if (cst.isMapStruct()) {
-            addStarResultExpr(resolvedPath, Path.MAP_VALUE_FIELD_NAME);
+            addStarExpandedPath(selectListItem, resolvedPath, Path.MAP_VALUE_FIELD_NAME);
           } else {
-            addStarResultExpr(resolvedPath, Path.ARRAY_ITEM_FIELD_NAME);
+            addStarExpandedPath(selectListItem, resolvedPath, Path.ARRAY_ITEM_FIELD_NAME);
           }
         } else {
           // Default star expansion.
           for (StructField f: structType.getFields()) {
             if (f.isHidden()) continue;
-            addStarResultExpr(resolvedPath, f.getName());
+            addStarExpandedPath(selectListItem, resolvedPath, f.getName());
           }
         }
       }
     }
 
     /**
-     * Helper function used during star expansion to add a single result expr
-     * based on a given raw path to be resolved relative to an existing path.
+     * Expand star items to paths and store them in 'starExpandedPaths_'.
      */
-    private void addStarResultExpr(Path resolvedPath,
+    private void collectStarExpandedPaths() throws AnalysisException {
+      for (SelectListItem item : selectList_.getItems()) {
+        if (item.isStar()) {
+          if (item.getRawPath() != null) {
+            Path resolvedPath = analyzeStarPath(item.getRawPath(), analyzer_);
+            expandStar(item, resolvedPath);
+          } else {
+            expandStar(item);
+          }
+        }
+      }
+    }
+
+    /**
+     * Helper function used during star expansion to add a single expanded path based on a
+     * given raw path to be resolved relative to an existing path.
+     */
+    private void addStarExpandedPath(SelectListItem selectListItem, Path resolvedRootPath,
         String... relRawPath) throws AnalysisException {
-      Path p = Path.createRelPath(resolvedPath, relRawPath);
-      Preconditions.checkState(p.resolve());
-      if (p.destType().isComplexType() &&
+      Path starExpandedPath = Path.createRelPath(resolvedRootPath, relRawPath);
+      Preconditions.checkState(starExpandedPath.resolve());
+      if (starExpandedPath.destType().isComplexType() &&
           !analyzer_.getQueryCtx().client_request.query_options.expand_complex_types) {
         return;
       }
-      SlotDescriptor slotDesc = analyzer_.registerSlotRef(p, false);
+
+      if (!starExpandedPaths_.containsKey(selectListItem)) {
+        starExpandedPaths_.put(selectListItem, new ArrayList<>());
+      }
+      List<StarExpandedPathInfo> pathsOfStarItem = starExpandedPaths_.get(selectListItem);
+      pathsOfStarItem.add(new StarExpandedPathInfo(starExpandedPath, resolvedRootPath));
+    }
+
+    private void addStarExpandedPathResultExpr(StarExpandedPathInfo starExpandedPathInfo)
+        throws AnalysisException {
+      Preconditions.checkState(starExpandedPathInfo.getExpandedPath().isResolved());
+
+      SlotDescriptor slotDesc = analyzer_.registerSlotRef(
+          starExpandedPathInfo.getExpandedPath(), false);
       SlotRef slotRef = new SlotRef(slotDesc);
       Preconditions.checkState(slotRef.isAnalyzed(),
           "Analysis should be done in constructor");
 
-      // Empty matched types means this is expanded from star of a catalog table.
-      // For star of complex types, e.g. my_struct.*, my_array.*, my_map.*, the matched
-      // types will have the complex type so it's not empty.
-      if (resolvedPath.getMatchedTypes().isEmpty()) {
+      if (starExpandedPathInfo.shouldRegisterForColumnMasking()) {
         analyzer_.registerColumnForMasking(slotDesc);
       }
       resultExprs_.add(slotRef);
-      colLabels_.add(relRawPath[relRawPath.length - 1]);
+      final List<String> starExpandedRawPath = starExpandedPathInfo
+        .getExpandedPath().getRawPath();
+      colLabels_.add(starExpandedRawPath.get(starExpandedRawPath.size() - 1));
     }
 
     /**
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 06080b1e4..5f2383e48 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -857,6 +857,7 @@ public class PlannerTest extends PlannerTestBase {
     // struct is queried.
 
     // For complex types in the select list, we have to turn codegen off.
+    // TODO: Remove this when IMPALA-10851 is fixed.
     TQueryOptions queryOpts = defaultQueryOptions();
     queryOpts.setDisable_codegen(true);
 
@@ -880,6 +881,39 @@ public class PlannerTest extends PlannerTestBase {
     }
   }
 
+  @Test
+  public void testStructFieldSlotSharedWithStructFromStarExpansion()
+      throws ImpalaException {
+    // Like testStructFieldSlotSharedWithStruct(), but involving structs that come from a
+    // star expansion.
+
+    TQueryOptions queryOpts = defaultQueryOptions();
+    // For complex types in the select list, we have to turn codegen off.
+    // TODO: Remove this when IMPALA-10851 is fixed.
+    queryOpts.setDisable_codegen(true);
+    // Enable star-expansion of complex types.
+    queryOpts.setExpand_complex_types(true);
+
+    String queryTemplate =
+        "select %s from functional_orc_def.complextypes_nested_structs";
+
+    // The base case is when only the star is given in the select list.
+    String queryWithoutFields =
+        String.format(queryTemplate, "*");
+    int rowSizeWithoutFields = getRowSize(queryWithoutFields, queryOpts);
+
+    // Try permutations of (nested) fields of the top-level struct.
+    String[] fields = {"*", "outer_struct", "outer_struct.inner_struct1",
+      "outer_struct.inner_struct1.str"};
+    Collection<List<String>> permutations =
+      Collections2.permutations(java.util.Arrays.asList(fields));
+    for (List<String> permutation : permutations) {
+      String query = String.format(queryTemplate, String.join(", ", permutation));
+      int rowSize = getRowSize(query, queryOpts);
+      Assert.assertEquals(rowSizeWithoutFields, rowSize);
+    }
+  }
+
   @Test
   public void testResourceRequirements() {
     // Tests the resource requirement computation from the planner.