You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/10/09 16:18:24 UTC
[impala] 01/02: IMPALA-9485: Enable file handle cache for EC files
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3382759664fe99317f27200b3e52a1e967f0a042
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Oct 1 12:07:21 2020 -0700
IMPALA-9485: Enable file handle cache for EC files
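This is essentially a revert of IMPALA-8178, which disabled file handle
caching for erasure-coded (EC) files because of excessive memory usage.
HDFS-14308 added CanUnbuffer support to the EC input stream APIs in the
HDFS client library, so this patch enables file handle caching for EC files.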
Testing:
* Ran core tests against an EC build (ERASURE_CODING=true)
Change-Id: Ieb455eeed02a229a4559d3972dfdac7df32cdb99
Reviewed-on: http://gerrit.cloudera.org:8080/16567
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/base-sequence-scanner.cc | 4 +--
be/src/exec/hdfs-orc-scanner.cc | 3 +--
be/src/exec/hdfs-scan-node-base.cc | 21 +++++++---------
be/src/exec/hdfs-scan-node-base.h | 13 +++-------
be/src/exec/hdfs-scanner.cc | 5 ++--
be/src/exec/hdfs-text-scanner.cc | 3 +--
be/src/exec/parquet/hdfs-parquet-scanner.cc | 2 +-
be/src/exec/parquet/parquet-page-index.cc | 2 +-
be/src/exec/parquet/parquet-page-reader.cc | 3 +--
be/src/exec/scanner-context.cc | 4 +--
be/src/runtime/io/disk-io-mgr-stress.cc | 2 +-
be/src/runtime/io/disk-io-mgr-test.cc | 10 ++++----
be/src/runtime/io/request-ranges.h | 19 +++++---------
be/src/runtime/io/scan-range.cc | 29 ++++++++--------------
be/src/runtime/tmp-file-mgr.cc | 2 +-
be/src/scheduling/scheduler.cc | 2 --
common/thrift/PlanNodes.thrift | 3 ---
.../org/apache/impala/planner/HdfsScanNode.java | 2 +-
tests/custom_cluster/test_hdfs_fd_caching.py | 3 +--
19 files changed, 50 insertions(+), 82 deletions(-)
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 064ef4c..ae66d0e 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -64,8 +64,8 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
BufferOpts::NO_CACHING;
ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
- expected_local, files[i]->is_erasure_coded, files[i]->mtime,
- BufferOpts(cache_options), metadata->original_split);
+ expected_local, files[i]->mtime, BufferOpts(cache_options),
+ metadata->original_split);
ScanRangeMetadata* header_metadata =
static_cast<ScanRangeMetadata*>(header_range->meta_data());
header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index fe86013..0a706a4 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -116,8 +116,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* range = scanner_->scan_node_->AllocateScanRange(
metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
- split_range->disk_id(), expected_local, split_range->is_erasure_coded(),
- split_range->mtime(),
+ split_range->disk_id(), expected_local, split_range->mtime(),
BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index c0be811..2dd4ef6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -281,7 +281,6 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
file_desc->mtime = split.mtime();
file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
file_desc->file_format = partition_desc->file_format();
- file_desc->is_erasure_coded = split.is_erasure_coded();
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
native_file_path, &file_desc->fs, &fs_cache));
shared_state_.per_type_files_[partition_desc->file_format()].push_back(file_desc);
@@ -305,8 +304,8 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
- params.volume_id(), expected_local, file_desc->is_erasure_coded,
- file_desc->mtime, BufferOpts(cache_options)));
+ params.volume_id(), expected_local, file_desc->mtime,
+ BufferOpts(cache_options)));
total_splits++;
}
// Update server wide metrics for number of scan ranges and ranges that have
@@ -791,27 +790,26 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- bool is_erasure_coded, int64_t mtime, const BufferOpts& buffer_opts,
- const ScanRange* original_split) {
+ int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
ScanRangeMetadata* metadata =
shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
- is_erasure_coded, mtime, buffer_opts);
+ mtime, buffer_opts);
}
ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
- int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
- int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
+ int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+ const BufferOpts& buffer_opts, const ScanRange* original_split) {
ScanRangeMetadata* metadata =
shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
- disk_id, expected_local, is_erasure_coded, mtime, buffer_opts);
+ disk_id, expected_local, mtime, buffer_opts);
}
ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
- int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
+ int disk_id, bool expected_local, int64_t mtime,
const BufferOpts& buffer_opts) {
// Require that the scan range is within [0, file_length). While this cannot be used
// to guarantee safety (file_length metadata may be stale), it avoids different
@@ -820,8 +818,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int6
DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
<< "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
- move(sub_ranges), metadata, disk_id, expected_local, is_erasure_coded, mtime,
- buffer_opts);
+ move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
}
const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 255f3f1..3591a94 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -62,8 +62,7 @@ struct HdfsFileDesc {
file_length(0),
mtime(0),
file_compression(THdfsCompression::NONE),
- file_format(THdfsFileFormat::TEXT),
- is_erasure_coded(false) {}
+ file_format(THdfsFileFormat::TEXT) {}
/// Connection to the filesystem containing the file.
hdfsFS fs;
@@ -81,9 +80,6 @@ struct HdfsFileDesc {
THdfsCompression::type file_compression;
THdfsFileFormat::type file_format;
- /// is erasure coded
- bool is_erasure_coded;
-
/// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
std::vector<io::ScanRange*> splits;
@@ -482,14 +478,13 @@ class HdfsScanNodeBase : public ScanNode {
/// This is thread safe.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- bool is_erasure_coded, int64_t mtime,
- const io::BufferOpts& buffer_opts,
+ int64_t mtime, const io::BufferOpts& buffer_opts,
const io::ScanRange* original_split = nullptr);
/// Same as the first overload, but it takes sub-ranges as well.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
- int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
+ int64_t partition_id, int disk_id, bool expected_local,
int64_t mtime, const io::BufferOpts& buffer_opts,
const io::ScanRange* original_split = nullptr);
@@ -497,7 +492,7 @@ class HdfsScanNodeBase : public ScanNode {
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
ScanRangeMetadata* metadata, int disk_id, bool expected_local,
- bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);
+ int64_t mtime, const io::BufferOpts& buffer_opts);
/// Adds ranges to be read later by scanners. Must not be called once
/// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e471303..028957d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -847,7 +847,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
footer_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), footer_size, footer_start,
split_metadata->partition_id, footer_split->disk_id(),
- footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
+ footer_split->expected_local(), files[i]->mtime,
BufferOpts(footer_split->cache_options()), split);
} else {
// If we did not find the last split, we know it is going to be a remote read.
@@ -857,8 +857,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
footer_range =
scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
footer_size, footer_start, split_metadata->partition_id, -1,
- expected_local, files[i]->is_erasure_coded, files[i]->mtime,
- BufferOpts(cache_options), split);
+ expected_local, files[i]->mtime, BufferOpts(cache_options), split);
}
footer_ranges.push_back(footer_range);
} else {
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f98b6a5..9338662 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -148,8 +148,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), files[i]->file_length, 0,
metadata->partition_id, split->disk_id(), split->expected_local(),
- files[i]->is_erasure_coded, files[i]->mtime,
- BufferOpts(split->cache_options()));
+ files[i]->mtime, BufferOpts(split->cache_options()));
compressed_text_scan_ranges.push_back(file_range);
scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
}
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 71bbb4d..297dc17 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1385,7 +1385,7 @@ Status HdfsParquetScanner::ProcessFooter() {
ScanRange* metadata_range = scan_node_->AllocateScanRange(
metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
metadata_range_->disk_id(), metadata_range_->expected_local(),
- metadata_range_->is_erasure_coded(), metadata_range_->mtime(),
+ metadata_range_->mtime(),
BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size, cache_options));
unique_ptr<BufferDescriptor> io_buffer;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index 6c9b2bf..8e63b4f 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -99,7 +99,7 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
scan_range_start, move(sub_ranges), partition_id,
scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
- scanner_->metadata_range_->is_erasure_coded(), scanner_->metadata_range_->mtime(),
+ scanner_->metadata_range_->mtime(),
BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
cache_options));
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 7765feb..d4a4d4e 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -92,8 +92,7 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
filename(), col_len, col_start, move(sub_ranges),
partition_id, split_range->disk_id(),
- col_range_local, split_range->is_erasure_coded(), file_desc.mtime,
- BufferOpts(split_range->cache_options()));
+ col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
page_headers_read_ = 0;
dictionary_header_encountered_ = false;
state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a7fbf7a..aeedefe 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -159,8 +159,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* range = parent_->scan_node_->AllocateScanRange(
scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
- scan_range_->disk_id(), expected_local, scan_range_->is_erasure_coded(),
- scan_range_->mtime(), BufferOpts(cache_options));
+ scan_range_->disk_id(), expected_local, scan_range_->mtime(),
+ BufferOpts(cache_options));
bool needs_buffers;
RETURN_IF_ERROR(
parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 96e58b6..afe65e3 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -262,7 +262,7 @@ void DiskIoMgrStress::NewClient(int i) {
ScanRange* range = client.obj_pool.Add(new ScanRange);
range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
- 0, false, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+ 0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index c471256..96291fb 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -111,7 +111,7 @@ class DiskIoMgrTest : public testing::Test {
if (status.ok()) {
ScanRange* scan_range = pool_.Add(new ScanRange());
scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
- (*written_range)->offset(), 0, false, false, ScanRange::INVALID_MTIME,
+ (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
BufferOpts::Uncached());
ValidateSyncRead(io_mgr, reader, client, scan_range,
reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -243,7 +243,7 @@ class DiskIoMgrTest : public testing::Test {
ScanRange* range = pool->Add(new ScanRange);
int cache_options =
is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
- range->Reset(nullptr, file_path, len, offset, disk_id, true, false, mtime,
+ range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
BufferOpts(cache_options), move(sub_ranges), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
@@ -1449,7 +1449,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
ScanRange* range = pool_.Add(new ScanRange);
- range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false, ScanRange::INVALID_MTIME,
+ range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1496,7 +1496,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
vector<uint8_t> client_buffer(result_len);
ScanRange* range = pool_.Add(new ScanRange);
int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
- range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false, stat_val.st_mtime,
+ range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
move(sub_ranges));
if (fake_cache) {
@@ -1546,7 +1546,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
ScanRange* range = pool_.Add(new ScanRange);
- range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false, ScanRange::INVALID_MTIME,
+ range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 1c2a38a..817151e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -252,8 +252,7 @@ class ScanRange : public RequestRange {
/// with the rest of the input variables.
static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
- int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts);
+ int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
/// Resets this scan range object with the scan range description. The scan range
/// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -261,8 +260,7 @@ class ScanRange : public RequestRange {
/// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
/// queue to add the range to. If 'expected_local' is true, a warning is generated if
/// the read did not come from a local disk. 'mtime' is the last modification time for
- /// 'file'; the mtime must change when the file changes. 'is_erasure_coded' is whether
- /// 'file' is stored using HDFS erasure coding.
+ /// 'file'; the mtime must change when the file changes.
/// 'buffer_opts' specifies buffer management options - see the DiskIoMgr class comment
/// and the BufferOpts comments for details.
/// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
@@ -271,15 +269,14 @@ class ScanRange : public RequestRange {
/// it is not generally safe to do so, but some unit tests reuse ranges after
/// successfully reading to eos.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
- bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts, void* meta_data = nullptr);
+ bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+ void* meta_data = nullptr);
/// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
/// in advance, as this method will do the merge.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
- bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts, std::vector<SubRange>&& sub_ranges,
- void* meta_data = nullptr);
+ bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+ std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
void* meta_data() const { return meta_data_; }
int cache_options() const { return cache_options_; }
@@ -287,7 +284,6 @@ class ScanRange : public RequestRange {
bool UseDataCache() const { return (cache_options_ & BufferOpts::USE_DATA_CACHE) != 0; }
bool read_in_flight() const { return read_in_flight_; }
bool expected_local() const { return expected_local_; }
- bool is_erasure_coded() const { return is_erasure_coded_; }
int64_t bytes_to_read() const { return bytes_to_read_; }
/// Returns the next buffer for this scan range. buffer is an output parameter.
@@ -475,9 +471,6 @@ class ScanRange : public RequestRange {
/// TODO: we can do more with this
bool expected_local_ = false;
- /// If true, the file associated with the scan range is erasure coded. Set in Reset().
- bool is_erasure_coded_ = false;
-
/// Last modified time of the file associated with the scan range. Set in Reset().
int64_t mtime_;
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index b8e2cfa..9eeb788 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -192,13 +192,10 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
// lock across the read call.
// To use the file handle cache:
// 1. It must be enabled at the daemon level.
- // 2. The file cannot be erasure coded.
- // 3. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
+ // 2. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
// 'cache_remote_file_handles' is true
- // Note: S3, ADLS, and ABFS file handles are not cached. Erasure coded HDFS files
- // are also not cached (IMPALA-8178), due to excessive memory usage (see HDFS-14308).
bool use_file_handle_cache = false;
- if (is_file_handle_caching_enabled() && !is_erasure_coded_ &&
+ if (is_file_handle_caching_enabled() &&
(expected_local_ ||
(FLAGS_cache_remote_file_handles && disk_id_ == io_mgr_->RemoteDfsDiskId()) ||
(FLAGS_cache_s3_file_handles && disk_id_ == io_mgr_->RemoteS3DiskId()) ||
@@ -445,30 +442,29 @@ ScanRange::~ScanRange() {
}
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
- int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts, void* meta_data) {
- Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
- buffer_opts, {}, meta_data);
+ int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+ void* meta_data) {
+ Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {},
+ meta_data);
}
ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
- int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts) {
+ int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
DCHECK_GE(disk_id, -1);
DCHECK_GE(offset, 0);
DCHECK_GE(len, 0);
disk_id =
ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(file, disk_id, expected_local);
ScanRange* range = obj_pool->Add(new ScanRange);
- range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
- buffer_opts, move(sub_ranges), metadata);
+ range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+ move(sub_ranges), metadata);
return range;
}
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
- int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
- const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
+ int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+ vector<SubRange>&& sub_ranges, void* meta_data) {
DCHECK(ready_buffers_.empty());
DCHECK(!read_in_flight_);
DCHECK(file != nullptr);
@@ -500,10 +496,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
} else {
external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
}
- // Erasure coded should not be considered local (see IMPALA-7019).
- DCHECK(!(expected_local && is_erasure_coded));
expected_local_ = expected_local;
- is_erasure_coded_ = is_erasure_coded;
io_mgr_ = nullptr;
reader_ = nullptr;
sub_ranges_.clear();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 86e3dbd..24b51e9 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -633,7 +633,7 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
handle->read_range_ = scan_range_pool_.Add(new ScanRange);
handle->read_range_->Reset(nullptr, handle->write_range_->file(),
handle->write_range_->len(), handle->write_range_->offset(),
- handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
+ handle->write_range_->disk_id(), false, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(
read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
read_counter_->Add(1);
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index c28dda3..850c086 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -133,7 +133,6 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
hdfs_scan_range.__set_offset(scan_range_offset);
hdfs_scan_range.__set_partition_id(spec.partition_id);
- hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
TScanRange scan_range;
scan_range.__set_hdfs_file_split(hdfs_scan_range);
@@ -1110,7 +1109,6 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
hdfs_file_split->set_file_compression(
THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
- hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
hdfs_file_split->set_partition_path_hash(
tscan_range.hdfs_file_split.partition_path_hash);
}
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c1a6a4d..e06d1da 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -202,9 +202,6 @@ struct THdfsFileSplit {
// last modified time of the file
7: required i64 mtime
- // whether this file is erasure-coded
- 8: required bool is_erasure_coded
-
// Hash of the partition's path. This must be hashed with a hash algorithm that is
// consistent across different processes and machines. This is currently using
// Java's String.hashCode(), which is consistent. For testing purposes, this can use
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0855337..4a3cee6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1024,7 +1024,7 @@ public class HdfsScanNode extends ScanNode {
scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getRelativePath(),
currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
- fileDesc.getIsEc(), partition.getLocation().hashCode()));
+ partition.getLocation().hashCode()));
TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
scanRangeLocations.scan_range = scanRange;
scanRangeLocations.locations = locations;
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py
index be706a1..7d94fd3 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -18,14 +18,13 @@
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfLocal, SkipIfEC
+from tests.common.skip import SkipIfLocal
from tests.util.filesystem_utils import (
IS_ISILON,
IS_ADLS)
from time import sleep
@SkipIfLocal.hdfs_fd_caching
-@SkipIfEC.remote_read
class TestHdfsFdCaching(CustomClusterTestSuite):
"""Tests that if HDFS file handle caching is enabled, file handles are actually cached
and the associated metrics return valid results. In addition, tests that the upper bound