Posted to commits@impala.apache.org by jo...@apache.org on 2019/02/22 17:24:50 UTC

[impala] 02/02: IMPALA-8178: Disable file handle cache for HDFS erasure coded files

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dce82e4e018d1944ff19bb6f87139b51c1b0287e
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Feb 21 10:03:57 2019 -0800

    IMPALA-8178: Disable file handle cache for HDFS erasure coded files
    
    Testing on an erasure coded minicluster has revealed that each
    file handle for an erasure coded file uses about 3MB of native
    memory. This shows up as "java.nio:type=BufferPool,name=direct"
    in the /jmx endpoint (here showing the output when 608 handles
    are open):
    
    {
      "name": "java.nio:type=BufferPool,name=direct",
      "modelerType": "sun.management.ManagementFactoryHelper$1",
      "Name": "direct",
      "TotalCapacity": 1921048960,
      "MemoryUsed": 1921048961,
      "Count": 633,
      "ObjectName": "java.nio:type=BufferPool,name=direct"
    }
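    
    As a back-of-the-envelope check of the ~3MB figure (a sketch for
    illustration only, not part of this change, and assuming the
    direct pool is dominated by the open erasure coded handles):
    
      // ~1.9GB of direct buffer capacity across 608 open handles
      // works out to about 3MiB per handle.
      constexpr int64_t kTotalCapacity = 1921048960;  // from /jmx above
      constexpr int64_t kOpenHandles = 608;
      static_assert(kTotalCapacity / kOpenHandles / (1024 * 1024) == 3,
          "about 3MiB per erasure coded file handle");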
    
    The memory is not released or reduced by a call to unbuffer(),
    so these file handles are not suitable for long-term caching.
    HDFS-14308 tracks the implementation of unbuffer() for
    DFSStripedInputStream. This issue showed up when remote
    file handle caching was enabled in IMPALA-7265, as erasure
    coded files are always scheduled to be remote (IMPALA-7019).
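    
    The gating logic in ScanRange::DoRead() then reduces to the
    following (a condensed sketch of the change in the diff below,
    not the verbatim code):
    
      // The cached file handle path is taken only when all of these hold:
      //   1. file handle caching is enabled at the daemon level,
      //   2. the file is not erasure coded (this change), and
      //   3. the file is expected to be local, or remote HDFS file
      //      handle caching is enabled.
      bool use_file_handle_cache =
          is_file_handle_caching_enabled() && !is_erasure_coded_ &&
          (expected_local_ ||
           (FLAGS_cache_remote_file_handles &&
            disk_id_ == io_mgr_->RemoteDfsDiskId()));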
    
    This disables file handle caching for erasure coded files,
    which requires plumbing through the information about which
    ScanRanges are accessing erasure coded files.
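    
    Condensed, the flag travels roughly as follows (abbreviated from
    the diff below; the real calls carry more arguments):
    
      // Scheduler/planner: the flag rides along on each Thrift split
      // (THdfsFileSplit.is_erasure_coded in PlanNodes.thrift).
      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
      // Scan node: copied onto the per-file descriptor in Prepare().
      file_desc->is_erasure_coded = split.is_erasure_coded;
      // Each AllocateScanRange() overload forwards it into
      // ScanRange::Reset(), which records it for use in DoRead().
      is_erasure_coded_ = is_erasure_coded;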
    
    With this change, core tests pass on an erasure coded system.
    
    Change-Id: I8c761e08aacc952de0033a4c91e07f15c8ec96da
    Reviewed-on: http://gerrit.cloudera.org:8080/12552
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc                |  2 +-
 be/src/exec/hdfs-orc-scanner.cc                     |  2 +-
 be/src/exec/hdfs-scan-node-base.cc                  | 17 ++++++++++-------
 be/src/exec/hdfs-scan-node-base.h                   | 12 +++++++++---
 be/src/exec/hdfs-scanner.cc                         |  4 ++--
 be/src/exec/hdfs-text-scanner.cc                    |  1 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc         |  1 +
 be/src/exec/parquet/parquet-column-readers.cc       |  2 +-
 be/src/exec/scanner-context.cc                      |  3 ++-
 be/src/runtime/io/disk-io-mgr-stress.cc             |  2 +-
 be/src/runtime/io/disk-io-mgr-test.cc               | 10 +++++-----
 be/src/runtime/io/request-ranges.h                  |  9 +++++++--
 be/src/runtime/io/scan-range.cc                     | 21 ++++++++++++++-------
 be/src/runtime/tmp-file-mgr.cc                      |  2 +-
 be/src/scheduling/scheduler.cc                      |  1 +
 common/thrift/PlanNodes.thrift                      |  3 +++
 .../org/apache/impala/planner/HdfsScanNode.java     |  3 ++-
 17 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 4e2dd36..6ac981b 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -64,7 +64,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     // 1 queue for each NIC as well?
     ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
         files[i]->filename.c_str(), header_size, 0, header_metadata, -1, false,
-        BufferOpts::Uncached());
+        files[i]->is_erasure_coded, BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }
   // When the header is parsed, we will issue more AddDiskIoRanges in
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 071cedc..a0083bf 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -109,7 +109,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
   bool expected_local = false;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
       metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
-      split_range->disk_id(), expected_local,
+      split_range->disk_id(), expected_local, split_range->is_erasure_coded(),
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length));
 
   unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 94782b3..966bb58 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -220,6 +220,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       file_desc->file_length = split.file_length;
       file_desc->mtime = split.mtime;
       file_desc->file_compression = split.file_compression;
+      file_desc->is_erasure_coded = split.is_erasure_coded;
       RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
           native_file_path, &file_desc->fs, &fs_cache));
       per_type_files_[partition_desc->file_format()].push_back(file_desc);
@@ -235,7 +236,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     file_desc->splits.push_back(
         AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
             split.offset, split.partition_id, params.volume_id, expected_local,
-            BufferOpts(try_cache, file_desc->mtime)));
+            file_desc->is_erasure_coded, BufferOpts(try_cache, file_desc->mtime)));
   }
 
   // Update server wide metrics for number of scan ranges and ranges that have
@@ -563,17 +564,17 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    const BufferOpts& buffer_opts,
+    bool is_erasure_coded, const BufferOpts& buffer_opts,
     const ScanRange* original_split) {
   ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
         new ScanRangeMetadata(partition_id, original_split));
   return AllocateScanRange(fs, file, len, offset, metadata, disk_id, expected_local,
-      buffer_opts);
+      is_erasure_coded, buffer_opts);
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-    const BufferOpts& buffer_opts) {
+    bool is_erasure_coded, const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
@@ -587,15 +588,17 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
       file, disk_id, expected_local);
 
   ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
+  range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded,
+      buffer_opts, metadata);
   return range;
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-    bool expected_local, int mtime, const ScanRange* original_split) {
+    bool expected_local, int mtime, bool is_erasure_coded,
+    const ScanRange* original_split) {
   return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
-      BufferOpts(try_cache, mtime), original_split);
+      is_erasure_coded, BufferOpts(try_cache, mtime), original_split);
 }
 
 Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges,
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 5de1768..a38b3ce 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -54,7 +54,7 @@ class TScanRange;
 struct HdfsFileDesc {
   HdfsFileDesc(const std::string& filename)
     : fs(NULL), filename(filename), file_length(0), mtime(0),
-      file_compression(THdfsCompression::NONE) {
+      file_compression(THdfsCompression::NONE), is_erasure_coded(false) {
   }
 
   /// Connection to the filesystem containing the file.
@@ -72,6 +72,9 @@ struct HdfsFileDesc {
 
   THdfsCompression::type file_compression;
 
+  /// True if the file is erasure coded.
+  bool is_erasure_coded;
+
   /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
   std::vector<io::ScanRange*> splits;
 };
@@ -249,6 +252,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// This is thread safe.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
+      bool is_erasure_coded,
       const io::BufferOpts& buffer_opts,
       const io::ScanRange* original_split = NULL);
 
@@ -256,12 +260,14 @@ class HdfsScanNodeBase : public ScanNode {
   /// the partition_id, original_splits, and other information about the scan range.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      const io::BufferOpts& buffer_opts);
+      bool is_erasure_coded, const io::BufferOpts& buffer_opts);
 
   /// Old API for compatibility with text scanners (e.g. LZO text scanner).
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-      bool expected_local, int mtime, const io::ScanRange* original_split = NULL);
+      bool expected_local, int mtime,
+      bool is_erasure_coded = false,
+      const io::ScanRange* original_split = NULL);
 
   /// Adds ranges to the io mgr queue. Can be overridden to add scan-node specific
   /// actions like starting scanner threads. Must not be called once
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 639bac3..cdb1de5 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -812,14 +812,14 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(),
+              footer_split->expected_local(), files[i]->is_erasure_coded,
               BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
           footer_range =
               scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
                    footer_size, footer_start, split_metadata->partition_id, -1, false,
-                   BufferOpts::Uncached(), split);
+                   files[i]->is_erasure_coded, BufferOpts::Uncached(), split);
         }
 
         footer_ranges.push_back(footer_range);
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index af93974..7bd41d7 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -116,6 +116,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), files[i]->file_length, 0,
               metadata->partition_id, split->disk_id(), split->expected_local(),
+              files[i]->is_erasure_coded,
               BufferOpts(split->try_cache(), files[i]->mtime));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 3836d0b..358355f 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1261,6 +1261,7 @@ Status HdfsParquetScanner::ProcessFooter() {
     ScanRange* metadata_range = scan_node_->AllocateScanRange(
         metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
         metadata_range_->disk_id(), metadata_range_->expected_local(),
+        metadata_range_->is_erasure_coded(),
         BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
     unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index f6ad734..61c5466 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1018,7 +1018,7 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
       && col_end <= split_range->offset() + split_range->len();
   scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
       filename(), col_len, col_start, partition_id, split_range->disk_id(),
-      col_range_local,
+      col_range_local, split_range->is_erasure_coded(),
       BufferOpts(split_range->try_cache(), file_desc.mtime));
   ClearDictionaryDecoder();
   return Status::OK();
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 5ea485d..2a0c8a1 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -157,7 +157,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     int64_t partition_id = parent_->partition_descriptor()->id();
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), false, BufferOpts::Uncached());
+        scan_range_->disk_id(), false, scan_range_->is_erasure_coded(),
+        BufferOpts::Uncached());
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index ab0757c..e267170 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -262,7 +262,7 @@ void DiskIoMgrStress::NewClient(int i) {
 
     ScanRange* range = client.obj_pool.Add(new ScanRange);
     range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
-        0, false, BufferOpts::Uncached());
+        0, false, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index a8e07b8..2eae1a6 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -112,7 +112,7 @@ class DiskIoMgrTest : public testing::Test {
     if (status.ok()) {
       ScanRange* scan_range = pool_.Add(new ScanRange());
       scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, BufferOpts::Uncached());
+          (*written_range)->offset(), 0, false, false, BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, client, scan_range,
           reinterpret_cast<const char*>(data), sizeof(int32_t));
     }
@@ -241,7 +241,7 @@ class DiskIoMgrTest : public testing::Test {
       int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
       std::vector<ScanRange::SubRange> sub_ranges = {}) {
     ScanRange* range = pool->Add(new ScanRange);
-    range->Reset(nullptr, file_path, len, offset, disk_id, true,
+    range->Reset(nullptr, file_path, len, offset, disk_id, true, false,
         BufferOpts(is_cached, mtime), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
@@ -1445,7 +1445,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
+    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1491,7 +1491,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     int result_len = strlen(expected_result);
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, data_len, 0, 0, true,
+    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false,
         BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(),
             result_len), move(sub_ranges));
     if (fake_cache) {
@@ -1541,7 +1541,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
         LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
     unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
+    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 0dddac5..a459800 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -250,18 +250,20 @@ class ScanRange : public RequestRange {
   /// it is not generally safe to do so, but some unit tests reuse ranges after
   /// successfully reading to eos.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
+      bool expected_local, bool is_erasure_coded, const BufferOpts& buffer_opts,
+      void* meta_data = nullptr);
 
   /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
   /// in advance, as this method will do the merge.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, const BufferOpts& buffer_opts,
+      bool expected_local, bool is_erasure_coded, const BufferOpts& buffer_opts,
       std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
 
   void* meta_data() const { return meta_data_; }
   bool try_cache() const { return try_cache_; }
   bool read_in_flight() const { return read_in_flight_; }
   bool expected_local() const { return expected_local_; }
+  bool is_erasure_coded() const { return is_erasure_coded_; }
   int64_t bytes_to_read() const { return bytes_to_read_; }
 
   /// Returns the next buffer for this scan range. buffer is an output parameter.
@@ -446,6 +448,9 @@ class ScanRange : public RequestRange {
   /// TODO: we can do more with this
   bool expected_local_ = false;
 
+  /// If true, the file associated with the scan range is erasure coded. Set in Reset().
+  bool is_erasure_coded_ = false;
+
   /// Last modified time of the file associated with the scan range. Set in Reset().
   int64_t mtime_;
 
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index b98763d..6e3ca8c 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -199,11 +199,13 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   // lock across the read call.
   // To use the file handle cache:
   // 1. It must be enabled at the daemon level.
-  // 2. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
+  // 2. The file cannot be erasure coded.
+  // 3. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
   //    'cache_remote_file_handles' is true
-  // Note: S3, ADLS, and ABFS file handles are not cached.
+  // Note: S3, ADLS, and ABFS file handles are not cached. Erasure coded HDFS files
+  // are also not cached (IMPALA-8178), due to excessive memory usage (see HDFS-14308).
   bool use_file_handle_cache = false;
-  if (is_file_handle_caching_enabled() &&
+  if (is_file_handle_caching_enabled() && !is_erasure_coded_ &&
       (expected_local_ ||
        (FLAGS_cache_remote_file_handles && disk_id_ == io_mgr_->RemoteDfsDiskId()))) {
     use_file_handle_cache = true;
@@ -444,13 +446,15 @@ ScanRange::~ScanRange() {
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
-  Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, {}, meta_data);
+    int disk_id, bool expected_local, bool is_erasure_coded,
+    const BufferOpts& buffer_opts, void* meta_data) {
+  Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, buffer_opts,
+      {}, meta_data);
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, const BufferOpts& buffer_opts,
-    vector<SubRange>&& sub_ranges, void* meta_data) {
+    int disk_id, bool expected_local, bool is_erasure_coded,
+    const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
   DCHECK(ready_buffers_.empty());
   DCHECK(!read_in_flight_);
   DCHECK(file != nullptr);
@@ -479,7 +483,10 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   } else {
     external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
   }
+  // Erasure coded files should not be considered local (see IMPALA-7019).
+  DCHECK(!(expected_local && is_erasure_coded));
   expected_local_ = expected_local;
+  is_erasure_coded_ = is_erasure_coded;
   io_mgr_ = nullptr;
   reader_ = nullptr;
   sub_ranges_.clear();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 6eee15e..5c76870 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -367,7 +367,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
   handle->read_range_ = scan_range_pool_.Add(new ScanRange);
   handle->read_range_->Reset(nullptr, handle->write_range_->file(),
       handle->write_range_->len(), handle->write_range_->offset(),
-      handle->write_range_->disk_id(), false,
+      handle->write_range_->disk_id(), false, false,
       BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 3d6b49f..b4ed475 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -261,6 +261,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
+      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
       TScanRange scan_range;
       scan_range.__set_hdfs_file_split(hdfs_scan_range);
       TScanRangeLocationList scan_range_list;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1f34a22..9ae9d88 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -188,6 +188,9 @@ struct THdfsFileSplit {
 
   // last modified time of the file
   7: required i64 mtime
+
+  // whether this file is erasure-coded
+  8: required bool is_erasure_coded
 }
 
 // key range for single THBaseScanNode
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cbbfc43..b1eefed 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -956,7 +956,8 @@ public class HdfsScanNode extends ScanNode {
         TScanRange scanRange = new TScanRange();
         scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
-            fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime()));
+            fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
+            fileDesc.getIsEc()));
         TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
         scanRangeLocations.scan_range = scanRange;
         scanRangeLocations.locations = locations;