Posted to commits@impala.apache.org by db...@apache.org on 2022/11/10 00:36:55 UTC

[impala] 01/02: IMPALA-9488: Add metrics for EC reads

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4e87a80fae86c5775a5f35607806fee287b3797a
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Fri Oct 28 10:57:48 2022 -0700

    IMPALA-9488: Add metrics for EC reads
    
    Adds a metric tracking erasure-coded bytes read. Adds ScanRange::FileInfo
    to pass file info through calls to AllocateScanRange so it's easier to
    add erasure coding as another piece of file info.
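
    A rough sketch of the new call shape (names taken from this change; not a
    standalone compilable unit):

      ScanRange::FileInfo fi = files[i]->GetFileInfo();  // filename, fs, mtime, is_erasure_coded
      ScanRange* range = scan_node->AllocateScanRange(fi, len, offset,
          partition_id, disk_id, expected_local, BufferOpts(cache_options));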
    
    Adds a test verifying that the expected number of bytes is read for the
    existing read metrics and the new `erasure-coded-bytes-read` metric when
    running a SELECT.
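
    Roughly how the new counter flows through the layers touched below
    (simplified from the hunks in this change):

      // HdfsFileReader: attribute HDFS bytes read against an erasure-coded file.
      if (scan_range_->is_erasure_coded()) {
        scan_range_->reader_->bytes_read_ec_.Add(stats->totalBytesRead);
      }
      // HdfsScanNodeBase::StopAndFinalizeCounters: publish to the profile
      // counter and the impalad-wide metric.
      bytes_read_ec_->Set(reader_context_->bytes_read_ec());
      ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment(bytes_read_ec_->value());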
    
    Change-Id: Ieb06bac9dea4b632621653d2935e9a7b2dc81341
    Reviewed-on: http://gerrit.cloudera.org:8080/19178
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc               |  7 +-
 be/src/exec/hdfs-scan-node-base.cc                 | 42 +++++++-----
 be/src/exec/hdfs-scan-node-base.h                  | 22 +++---
 be/src/exec/hdfs-scanner.cc                        | 13 ++--
 be/src/exec/orc/hdfs-orc-scanner.cc                | 13 ++--
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  4 +-
 be/src/exec/parquet/parquet-page-index.cc          | 10 ++-
 be/src/exec/parquet/parquet-page-reader.cc         |  9 +--
 be/src/exec/scanner-context.cc                     |  5 +-
 be/src/exec/text/hdfs-text-scanner.cc              |  7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc            |  4 +-
 be/src/runtime/io/disk-io-mgr-test.cc              | 36 +++++-----
 be/src/runtime/io/hdfs-file-reader.cc              |  3 +
 be/src/runtime/io/request-context.h                |  4 ++
 be/src/runtime/io/request-ranges.h                 | 34 +++++++---
 be/src/runtime/io/scan-range.cc                    | 38 +++++------
 be/src/runtime/tmp-file-mgr.cc                     | 10 +--
 be/src/scheduling/scheduler.cc                     |  2 +
 be/src/util/impalad-metrics.cc                     |  5 ++
 be/src/util/impalad-metrics.h                      |  4 ++
 common/thrift/PlanNodes.thrift                     |  3 +
 common/thrift/metrics.json                         | 10 +++
 .../org/apache/impala/planner/HdfsScanNode.java    |  1 +
 tests/query_test/test_io_metrics.py                | 79 ++++++++++++++++++++++
 24 files changed, 254 insertions(+), 111 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ae66d0e01..83e1221b1 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -62,10 +62,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     bool expected_local = false;
     int cache_options = !scan_node->IsDataCacheDisabled() ? BufferOpts::USE_DATA_CACHE :
         BufferOpts::NO_CACHING;
-    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
-        expected_local, files[i]->mtime, BufferOpts(cache_options),
-        metadata->original_split);
+    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+        header_size, 0, metadata->partition_id, -1, expected_local,
+        BufferOpts(cache_options), metadata->original_split);
     ScanRangeMetadata* header_metadata =
             static_cast<ScanRangeMetadata*>(header_range->meta_data());
     header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5bab2c13..f6f1f008d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -111,6 +111,8 @@ PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES,
     "The total number of bytes read via short circuit read");
 PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES,
     "The total number of bytes read from data node cache");
+PROFILE_DEFINE_COUNTER(BytesReadErasureCoded, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read from erasure-coded data");
 PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT,
     "The total number of remote scan ranges");
 PROFILE_DEFINE_COUNTER(BytesReadRemoteUnexpected, STABLE_LOW, TUnit::BYTES,
@@ -298,6 +300,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
         file_desc->file_length = split.file_length();
         file_desc->mtime = split.mtime();
         file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
+        file_desc->is_erasure_coded = split.is_erasure_coded();
         file_desc->file_format = partition_desc->file_format();
         file_desc->file_metadata = file_metadata;
         RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
@@ -328,10 +331,9 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
       }
       ScanRangeMetadata* metadata =
           obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
-      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
-          file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
-          params.volume_id(), expected_local, file_desc->mtime,
-          BufferOpts(cache_options)));
+      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
+          file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
+          params.volume_id(), expected_local, BufferOpts(cache_options)));
       total_splits++;
     }
     // Update server wide metrics for number of scan ranges and ranges that have
@@ -608,6 +610,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   bytes_read_short_circuit_ =
       PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile());
   bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile());
+  bytes_read_ec_ = PROFILE_BytesReadErasureCoded.Instantiate(runtime_profile());
   num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile());
   unexpected_remote_bytes_ =
       PROFILE_BytesReadRemoteUnexpected.Instantiate(runtime_profile());
@@ -822,37 +825,37 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
   return curr_reservation;
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    int64_t mtime,  const BufferOpts& buffer_opts, const ScanRange* original_split) {
+    const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
-      mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, {}, metadata, disk_id, expected_local,
+      buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
-    int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+    int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
-      disk_id, expected_local, mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, move(sub_ranges), metadata,
+      disk_id, expected_local, buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
-    int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
-    int disk_id, bool expected_local, int64_t mtime,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
+    int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
+    ScanRangeMetadata* metadata, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts) {
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
   // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
   // beyond the end of the file).
-  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
+  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, fi.filename)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
-      move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
+  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fi, len, offset,
+      move(sub_ranges), metadata, disk_id, expected_local, buffer_opts);
 }
 
 const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
@@ -1213,6 +1216,7 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
     bytes_read_local_->Set(reader_context_->bytes_read_local());
     bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
     bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    bytes_read_ec_->Set(reader_context_->bytes_read_ec());
     num_remote_ranges_->Set(reader_context_->num_remote_ranges());
     unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
     cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
@@ -1237,6 +1241,8 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
         bytes_read_short_circuit_->value());
     ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
         bytes_read_dn_cache_->value());
+    ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment(
+        bytes_read_ec_->value());
   }
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 21d18f25c..053e61c70 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -71,6 +71,10 @@ struct HdfsFileDesc {
       file_compression(THdfsCompression::NONE),
       file_format(THdfsFileFormat::TEXT) {}
 
+  io::ScanRange::FileInfo GetFileInfo() const {
+    return io::ScanRange::FileInfo{filename.c_str(), fs, mtime, is_erasure_coded};
+  }
+
   /// Connection to the filesystem containing the file.
   hdfsFS fs;
 
@@ -93,6 +97,9 @@ struct HdfsFileDesc {
   /// Extra file metadata, e.g. Iceberg-related file-level info.
   const ::org::apache::impala::fb::FbFileMetadata* file_metadata;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded = false;
+
   /// Some useful typedefs for creating HdfsFileDesc related data structures.
   /// This is a pair for partition ID and filename which uniquely identifies a file.
   typedef pair<int64_t, std::string> PartitionFileKey;
@@ -506,23 +513,21 @@ class HdfsScanNodeBase : public ScanNode {
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as the first overload, but it takes sub-ranges as well.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as above, but it takes both sub-ranges and metadata.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts);
+      const io::BufferOpts& buffer_opts);
 
   /// Adds ranges to be read later by scanners. Must not be called once
   /// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
@@ -781,6 +786,7 @@ class HdfsScanNodeBase : public ScanNode {
   RuntimeProfile::Counter* bytes_read_local_ = nullptr;
   RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
   RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
+  RuntimeProfile::Counter* bytes_read_ec_ = nullptr;
   RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
   RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
   RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7795a8c09..49f360ed1 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -888,10 +888,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
         // metadata associated with the footer range.
         ScanRange* footer_range;
         if (footer_split != nullptr) {
-          footer_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), footer_size, footer_start,
-              split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(), files[i]->mtime,
+          footer_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+              footer_size, footer_start, split_metadata->partition_id,
+              footer_split->disk_id(), footer_split->expected_local(),
               BufferOpts(footer_split->cache_options()), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
@@ -899,9 +898,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           int cache_options = !scan_node->IsDataCacheDisabled() ?
               BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
           footer_range =
-              scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
-                   footer_size, footer_start, split_metadata->partition_id, -1,
-                   expected_local, files[i]->mtime, BufferOpts(cache_options), split);
+              scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+                  footer_size, footer_start, split_metadata->partition_id, -1,
+                  expected_local, BufferOpts(cache_options), split);
         }
         footer_ranges.push_back(footer_range);
       } else {
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index ce04bf178..12185af5b 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -155,8 +155,9 @@ Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
   bool expected_local = split_range->ExpectedLocalRead(offset, length);
   int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
-      metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
-      split_range->disk_id(), expected_local, split_range->mtime(),
+      ScanRange::FileInfo{scanner_->filename(), metadata_range->fs(),
+          split_range->mtime(), split_range->is_erasure_coded()},
+      length, offset, partition_id, split_range->disk_id(), expected_local,
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   Status status;
@@ -255,9 +256,11 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
       string msg = Substitute("Invalid read len.");
       return Status(msg);
     }
-    ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
-        col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
+    ScanRange* scan_range = scan_node_->AllocateScanRange(
+        ScanRange::FileInfo{filename(), metadata_range_->fs(), split_range->mtime(),
+            split_range->is_erasure_coded()},
+        range.length_, range.offset_, partition_id, split_range->disk_id(),
+        col_range_local, BufferOpts(split_range->cache_options()));
     RETURN_IF_ERROR(
         context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
   }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 64e4f546e..df70ceb2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1913,10 +1913,8 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
   const int cache_options =
       metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scan_node_->AllocateScanRange(
-      metadata_range_->fs(), filename(), size,
-      offset, partition_id,
+      metadata_range_->GetFileInfo(), size, offset, partition_id,
       metadata_range_->disk_id(), metadata_range_->expected_local(),
-      metadata_range_->mtime(),
       BufferOpts::ReadInto(buffer, size, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index e46edfa65..9925714ac 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -96,12 +96,10 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
   int cache_options =
       scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scanner_->scan_node_->AllocateScanRange(
-      scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
-      scan_range_start, move(sub_ranges), partition_id,
-      scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
-      scanner_->metadata_range_->mtime(),
-      BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
-          cache_options));
+      scanner_->metadata_range_->GetFileInfo(), scan_range_size, scan_range_start,
+      move(sub_ranges), partition_id, scanner_->metadata_range_->disk_id(),
+      scanner_->metadata_range_->expected_local(), BufferOpts::ReadInto(
+          page_index_buffer_.buffer(), page_index_buffer_.Size(), cache_options));
 
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 972b326aa..3caa3ebda 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -87,10 +87,11 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
       static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
   // Determine if the column is completely contained within a local split.
   bool col_range_local = split_range->ExpectedLocalRead(col_start, col_len);
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, move(sub_ranges),
-      partition_id, split_range->disk_id(),
-      col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
+  ScanRange::FileInfo fi = metadata_range->GetFileInfo();
+  fi.mtime = file_desc.mtime;
+  scan_range_ = parent_->scan_node_->AllocateScanRange(fi,
+      col_len, col_start, move(sub_ranges), partition_id, split_range->disk_id(),
+      col_range_local, BufferOpts(split_range->cache_options()));
   page_headers_read_ = 0;
   dictionary_header_encountered_ = false;
   state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a9ac2061d..2e453dc96 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -172,9 +172,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     // Disable HDFS caching as we are reading past the end.
     int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
-        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), expected_local, scan_range_->mtime(),
-        BufferOpts(cache_options));
+        scan_range_->GetFileInfo(), read_past_buffer_size, offset, partition_id,
+        scan_range_->disk_id(), expected_local, BufferOpts(cache_options));
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index f36b21c67..0aae4faaf 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -145,10 +145,9 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           DCHECK_GT(files[i]->file_length, 0);
           ScanRangeMetadata* metadata =
               static_cast<ScanRangeMetadata*>(split->meta_data());
-          ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), files[i]->file_length, 0,
-              metadata->partition_id, split->disk_id(), split->expected_local(),
-              files[i]->mtime, BufferOpts(split->cache_options()));
+          ScanRange* file_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+              files[i]->file_length, 0, metadata->partition_id, split->disk_id(),
+              split->expected_local(), BufferOpts(split->cache_options()));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index afe65e39d..173899edc 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -261,8 +261,8 @@ void DiskIoMgrStress::NewClient(int i) {
     range_len = min(range_len, file_len - assigned_len);
 
     ScanRange* range = client.obj_pool.Add(new ScanRange);
-    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
-        0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+    range->Reset(ScanRange::FileInfo{files_[client.file_idx].filename.c_str()},
+        range_len, assigned_len, 0, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 52741f0f8..1e5f9240b 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -131,8 +131,8 @@ class DiskIoMgrTest : public testing::Test {
     }
     if (status.ok()) {
       ScanRange* scan_range = pool_.Add(new ScanRange());
-      scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
+      scan_range->Reset(ScanRange::FileInfo{(*written_range)->file()},
+          (*written_range)->len(), (*written_range)->offset(), 0, false,
           BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, client, scan_range,
           reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -300,8 +300,8 @@ class DiskIoMgrTest : public testing::Test {
     ScanRange* range = pool->Add(new ScanRange);
     int cache_options =
         is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
-        BufferOpts(cache_options), move(sub_ranges), meta_data);
+    range->Reset(ScanRange::FileInfo{file_path, nullptr, mtime}, len, offset, disk_id,
+        true, BufferOpts(cache_options), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -1591,7 +1591,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
+    range->Reset(ScanRange::FileInfo{tmp_file}, scan_len, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1638,8 +1638,8 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
     int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
-        BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
+    range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime}, data_len, 0,
+        0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
         move(sub_ranges));
     if (fake_cache) {
       SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
@@ -1688,7 +1688,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
         LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
     unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
+    range->Reset(ScanRange::FileInfo{tmp_file}, SCAN_LEN, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -2059,8 +2059,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2085,8 +2086,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2169,8 +2171,9 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
   ScanRange* scan_range = tmp_pool.Add(new ScanRange);
   size_t buffer_len = sizeof(int32_t);
   vector<uint8_t> client_buffer(buffer_len);
-  scan_range->Reset(hdfsConnect("default", 0), (*new_range)->file(), (*new_range)->len(),
-      (*new_range)->offset(), 0, false, mtime,
+  scan_range->Reset(
+      ScanRange::FileInfo{(*new_range)->file(), hdfsConnect("default", 0), mtime},
+      (*new_range)->len(), (*new_range)->offset(), 0, false,
       BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
       nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
   bool needs_buffers;
@@ -2588,8 +2591,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
     auto range = ranges.at(0);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, 1000000,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), 1000000},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
     bool needs_buffers;
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 0b98441a2..9dfef6aae 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -412,6 +412,9 @@ void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats) {
       scan_range_->reader_->bytes_read_short_circuit_.Add(
           stats->totalShortCircuitBytesRead);
       scan_range_->reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (scan_range_->is_erasure_coded()) {
+        scan_range_->reader_->bytes_read_ec_.Add(stats->totalBytesRead);
+      }
       if (stats->totalLocalBytesRead != stats->totalBytesRead) {
         num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
       }
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 96d609d93..646908374 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,6 +158,7 @@ class RequestContext {
   int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
   int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
   int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+  int64_t bytes_read_ec() const { return bytes_read_ec_.Load(); }
   int num_remote_ranges() const { return num_remote_ranges_.Load(); }
   int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
 
@@ -399,6 +400,9 @@ class RequestContext {
   /// Total number of bytes read from date node cache, updated at end of each range scan
   AtomicInt64 bytes_read_dn_cache_{0};
 
+  /// Total number of erasure-coded bytes read, updated at end of each range scan
+  AtomicInt64 bytes_read_ec_{0};
+
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ff3970e49..5121d6acc 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -146,6 +146,7 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   int64_t offset() const { return offset_; }
   int64_t len() const { return len_; }
   int disk_id() const { return disk_id_; }
+  bool is_erasure_coded() const { return is_erasure_coded_; }
   RequestType::type request_type() const { return request_type_; }
 
  protected:
@@ -171,6 +172,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   /// Id of disk queue containing byte range.
   int disk_id_;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded_;
+
   /// The type of IO request, READ or WRITE.
   RequestType::type request_type_;
 };
@@ -261,11 +265,26 @@ class ScanRange : public RequestRange {
     int64_t length;
   };
 
+  /// Struct for passing file info for constructing ScanRanges. Only contains details
+  /// consistent across all ranges for a given file. Filename is only used for the
+  /// duration of calls accepting FileInfo.
+  struct FileInfo {
+    const char *filename;
+    hdfsFS fs = nullptr;
+    int64_t mtime = ScanRange::INVALID_MTIME;
+    bool is_erasure_coded = false;
+  };
+
   /// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it
   /// with the rest of the input variables.
-  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
       int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-      int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
+      int disk_id, bool expected_local, const BufferOpts& buffer_opts);
+
+  /// Get file info for the current scan range.
+  FileInfo GetFileInfo() const {
+    return FileInfo{file_.c_str(), fs_, mtime_, is_erasure_coded_};
+  }
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -285,15 +304,14 @@ class ScanRange : public RequestRange {
   /// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently
   /// it is not generally safe to do so, but some unit tests reuse ranges after
   /// successfully reading to eos.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-      void* meta_data = nullptr, DiskFile* disk_file = nullptr,
-      DiskFile* disk_buffer_file = nullptr);
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr,
+      DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
 
   /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
   /// in advance, as this method will do the merge.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts,
       std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr,
       DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
 
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1851f3a1b..57d67bbfa 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -466,57 +466,57 @@ ScanRange::~ScanRange() {
   DCHECK(!read_in_flight_);
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
-  Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {}, meta_data,
-      disk_file, disk_buffer_file);
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, void* meta_data,
+    DiskFile* disk_file, DiskFile* disk_buffer_file) {
+  Reset(fi, len, offset, disk_id, expected_local, buffer_opts, {},
+      meta_data, disk_file, disk_buffer_file);
 }
 
-ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
     int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
   disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
-      file, disk_id, expected_local, /* check_default_fs */ true);
+      fi.filename, disk_id, expected_local, /* check_default_fs */ true);
   ScanRange* range = obj_pool->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+  range->Reset(fi, len, offset, disk_id, expected_local, buffer_opts,
       move(sub_ranges), metadata);
   return range;
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    vector<SubRange>&& sub_ranges, void* meta_data, DiskFile* disk_file,
-    DiskFile* disk_buffer_file) {
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges,
+    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
   DCHECK(buffer_manager_->is_readybuffer_empty());
   DCHECK(!read_in_flight_);
-  DCHECK(file != nullptr);
+  DCHECK(fi.filename != nullptr);
   DCHECK_GE(len, 0);
   DCHECK_GE(offset, 0);
   DCHECK(buffer_opts.client_buffer_ == nullptr ||
          buffer_opts.client_buffer_len_ >= len_);
-  fs_ = fs;
-  if (fs != nullptr) {
+  fs_ = fi.fs;
+  if (fs_ != nullptr) {
     file_reader_ = make_unique<HdfsFileReader>(this, fs_, false);
     local_buffer_reader_ = make_unique<LocalFileReader>(this);
   } else {
     file_reader_ = make_unique<LocalFileReader>(this);
   }
-  file_ = file;
+  file_ = fi.filename;
   len_ = len;
   bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
+  is_erasure_coded_ = fi.is_erasure_coded;
   cache_options_ = buffer_opts.cache_options_;
   disk_file_ = disk_file;
   disk_buffer_file_ = disk_buffer_file;
 
   // HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
-  if (fs_) DCHECK_GT(mtime, 0);
-  mtime_ = mtime;
+  mtime_ = fi.mtime;
+  if (fs_) DCHECK_GT(mtime_, 0);
   meta_data_ = meta_data;
   if (buffer_opts.client_buffer_ != nullptr) {
     buffer_manager_->set_client_buffer();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index be84ca420..8a90cf614 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -1629,16 +1629,18 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
     DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
     DiskFile* remote_file = tmp_file->DiskFile();
     // Reset the read_range, use the remote filesystem's disk id.
-    handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
-        handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{
+            remote_file->path().c_str(), tmp_file->hdfs_conn_, tmp_file->mtime_},
+        handle->write_range_->len(), offset, tmp_file->disk_id(), false,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
         nullptr, remote_file, local_read_buffer_file);
   } else {
     // Read from local.
-    handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{handle->write_range_->file()},
         handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
-        ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5afaf73c6..67ea5414c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -134,6 +134,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
       hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
+      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
       if (fb_desc->absolute_path() != nullptr) {
         hdfs_scan_range.__set_absolute_path(fb_desc->absolute_path()->str());
       }
@@ -1123,6 +1124,7 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
     hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
     hdfs_file_split->set_partition_path_hash(
         tscan_range.hdfs_file_split.partition_path_hash);
+    hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
     if (tscan_range.hdfs_file_split.__isset.absolute_path) {
       hdfs_file_split->set_absolute_path(
           tscan_range.hdfs_file_split.absolute_path);
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index e57062fcc..6a2e07402 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -59,6 +59,8 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ =
     "impala-server.io-mgr.short-circuit-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ =
     "impala-server.io-mgr.cached-bytes-read";
+const char* ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ =
+    "impala-server.io-mgr.erasure-coded-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES =
     "impala-server.io-mgr.remote-data-cache-hit-bytes";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT =
@@ -167,6 +169,7 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = nullptr;
@@ -341,6 +344,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
   IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
+  IO_MGR_ERASURE_CODED_BYTES_READ = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ, 0);
   IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
   IO_MGR_BYTES_WRITTEN = IO_MGR_METRICS->AddCounter(
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index a3811f7b3..c43dca69f 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -76,6 +76,9 @@ class ImpaladMetricKeys {
   /// Total number of cached bytes read by the io mgr
   static const char* IO_MGR_CACHED_BYTES_READ;
 
+  /// Total number of erasure-coded bytes read by the io mgr
+  static const char* IO_MGR_ERASURE_CODED_BYTES_READ;
+
   /// Total number of bytes read from the remote data cache.
   static const char* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
 
@@ -258,6 +261,7 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_BYTES_READ;
   static IntCounter* IO_MGR_LOCAL_BYTES_READ;
   static IntCounter* IO_MGR_CACHED_BYTES_READ;
+  static IntCounter* IO_MGR_ERASURE_CODED_BYTES_READ;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 46e8a5a30..d76a25fa6 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -227,6 +227,9 @@ struct THdfsFileSplit {
   // The absolute path of the file, it's used only when data files are outside of
   // the Iceberg table location (IMPALA-11507).
   10: optional string absolute_path
+
+  // Whether the HDFS file is stored with erasure coding.
+  11: optional bool is_erasure_coded
 }
 
 // Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index a5abd6651..6e5bca495 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -409,6 +409,16 @@
     "kind": "COUNTER",
     "key": "impala-server.io-mgr.cached-bytes-read"
   },
+  {
+    "description": "Total number of erasure-coded bytes read by the IO manager.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Erasure Coded Bytes Read",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.erasure-coded-bytes-read"
+  },
   {
     "description": "Total number of local bytes read by the IO manager.",
     "contexts": [
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 1958595ad..35cdddc4c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1424,6 +1424,7 @@ public class HdfsScanNode extends ScanNode {
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
             partition.getLocation().hashCode());
         hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
+        hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
         scanRange.setHdfs_file_split(hdfsFileSplit);
         if (fileDesc.getFbFileMetadata() != null) {
           scanRange.setFile_metadata(fileDesc.getFbFileMetadata().getByteBuffer());
diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py
new file mode 100644
index 000000000..f4e4d1ef4
--- /dev/null
+++ b/tests/query_test/test_io_metrics.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.filesystem_utils import IS_EC, IS_HDFS
+
+
+class TestIOMetrics(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIOMetrics, cls).add_test_dimensions()
+    # Run with num_nodes=1 to make it easy to verify metric changes.
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension(num_nodes=1))
+
+  # Issue a local query and test that read metrics are updated.
+  @pytest.mark.execute_serially
+  def test_local_read(self, vector):
+    # Accumulate metrics that are expected to update from a read, and metrics that are
+    # expected not to change with this configuration. Metrics that shouldn't change for
+    # this test should be 0 throughout the whole test suite so we can just verify they're
+    # 0 after running our query. Omits cached-bytes-read because it has its own test.
+    expect_nonzero_metrics = ["impala-server.io-mgr.bytes-read"]
+    expect_zero_metrics = []
+
+    def append_metric(metric, expect_nonzero):
+      (expect_nonzero_metrics if expect_nonzero else expect_zero_metrics).append(metric)
+
+    append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC)
+    append_metric("impala-server.io-mgr.short-circuit-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+    # TODO: this should be updated for Ozone, but the code that updates it is guarded by
+    #       IsHdfsPath and adding Ozone causes a crash. Plan to debug in IMPALA-11697.
+    append_metric("impala-server.io-mgr.local-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+
+    nonzero_before = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    result = self.execute_query("select count(*) from tpch.nation")
+    assert(len(result.data) == 1)
+    assert(result.data[0] == '25')
+    nation_data_file_length = 2199
+
+    nonzero_after = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    zero_values = self.impalad_test_service.get_metric_values(expect_zero_metrics)
+    assert(len(expect_zero_metrics) == len(zero_values))
+    LOG.info("Verifying %s expect-zero metrics.", len(expect_zero_metrics))
+    for metric, value in zip(expect_zero_metrics, zero_values):
+      LOG.info("%s: %s", metric, value)
+      assert(value == 0)
+
+    assert(len(expect_nonzero_metrics) == len(nonzero_before) == len(nonzero_after))
+    LOG.info("Verifying %s expect-non-zero metrics.", len(expect_nonzero_metrics))
+    for metric, before, after in \
+        zip(expect_nonzero_metrics, nonzero_before, nonzero_after):
+      LOG.info("%s: %s -> %s", metric, before, after)
+      assert(before + nation_data_file_length == after)