Posted to commits@impala.apache.org by jo...@apache.org on 2020/10/09 16:18:24 UTC

[impala] 01/02: IMPALA-9485: Enable file handle cache for EC files

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3382759664fe99317f27200b3e52a1e967f0a042
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Oct 1 12:07:21 2020 -0700

    IMPALA-9485: Enable file handle cache for EC files
    
    This is essentially a revert of IMPALA-8178, which disabled file
    handle caching for erasure-coded (EC) files. HDFS-14308 has since
    added CanUnbuffer support to the EC input stream APIs in the HDFS
    client library, so this patch re-enables file handle caching for
    EC files.
    
    Testing:
    * Ran core tests against an EC build (ERASURE_CODING=true)
    
    Change-Id: Ieb455eeed02a229a4559d3972dfdac7df32cdb99
    Reviewed-on: http://gerrit.cloudera.org:8080/16567
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc               |  4 +--
 be/src/exec/hdfs-orc-scanner.cc                    |  3 +--
 be/src/exec/hdfs-scan-node-base.cc                 | 21 +++++++---------
 be/src/exec/hdfs-scan-node-base.h                  | 13 +++-------
 be/src/exec/hdfs-scanner.cc                        |  5 ++--
 be/src/exec/hdfs-text-scanner.cc                   |  3 +--
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  2 +-
 be/src/exec/parquet/parquet-page-index.cc          |  2 +-
 be/src/exec/parquet/parquet-page-reader.cc         |  3 +--
 be/src/exec/scanner-context.cc                     |  4 +--
 be/src/runtime/io/disk-io-mgr-stress.cc            |  2 +-
 be/src/runtime/io/disk-io-mgr-test.cc              | 10 ++++----
 be/src/runtime/io/request-ranges.h                 | 19 +++++---------
 be/src/runtime/io/scan-range.cc                    | 29 ++++++++--------------
 be/src/runtime/tmp-file-mgr.cc                     |  2 +-
 be/src/scheduling/scheduler.cc                     |  2 --
 common/thrift/PlanNodes.thrift                     |  3 ---
 .../org/apache/impala/planner/HdfsScanNode.java    |  2 +-
 tests/custom_cluster/test_hdfs_fd_caching.py       |  3 +--
 19 files changed, 50 insertions(+), 82 deletions(-)
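
Why unbuffer support matters here: a cached file handle keeps its input
stream open, and before HDFS-14308 an erasure-coded stream could not
release the read buffers it held, so an idle cached handle pinned memory
indefinitely (the reason IMPALA-8178 disabled caching for EC files). A
minimal self-contained C++ sketch of the idea, with a hypothetical
FileHandle type standing in for the real handle (the analogous libhdfs
call is hdfsUnbufferFile(); this is not Impala's actual cache code):

#include <list>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for an HDFS file handle.
struct FileHandle {
  std::string path;
  // With HDFS-14308, erasure-coded input streams support unbuffer too.
  bool supports_unbuffer;
  // Release any pinned read buffers but keep the handle open.
  bool TryUnbuffer() { return supports_unbuffer; }
};

// Unbuffer-gated handle cache: only retain a handle whose buffers can be
// released first; otherwise caching it would pin memory for as long as
// the handle sits idle in the cache.
class FileHandleCache {
 public:
  void Release(std::unique_ptr<FileHandle> handle) {
    if (handle->TryUnbuffer()) {
      cached_.push_back(std::move(handle));
    }
    // Otherwise the handle closes when 'handle' goes out of scope.
  }

 private:
  std::list<std::unique_ptr<FileHandle>> cached_;
};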

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 064ef4c..ae66d0e 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -64,8 +64,8 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         BufferOpts::NO_CACHING;
     ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
         files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
-        expected_local, files[i]->is_erasure_coded, files[i]->mtime,
-        BufferOpts(cache_options), metadata->original_split);
+        expected_local, files[i]->mtime, BufferOpts(cache_options),
+        metadata->original_split);
     ScanRangeMetadata* header_metadata =
             static_cast<ScanRangeMetadata*>(header_range->meta_data());
     header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index fe86013..0a706a4 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -116,8 +116,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
   int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
       metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
-      split_range->disk_id(), expected_local, split_range->is_erasure_coded(),
-      split_range->mtime(),
+      split_range->disk_id(), expected_local, split_range->mtime(),
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
 
   unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index c0be811..2dd4ef6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -281,7 +281,6 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
         file_desc->mtime = split.mtime();
         file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
         file_desc->file_format = partition_desc->file_format();
-        file_desc->is_erasure_coded = split.is_erasure_coded();
         RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
             native_file_path, &file_desc->fs, &fs_cache));
         shared_state_.per_type_files_[partition_desc->file_format()].push_back(file_desc);
@@ -305,8 +304,8 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
           obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
       file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
           file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
-          params.volume_id(), expected_local, file_desc->is_erasure_coded,
-          file_desc->mtime, BufferOpts(cache_options)));
+          params.volume_id(), expected_local, file_desc->mtime,
+          BufferOpts(cache_options)));
       total_splits++;
     }
     // Update server wide metrics for number of scan ranges and ranges that have
@@ -791,27 +790,26 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    bool is_erasure_coded, int64_t mtime,  const BufferOpts& buffer_opts,
-    const ScanRange* original_split) {
+    int64_t mtime,  const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
   return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
-      is_erasure_coded, mtime, buffer_opts);
+      mtime, buffer_opts);
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
-    int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
-    int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
+    int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+    const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
   return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
-      disk_id, expected_local, is_erasure_coded, mtime, buffer_opts);
+      disk_id, expected_local, mtime, buffer_opts);
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
     int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
-    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
+    int disk_id, bool expected_local, int64_t mtime,
     const BufferOpts& buffer_opts) {
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
@@ -820,8 +818,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int6
   DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
   return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
-      move(sub_ranges), metadata, disk_id, expected_local, is_erasure_coded, mtime,
-      buffer_opts);
+      move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
 }
 
 const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 255f3f1..3591a94 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -62,8 +62,7 @@ struct HdfsFileDesc {
       file_length(0),
       mtime(0),
       file_compression(THdfsCompression::NONE),
-      file_format(THdfsFileFormat::TEXT),
-      is_erasure_coded(false) {}
+      file_format(THdfsFileFormat::TEXT) {}
 
   /// Connection to the filesystem containing the file.
   hdfsFS fs;
@@ -81,9 +80,6 @@ struct HdfsFileDesc {
   THdfsCompression::type file_compression;
   THdfsFileFormat::type file_format;
 
-  /// is erasure coded
-  bool is_erasure_coded;
-
   /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
   std::vector<io::ScanRange*> splits;
 
@@ -482,14 +478,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// This is thread safe.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-      bool is_erasure_coded, int64_t mtime,
-      const io::BufferOpts& buffer_opts,
+      int64_t mtime, const io::BufferOpts& buffer_opts,
       const io::ScanRange* original_split = nullptr);
 
   /// Same as the first overload, but it takes sub-ranges as well.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
-      int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
+      int64_t partition_id, int disk_id, bool expected_local,
       int64_t mtime, const io::BufferOpts& buffer_opts,
       const io::ScanRange* original_split = nullptr);
 
@@ -497,7 +492,7 @@ class HdfsScanNodeBase : public ScanNode {
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);
+      int64_t mtime, const io::BufferOpts& buffer_opts);
 
   /// Adds ranges to be read later by scanners. Must not be called once
   /// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e471303..028957d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -847,7 +847,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
+              footer_split->expected_local(), files[i]->mtime,
               BufferOpts(footer_split->cache_options()), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
@@ -857,8 +857,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
           footer_range =
               scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
                    footer_size, footer_start, split_metadata->partition_id, -1,
-                   expected_local, files[i]->is_erasure_coded, files[i]->mtime,
-                   BufferOpts(cache_options), split);
+                   expected_local, files[i]->mtime, BufferOpts(cache_options), split);
         }
         footer_ranges.push_back(footer_range);
       } else {
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f98b6a5..9338662 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -148,8 +148,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), files[i]->file_length, 0,
               metadata->partition_id, split->disk_id(), split->expected_local(),
-              files[i]->is_erasure_coded, files[i]->mtime,
-              BufferOpts(split->cache_options()));
+              files[i]->mtime, BufferOpts(split->cache_options()));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 71bbb4d..297dc17 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1385,7 +1385,7 @@ Status HdfsParquetScanner::ProcessFooter() {
     ScanRange* metadata_range = scan_node_->AllocateScanRange(
         metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
         metadata_range_->disk_id(), metadata_range_->expected_local(),
-        metadata_range_->is_erasure_coded(), metadata_range_->mtime(),
+        metadata_range_->mtime(),
         BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size, cache_options));
 
     unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index 6c9b2bf..8e63b4f 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -99,7 +99,7 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
       scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
       scan_range_start, move(sub_ranges), partition_id,
       scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
-      scanner_->metadata_range_->is_erasure_coded(), scanner_->metadata_range_->mtime(),
+      scanner_->metadata_range_->mtime(),
       BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
           cache_options));
 
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 7765feb..d4a4d4e 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -92,8 +92,7 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
   scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
       filename(), col_len, col_start, move(sub_ranges),
       partition_id, split_range->disk_id(),
-      col_range_local, split_range->is_erasure_coded(), file_desc.mtime,
-      BufferOpts(split_range->cache_options()));
+      col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
   page_headers_read_ = 0;
   dictionary_header_encountered_ = false;
   state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a7fbf7a..aeedefe 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -159,8 +159,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), expected_local, scan_range_->is_erasure_coded(),
-        scan_range_->mtime(), BufferOpts(cache_options));
+        scan_range_->disk_id(), expected_local, scan_range_->mtime(),
+        BufferOpts(cache_options));
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 96e58b6..afe65e3 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -262,7 +262,7 @@ void DiskIoMgrStress::NewClient(int i) {
 
     ScanRange* range = client.obj_pool.Add(new ScanRange);
     range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
-        0, false, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+        0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index c471256..96291fb 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -111,7 +111,7 @@ class DiskIoMgrTest : public testing::Test {
     if (status.ok()) {
       ScanRange* scan_range = pool_.Add(new ScanRange());
       scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, false, ScanRange::INVALID_MTIME,
+          (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
           BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, client, scan_range,
           reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -243,7 +243,7 @@ class DiskIoMgrTest : public testing::Test {
     ScanRange* range = pool->Add(new ScanRange);
     int cache_options =
         is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, file_path, len, offset, disk_id, true, false, mtime,
+    range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
         BufferOpts(cache_options), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
@@ -1449,7 +1449,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false, ScanRange::INVALID_MTIME,
+    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1496,7 +1496,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
     int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false, stat_val.st_mtime,
+    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
         BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
         move(sub_ranges));
     if (fake_cache) {
@@ -1546,7 +1546,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
         LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
     unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false, ScanRange::INVALID_MTIME,
+    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 1c2a38a..817151e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -252,8 +252,7 @@ class ScanRange : public RequestRange {
   /// with the rest of the input variables.
   static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
       int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-      int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
-      const BufferOpts& buffer_opts);
+      int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -261,8 +260,7 @@ class ScanRange : public RequestRange {
   /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
   /// queue to add the range to. If 'expected_local' is true, a warning is generated if
   /// the read did not come from a local disk. 'mtime' is the last modification time for
-  /// 'file'; the mtime must change when the file changes. 'is_erasure_coded' is whether
-  /// 'file' is stored using HDFS erasure coding.
+  /// 'file'; the mtime must change when the file changes.
   /// 'buffer_opts' specifies buffer management options - see the DiskIoMgr class comment
   /// and the BufferOpts comments for details.
   /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
@@ -271,15 +269,14 @@ class ScanRange : public RequestRange {
   /// it is not generally safe to do so, but some unit tests reuse ranges after
   /// successfully reading to eos.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, bool is_erasure_coded, int64_t mtime,
-      const BufferOpts& buffer_opts, void* meta_data = nullptr);
+      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+      void* meta_data = nullptr);
 
   /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
   /// in advance, as this method will do the merge.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, bool is_erasure_coded, int64_t mtime,
-      const BufferOpts& buffer_opts, std::vector<SubRange>&& sub_ranges,
-      void* meta_data = nullptr);
+      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+      std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
 
   void* meta_data() const { return meta_data_; }
   int cache_options() const { return cache_options_; }
@@ -287,7 +284,6 @@ class ScanRange : public RequestRange {
   bool UseDataCache() const { return (cache_options_ & BufferOpts::USE_DATA_CACHE) != 0; }
   bool read_in_flight() const { return read_in_flight_; }
   bool expected_local() const { return expected_local_; }
-  bool is_erasure_coded() const { return is_erasure_coded_; }
   int64_t bytes_to_read() const { return bytes_to_read_; }
 
   /// Returns the next buffer for this scan range. buffer is an output parameter.
@@ -475,9 +471,6 @@ class ScanRange : public RequestRange {
   /// TODO: we can do more with this
   bool expected_local_ = false;
 
-  /// If true, the file associated with the scan range is erasure coded. Set in Reset().
-  bool is_erasure_coded_ = false;
-
   /// Last modified time of the file associated with the scan range. Set in Reset().
   int64_t mtime_;
 
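The second Reset() overload above notes that contiguous sub-ranges need
not be merged by the caller. A standalone sketch of that kind of merge,
assuming sub-ranges arrive sorted by offset and non-overlapping (the
SubRange struct here only mirrors the shape of ScanRange::SubRange; this
is not Impala's actual implementation):

#include <cstdint>
#include <vector>

// Mirrors the shape of ScanRange::SubRange: a byte range within a file.
struct SubRange {
  int64_t offset;
  int64_t length;
};

// Coalesce sub-ranges where one ends exactly where the next begins.
std::vector<SubRange> MergeContiguous(const std::vector<SubRange>& in) {
  std::vector<SubRange> out;
  for (const SubRange& sr : in) {
    if (!out.empty() && out.back().offset + out.back().length == sr.offset) {
      out.back().length += sr.length;  // Extend the previous range.
    } else {
      out.push_back(sr);
    }
  }
  return out;
}
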
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index b8e2cfa..9eeb788 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -192,13 +192,10 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
   // lock across the read call.
   // To use the file handle cache:
   // 1. It must be enabled at the daemon level.
-  // 2. The file cannot be erasure coded.
-  // 3. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
+  // 2. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
   //    'cache_remote_file_handles' is true
-  // Note: S3, ADLS, and ABFS file handles are not cached. Erasure coded HDFS files
-  // are also not cached (IMPALA-8178), due to excessive memory usage (see HDFS-14308).
   bool use_file_handle_cache = false;
-  if (is_file_handle_caching_enabled() && !is_erasure_coded_ &&
+  if (is_file_handle_caching_enabled() &&
       (expected_local_ ||
        (FLAGS_cache_remote_file_handles && disk_id_ == io_mgr_->RemoteDfsDiskId()) ||
        (FLAGS_cache_s3_file_handles && disk_id_ == io_mgr_->RemoteS3DiskId()) ||
@@ -445,30 +442,29 @@ ScanRange::~ScanRange() {
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
-    const BufferOpts& buffer_opts, void* meta_data) {
-  Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
-      buffer_opts, {}, meta_data);
+    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+    void* meta_data) {
+  Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {},
+      meta_data);
 }
 
 ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
     int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
-    const BufferOpts& buffer_opts) {
+    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
   disk_id =
       ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(file, disk_id, expected_local);
   ScanRange* range = obj_pool->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
-      buffer_opts, move(sub_ranges), metadata);
+  range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+      move(sub_ranges), metadata);
   return range;
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
-    const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
+    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+    vector<SubRange>&& sub_ranges, void* meta_data) {
   DCHECK(ready_buffers_.empty());
   DCHECK(!read_in_flight_);
   DCHECK(file != nullptr);
@@ -500,10 +496,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
   } else {
     external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
   }
-  // Erasure coded should not be considered local (see IMPALA-7019).
-  DCHECK(!(expected_local && is_erasure_coded));
   expected_local_ = expected_local;
-  is_erasure_coded_ = is_erasure_coded;
   io_mgr_ = nullptr;
   reader_ = nullptr;
   sub_ranges_.clear();
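
The net effect of the scan-range.cc hunks above is one clause fewer in
the file-handle-cache eligibility test in DoRead(). A condensed
standalone sketch of the post-patch predicate, with plain parameters
standing in for the member state and gflags (remote-filesystem clauses
beyond DFS and S3 are elided here, as they are in the truncated hunk
above; this is not the literal Impala code):

// Stand-ins for is_file_handle_caching_enabled() and the gflags that
// DoRead() consults; the real check lives on ScanRange.
struct HandleCachePolicy {
  bool daemon_cache_enabled;
  bool cache_remote_file_handles;
  bool cache_s3_file_handles;
};

// Post-patch eligibility: note there is no erasure-coding clause anymore.
bool UseFileHandleCache(const HandleCachePolicy& p, bool expected_local,
    int disk_id, int remote_dfs_disk_id, int remote_s3_disk_id) {
  if (!p.daemon_cache_enabled) return false;
  return expected_local ||
      (p.cache_remote_file_handles && disk_id == remote_dfs_disk_id) ||
      (p.cache_s3_file_handles && disk_id == remote_s3_disk_id);
}
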
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 86e3dbd..24b51e9 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -633,7 +633,7 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
   handle->read_range_ = scan_range_pool_.Add(new ScanRange);
   handle->read_range_->Reset(nullptr, handle->write_range_->file(),
       handle->write_range_->len(), handle->write_range_->offset(),
-      handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
+      handle->write_range_->disk_id(), false, ScanRange::INVALID_MTIME,
       BufferOpts::ReadInto(
           read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   read_counter_->Add(1);
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index c28dda3..850c086 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -133,7 +133,6 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
-      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
       hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
       TScanRange scan_range;
       scan_range.__set_hdfs_file_split(hdfs_scan_range);
@@ -1110,7 +1109,6 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
     hdfs_file_split->set_file_compression(
         THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
     hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
-    hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
     hdfs_file_split->set_partition_path_hash(
         tscan_range.hdfs_file_split.partition_path_hash);
   }
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c1a6a4d..e06d1da 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -202,9 +202,6 @@ struct THdfsFileSplit {
   // last modified time of the file
   7: required i64 mtime
 
-  // whether this file is erasure-coded
-  8: required bool is_erasure_coded
-
   // Hash of the partition's path. This must be hashed with a hash algorithm that is
   // consistent across different processes and machines. This is currently using
   // Java's String.hashCode(), which is consistent. For testing purposes, this can use
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0855337..4a3cee6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1024,7 +1024,7 @@ public class HdfsScanNode extends ScanNode {
         scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getRelativePath(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
-            fileDesc.getIsEc(), partition.getLocation().hashCode()));
+            partition.getLocation().hashCode()));
         TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
         scanRangeLocations.scan_range = scanRange;
         scanRangeLocations.locations = locations;
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py
index be706a1..7d94fd3 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -18,14 +18,13 @@
 import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfLocal, SkipIfEC
+from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import (
     IS_ISILON,
     IS_ADLS)
 from time import sleep
 
 @SkipIfLocal.hdfs_fd_caching
-@SkipIfEC.remote_read
 class TestHdfsFdCaching(CustomClusterTestSuite):
   """Tests that if HDFS file handle caching is enabled, file handles are actually cached
   and the associated metrics return valid results. In addition, tests that the upper bound