You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2022/11/10 00:36:55 UTC
[impala] 01/02: IMPALA-9488: Add metrics for EC reads
This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4e87a80fae86c5775a5f35607806fee287b3797a
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Fri Oct 28 10:57:48 2022 -0700
IMPALA-9488: Add metrics for EC reads
Adds a metric tracking erasure-coded bytes read. Adds ScanRange::FileInfo
to pass file info through calls of AllocateScanRange so it's easier to
add erasure coding as another piece of file info.
Adds a test to verify that the expected number of bytes is read for
existing read metrics and the new `erasure-coded-bytes-read` metric when
doing a select.
Change-Id: Ieb06bac9dea4b632621653d2935e9a7b2dc81341
Reviewed-on: http://gerrit.cloudera.org:8080/19178
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/base-sequence-scanner.cc | 7 +-
be/src/exec/hdfs-scan-node-base.cc | 42 +++++++-----
be/src/exec/hdfs-scan-node-base.h | 22 +++---
be/src/exec/hdfs-scanner.cc | 13 ++--
be/src/exec/orc/hdfs-orc-scanner.cc | 13 ++--
be/src/exec/parquet/hdfs-parquet-scanner.cc | 4 +-
be/src/exec/parquet/parquet-page-index.cc | 10 ++-
be/src/exec/parquet/parquet-page-reader.cc | 9 +--
be/src/exec/scanner-context.cc | 5 +-
be/src/exec/text/hdfs-text-scanner.cc | 7 +-
be/src/runtime/io/disk-io-mgr-stress.cc | 4 +-
be/src/runtime/io/disk-io-mgr-test.cc | 36 +++++-----
be/src/runtime/io/hdfs-file-reader.cc | 3 +
be/src/runtime/io/request-context.h | 4 ++
be/src/runtime/io/request-ranges.h | 34 +++++++---
be/src/runtime/io/scan-range.cc | 38 +++++------
be/src/runtime/tmp-file-mgr.cc | 10 +--
be/src/scheduling/scheduler.cc | 2 +
be/src/util/impalad-metrics.cc | 5 ++
be/src/util/impalad-metrics.h | 4 ++
common/thrift/PlanNodes.thrift | 3 +
common/thrift/metrics.json | 10 +++
.../org/apache/impala/planner/HdfsScanNode.java | 1 +
tests/query_test/test_io_metrics.py | 79 ++++++++++++++++++++++
24 files changed, 254 insertions(+), 111 deletions(-)
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ae66d0e01..83e1221b1 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -62,10 +62,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
bool expected_local = false;
int cache_options = !scan_node->IsDataCacheDisabled() ? BufferOpts::USE_DATA_CACHE :
BufferOpts::NO_CACHING;
- ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
- files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
- expected_local, files[i]->mtime, BufferOpts(cache_options),
- metadata->original_split);
+ ScanRange* header_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+ header_size, 0, metadata->partition_id, -1, expected_local,
+ BufferOpts(cache_options), metadata->original_split);
ScanRangeMetadata* header_metadata =
static_cast<ScanRangeMetadata*>(header_range->meta_data());
header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5bab2c13..f6f1f008d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -111,6 +111,8 @@ PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES,
"The total number of bytes read via short circuit read");
PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES,
"The total number of bytes read from data node cache");
+PROFILE_DEFINE_COUNTER(BytesReadErasureCoded, STABLE_LOW, TUnit::BYTES,
+ "The total number of bytes read from erasure-coded data");
PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT,
"The total number of remote scan ranges");
PROFILE_DEFINE_COUNTER(BytesReadRemoteUnexpected, STABLE_LOW, TUnit::BYTES,
@@ -298,6 +300,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
file_desc->file_length = split.file_length();
file_desc->mtime = split.mtime();
file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
+ file_desc->is_erasure_coded = split.is_erasure_coded();
file_desc->file_format = partition_desc->file_format();
file_desc->file_metadata = file_metadata;
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
@@ -328,10 +331,9 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
}
ScanRangeMetadata* metadata =
obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
- file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
- file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
- params.volume_id(), expected_local, file_desc->mtime,
- BufferOpts(cache_options)));
+ file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
+ file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
+ params.volume_id(), expected_local, BufferOpts(cache_options)));
total_splits++;
}
// Update server wide metrics for number of scan ranges and ranges that have
@@ -608,6 +610,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
bytes_read_short_circuit_ =
PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile());
bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile());
+ bytes_read_ec_ = PROFILE_BytesReadErasureCoded.Instantiate(runtime_profile());
num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile());
unexpected_remote_bytes_ =
PROFILE_BytesReadRemoteUnexpected.Instantiate(runtime_profile());
@@ -822,37 +825,37 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
return curr_reservation;
}
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
+ const BufferOpts& buffer_opts, const ScanRange* original_split) {
ScanRangeMetadata* metadata =
shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
- return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
- mtime, buffer_opts);
+ return AllocateScanRange(fi, len, offset, {}, metadata, disk_id, expected_local,
+ buffer_opts);
}
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
- int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+ int64_t partition_id, int disk_id, bool expected_local,
const BufferOpts& buffer_opts, const ScanRange* original_split) {
ScanRangeMetadata* metadata =
shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
- return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
- disk_id, expected_local, mtime, buffer_opts);
+ return AllocateScanRange(fi, len, offset, move(sub_ranges), metadata,
+ disk_id, expected_local, buffer_opts);
}
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
- int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
- int disk_id, bool expected_local, int64_t mtime,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
+ int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
+ ScanRangeMetadata* metadata, int disk_id, bool expected_local,
const BufferOpts& buffer_opts) {
// Require that the scan range is within [0, file_length). While this cannot be used
// to guarantee safety (file_length metadata may be stale), it avoids different
// behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
// beyond the end of the file).
- DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
+ DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, fi.filename)->file_length)
<< "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
- return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
- move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
+ return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fi, len, offset,
+ move(sub_ranges), metadata, disk_id, expected_local, buffer_opts);
}
const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
@@ -1213,6 +1216,7 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
bytes_read_local_->Set(reader_context_->bytes_read_local());
bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+ bytes_read_ec_->Set(reader_context_->bytes_read_ec());
num_remote_ranges_->Set(reader_context_->num_remote_ranges());
unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
@@ -1237,6 +1241,8 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
bytes_read_short_circuit_->value());
ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
bytes_read_dn_cache_->value());
+ ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment(
+ bytes_read_ec_->value());
}
}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 21d18f25c..053e61c70 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -71,6 +71,10 @@ struct HdfsFileDesc {
file_compression(THdfsCompression::NONE),
file_format(THdfsFileFormat::TEXT) {}
+ io::ScanRange::FileInfo GetFileInfo() const {
+ return io::ScanRange::FileInfo{filename.c_str(), fs, mtime, is_erasure_coded};
+ }
+
/// Connection to the filesystem containing the file.
hdfsFS fs;
@@ -93,6 +97,9 @@ struct HdfsFileDesc {
/// Extra file metadata, e.g. Iceberg-related file-level info.
const ::org::apache::impala::fb::FbFileMetadata* file_metadata;
+ /// Whether file is erasure coded.
+ bool is_erasure_coded = false;
+
/// Some useful typedefs for creating HdfsFileDesc related data structures.
/// This is a pair for partition ID and filename which uniquely identifies a file.
typedef pair<int64_t, std::string> PartitionFileKey;
@@ -506,23 +513,21 @@ class HdfsScanNodeBase : public ScanNode {
/// If not NULL, the 'original_split' pointer is stored for reference in the scan range
/// metadata of the scan range that is to be allocated.
/// This is thread safe.
- io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+ io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- int64_t mtime, const io::BufferOpts& buffer_opts,
- const io::ScanRange* original_split = nullptr);
+ const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
/// Same as the first overload, but it takes sub-ranges as well.
- io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+ io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
int64_t partition_id, int disk_id, bool expected_local,
- int64_t mtime, const io::BufferOpts& buffer_opts,
- const io::ScanRange* original_split = nullptr);
+ const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
/// Same as above, but it takes both sub-ranges and metadata.
- io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+ io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
ScanRangeMetadata* metadata, int disk_id, bool expected_local,
- int64_t mtime, const io::BufferOpts& buffer_opts);
+ const io::BufferOpts& buffer_opts);
/// Adds ranges to be read later by scanners. Must not be called once
/// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
@@ -781,6 +786,7 @@ class HdfsScanNodeBase : public ScanNode {
RuntimeProfile::Counter* bytes_read_local_ = nullptr;
RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
+ RuntimeProfile::Counter* bytes_read_ec_ = nullptr;
RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7795a8c09..49f360ed1 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -888,10 +888,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
// metadata associated with the footer range.
ScanRange* footer_range;
if (footer_split != nullptr) {
- footer_range = scan_node->AllocateScanRange(files[i]->fs,
- files[i]->filename.c_str(), footer_size, footer_start,
- split_metadata->partition_id, footer_split->disk_id(),
- footer_split->expected_local(), files[i]->mtime,
+ footer_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+ footer_size, footer_start, split_metadata->partition_id,
+ footer_split->disk_id(), footer_split->expected_local(),
BufferOpts(footer_split->cache_options()), split);
} else {
// If we did not find the last split, we know it is going to be a remote read.
@@ -899,9 +898,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
int cache_options = !scan_node->IsDataCacheDisabled() ?
BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
footer_range =
- scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
- footer_size, footer_start, split_metadata->partition_id, -1,
- expected_local, files[i]->mtime, BufferOpts(cache_options), split);
+ scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+ footer_size, footer_start, split_metadata->partition_id, -1,
+ expected_local, BufferOpts(cache_options), split);
}
footer_ranges.push_back(footer_range);
} else {
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index ce04bf178..12185af5b 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -155,8 +155,9 @@ Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
bool expected_local = split_range->ExpectedLocalRead(offset, length);
int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* range = scanner_->scan_node_->AllocateScanRange(
- metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
- split_range->disk_id(), expected_local, split_range->mtime(),
+ ScanRange::FileInfo{scanner_->filename(), metadata_range->fs(),
+ split_range->mtime(), split_range->is_erasure_coded()},
+ length, offset, partition_id, split_range->disk_id(), expected_local,
BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
unique_ptr<BufferDescriptor> io_buffer;
Status status;
@@ -255,9 +256,11 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
string msg = Substitute("Invalid read len.");
return Status(msg);
}
- ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
- filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
- col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
+ ScanRange* scan_range = scan_node_->AllocateScanRange(
+ ScanRange::FileInfo{filename(), metadata_range_->fs(), split_range->mtime(),
+ split_range->is_erasure_coded()},
+ range.length_, range.offset_, partition_id, split_range->disk_id(),
+ col_range_local, BufferOpts(split_range->cache_options()));
RETURN_IF_ERROR(
context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
}
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 64e4f546e..df70ceb2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1913,10 +1913,8 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
const int cache_options =
metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* object_range = scan_node_->AllocateScanRange(
- metadata_range_->fs(), filename(), size,
- offset, partition_id,
+ metadata_range_->GetFileInfo(), size, offset, partition_id,
metadata_range_->disk_id(), metadata_range_->expected_local(),
- metadata_range_->mtime(),
BufferOpts::ReadInto(buffer, size, cache_options));
unique_ptr<BufferDescriptor> io_buffer;
bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index e46edfa65..9925714ac 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -96,12 +96,10 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
int cache_options =
scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* object_range = scanner_->scan_node_->AllocateScanRange(
- scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
- scan_range_start, move(sub_ranges), partition_id,
- scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
- scanner_->metadata_range_->mtime(),
- BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
- cache_options));
+ scanner_->metadata_range_->GetFileInfo(), scan_range_size, scan_range_start,
+ move(sub_ranges), partition_id, scanner_->metadata_range_->disk_id(),
+ scanner_->metadata_range_->expected_local(), BufferOpts::ReadInto(
+ page_index_buffer_.buffer(), page_index_buffer_.Size(), cache_options));
unique_ptr<BufferDescriptor> io_buffer;
bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 972b326aa..3caa3ebda 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -87,10 +87,11 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
// Determine if the column is completely contained within a local split.
bool col_range_local = split_range->ExpectedLocalRead(col_start, col_len);
- scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
- filename(), col_len, col_start, move(sub_ranges),
- partition_id, split_range->disk_id(),
- col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
+ ScanRange::FileInfo fi = metadata_range->GetFileInfo();
+ fi.mtime = file_desc.mtime;
+ scan_range_ = parent_->scan_node_->AllocateScanRange(fi,
+ col_len, col_start, move(sub_ranges), partition_id, split_range->disk_id(),
+ col_range_local, BufferOpts(split_range->cache_options()));
page_headers_read_ = 0;
dictionary_header_encountered_ = false;
state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a9ac2061d..2e453dc96 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -172,9 +172,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
// Disable HDFS caching as we are reading past the end.
int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* range = parent_->scan_node_->AllocateScanRange(
- scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
- scan_range_->disk_id(), expected_local, scan_range_->mtime(),
- BufferOpts(cache_options));
+ scan_range_->GetFileInfo(), read_past_buffer_size, offset, partition_id,
+ scan_range_->disk_id(), expected_local, BufferOpts(cache_options));
bool needs_buffers;
RETURN_IF_ERROR(
parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index f36b21c67..0aae4faaf 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -145,10 +145,9 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
DCHECK_GT(files[i]->file_length, 0);
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
- ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
- files[i]->filename.c_str(), files[i]->file_length, 0,
- metadata->partition_id, split->disk_id(), split->expected_local(),
- files[i]->mtime, BufferOpts(split->cache_options()));
+ ScanRange* file_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+ files[i]->file_length, 0, metadata->partition_id, split->disk_id(),
+ split->expected_local(), BufferOpts(split->cache_options()));
compressed_text_scan_ranges.push_back(file_range);
scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
}
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index afe65e39d..173899edc 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -261,8 +261,8 @@ void DiskIoMgrStress::NewClient(int i) {
range_len = min(range_len, file_len - assigned_len);
ScanRange* range = client.obj_pool.Add(new ScanRange);
- range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
- 0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+ range->Reset(ScanRange::FileInfo{files_[client.file_idx].filename.c_str()},
+ range_len, assigned_len, 0, false, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 52741f0f8..1e5f9240b 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -131,8 +131,8 @@ class DiskIoMgrTest : public testing::Test {
}
if (status.ok()) {
ScanRange* scan_range = pool_.Add(new ScanRange());
- scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
- (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
+ scan_range->Reset(ScanRange::FileInfo{(*written_range)->file()},
+ (*written_range)->len(), (*written_range)->offset(), 0, false,
BufferOpts::Uncached());
ValidateSyncRead(io_mgr, reader, client, scan_range,
reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -300,8 +300,8 @@ class DiskIoMgrTest : public testing::Test {
ScanRange* range = pool->Add(new ScanRange);
int cache_options =
is_hdfs_cached ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
- range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
- BufferOpts(cache_options), move(sub_ranges), meta_data);
+ range->Reset(ScanRange::FileInfo{file_path, nullptr, mtime}, len, offset, disk_id,
+ true, BufferOpts(cache_options), move(sub_ranges), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
@@ -1591,7 +1591,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
ScanRange* range = pool_.Add(new ScanRange);
- range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
+ range->Reset(ScanRange::FileInfo{tmp_file}, scan_len, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1638,8 +1638,8 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
vector<uint8_t> client_buffer(result_len);
ScanRange* range = pool_.Add(new ScanRange);
int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
- range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
- BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
+ range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime}, data_len, 0,
+ 0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
move(sub_ranges));
if (fake_cache) {
SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
@@ -1688,7 +1688,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
ScanRange* range = pool_.Add(new ScanRange);
- range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
+ range->Reset(ScanRange::FileInfo{tmp_file}, SCAN_LEN, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -2059,8 +2059,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
auto data = datas.at(i);
size_t buffer_len = sizeof(int32_t);
vector<uint8_t> client_buffer(buffer_len);
- scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
- range->offset(), 0, false, mtime,
+ scan_range->Reset(
+ ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+ range->len(), range->offset(), 0, false,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
bool needs_buffers;
@@ -2085,8 +2086,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
auto data = datas.at(i);
size_t buffer_len = sizeof(int32_t);
vector<uint8_t> client_buffer(buffer_len);
- scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
- range->offset(), 0, false, mtime,
+ scan_range->Reset(
+ ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+ range->len(), range->offset(), 0, false,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
bool needs_buffers;
@@ -2169,8 +2171,9 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
ScanRange* scan_range = tmp_pool.Add(new ScanRange);
size_t buffer_len = sizeof(int32_t);
vector<uint8_t> client_buffer(buffer_len);
- scan_range->Reset(hdfsConnect("default", 0), (*new_range)->file(), (*new_range)->len(),
- (*new_range)->offset(), 0, false, mtime,
+ scan_range->Reset(
+ ScanRange::FileInfo{(*new_range)->file(), hdfsConnect("default", 0), mtime},
+ (*new_range)->len(), (*new_range)->offset(), 0, false,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
bool needs_buffers;
@@ -2588,8 +2591,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
auto range = ranges.at(0);
size_t buffer_len = sizeof(int32_t);
vector<uint8_t> client_buffer(buffer_len);
- scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
- range->offset(), 0, false, 1000000,
+ scan_range->Reset(
+ ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), 1000000},
+ range->len(), range->offset(), 0, false,
BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
bool needs_buffers;
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 0b98441a2..9dfef6aae 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -412,6 +412,9 @@ void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats) {
scan_range_->reader_->bytes_read_short_circuit_.Add(
stats->totalShortCircuitBytesRead);
scan_range_->reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+ if (scan_range_->is_erasure_coded()) {
+ scan_range_->reader_->bytes_read_ec_.Add(stats->totalBytesRead);
+ }
if (stats->totalLocalBytesRead != stats->totalBytesRead) {
num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
}
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 96d609d93..646908374 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,6 +158,7 @@ class RequestContext {
int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+ int64_t bytes_read_ec() const { return bytes_read_ec_.Load(); }
int num_remote_ranges() const { return num_remote_ranges_.Load(); }
int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
@@ -399,6 +400,9 @@ class RequestContext {
/// Total number of bytes read from date node cache, updated at end of each range scan
AtomicInt64 bytes_read_dn_cache_{0};
+ /// Total number of erasure-coded bytes read, updated at end of each range scan
+ AtomicInt64 bytes_read_ec_{0};
+
/// Total number of bytes from remote reads that were expected to be local.
AtomicInt64 unexpected_remote_bytes_{0};
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ff3970e49..5121d6acc 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -146,6 +146,7 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
int64_t offset() const { return offset_; }
int64_t len() const { return len_; }
int disk_id() const { return disk_id_; }
+ bool is_erasure_coded() const { return is_erasure_coded_; }
RequestType::type request_type() const { return request_type_; }
protected:
@@ -171,6 +172,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
/// Id of disk queue containing byte range.
int disk_id_;
+ /// Whether file is erasure coded.
+ bool is_erasure_coded_;
+
/// The type of IO request, READ or WRITE.
RequestType::type request_type_;
};
@@ -261,11 +265,26 @@ class ScanRange : public RequestRange {
int64_t length;
};
+ /// Struct for passing file info for constructing ScanRanges. Only contains details
+ /// consistent across all ranges for a given file. Filename is only used for the
+ /// duration of calls accepting FileInfo.
+ struct FileInfo {
+ const char *filename;
+ hdfsFS fs = nullptr;
+ int64_t mtime = ScanRange::INVALID_MTIME;
+ bool is_erasure_coded = false;
+ };
+
/// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it
/// with the rest of the input variables.
- static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+ static ScanRange* AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
- int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
+ int disk_id, bool expected_local, const BufferOpts& buffer_opts);
+
+ /// Get file info for the current scan range.
+ FileInfo GetFileInfo() const {
+ return FileInfo{file_.c_str(), fs_, mtime_, is_erasure_coded_};
+ }
/// Resets this scan range object with the scan range description. The scan range
/// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -285,15 +304,14 @@ class ScanRange : public RequestRange {
/// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently
/// it is not generally safe to do so, but some unit tests reuse ranges after
/// successfully reading to eos.
- void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
- bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
- void* meta_data = nullptr, DiskFile* disk_file = nullptr,
- DiskFile* disk_buffer_file = nullptr);
+ void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+ bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr,
+ DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
/// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
/// in advance, as this method will do the merge.
- void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
- bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+ void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+ bool expected_local, const BufferOpts& buffer_opts,
std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr,
DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1851f3a1b..57d67bbfa 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -466,57 +466,57 @@ ScanRange::~ScanRange() {
DCHECK(!read_in_flight_);
}
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
- int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
- void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
- Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {}, meta_data,
- disk_file, disk_buffer_file);
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+ bool expected_local, const BufferOpts& buffer_opts, void* meta_data,
+ DiskFile* disk_file, DiskFile* disk_buffer_file) {
+ Reset(fi, len, offset, disk_id, expected_local, buffer_opts, {},
+ meta_data, disk_file, disk_buffer_file);
}
-ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
- int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
+ int disk_id, bool expected_local, const BufferOpts& buffer_opts) {
DCHECK_GE(disk_id, -1);
DCHECK_GE(offset, 0);
DCHECK_GE(len, 0);
disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
- file, disk_id, expected_local, /* check_default_fs */ true);
+ fi.filename, disk_id, expected_local, /* check_default_fs */ true);
ScanRange* range = obj_pool->Add(new ScanRange);
- range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+ range->Reset(fi, len, offset, disk_id, expected_local, buffer_opts,
move(sub_ranges), metadata);
return range;
}
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
- int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
- vector<SubRange>&& sub_ranges, void* meta_data, DiskFile* disk_file,
- DiskFile* disk_buffer_file) {
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+ bool expected_local, const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges,
+ void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
DCHECK(buffer_manager_->is_readybuffer_empty());
DCHECK(!read_in_flight_);
- DCHECK(file != nullptr);
+ DCHECK(fi.filename != nullptr);
DCHECK_GE(len, 0);
DCHECK_GE(offset, 0);
DCHECK(buffer_opts.client_buffer_ == nullptr ||
buffer_opts.client_buffer_len_ >= len_);
- fs_ = fs;
- if (fs != nullptr) {
+ fs_ = fi.fs;
+ if (fs_ != nullptr) {
file_reader_ = make_unique<HdfsFileReader>(this, fs_, false);
local_buffer_reader_ = make_unique<LocalFileReader>(this);
} else {
file_reader_ = make_unique<LocalFileReader>(this);
}
- file_ = file;
+ file_ = fi.filename;
len_ = len;
bytes_to_read_ = len;
offset_ = offset;
disk_id_ = disk_id;
+ is_erasure_coded_ = fi.is_erasure_coded;
cache_options_ = buffer_opts.cache_options_;
disk_file_ = disk_file;
disk_buffer_file_ = disk_buffer_file;
// HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
- if (fs_) DCHECK_GT(mtime, 0);
- mtime_ = mtime;
+ mtime_ = fi.mtime;
+ if (fs_) DCHECK_GT(mtime_, 0);
meta_data_ = meta_data;
if (buffer_opts.client_buffer_ != nullptr) {
buffer_manager_->set_client_buffer();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index be84ca420..8a90cf614 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -1629,16 +1629,18 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
DiskFile* remote_file = tmp_file->DiskFile();
// Reset the read_range, use the remote filesystem's disk id.
- handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
- handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
+ handle->read_range_->Reset(
+ ScanRange::FileInfo{
+ remote_file->path().c_str(), tmp_file->hdfs_conn_, tmp_file->mtime_},
+ handle->write_range_->len(), offset, tmp_file->disk_id(), false,
BufferOpts::ReadInto(
read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
nullptr, remote_file, local_read_buffer_file);
} else {
// Read from local.
- handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+ handle->read_range_->Reset(
+ ScanRange::FileInfo{handle->write_range_->file()},
handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
- ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(
read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
}
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5afaf73c6..67ea5414c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -134,6 +134,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
hdfs_scan_range.__set_offset(scan_range_offset);
hdfs_scan_range.__set_partition_id(spec.partition_id);
hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
+ hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
if (fb_desc->absolute_path() != nullptr) {
hdfs_scan_range.__set_absolute_path(fb_desc->absolute_path()->str());
}
@@ -1123,6 +1124,7 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
hdfs_file_split->set_partition_path_hash(
tscan_range.hdfs_file_split.partition_path_hash);
+ hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
if (tscan_range.hdfs_file_split.__isset.absolute_path) {
hdfs_file_split->set_absolute_path(
tscan_range.hdfs_file_split.absolute_path);
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index e57062fcc..6a2e07402 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -59,6 +59,8 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ =
"impala-server.io-mgr.short-circuit-bytes-read";
const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ =
"impala-server.io-mgr.cached-bytes-read";
+const char* ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ =
+ "impala-server.io-mgr.erasure-coded-bytes-read";
const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES =
"impala-server.io-mgr.remote-data-cache-hit-bytes";
const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT =
@@ -167,6 +169,7 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT = nullptr;
IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = nullptr;
@@ -341,6 +344,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter(
ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
+ IO_MGR_ERASURE_CODED_BYTES_READ = IO_MGR_METRICS->AddCounter(
+ ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ, 0);
IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter(
ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
IO_MGR_BYTES_WRITTEN = IO_MGR_METRICS->AddCounter(
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index a3811f7b3..c43dca69f 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -76,6 +76,9 @@ class ImpaladMetricKeys {
/// Total number of cached bytes read by the io mgr
static const char* IO_MGR_CACHED_BYTES_READ;
+ /// Total number of erasure-coded bytes read by the io mgr
+ static const char* IO_MGR_ERASURE_CODED_BYTES_READ;
+
/// Total number of bytes read from the remote data cache.
static const char* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
@@ -258,6 +261,7 @@ class ImpaladMetrics {
static IntCounter* IO_MGR_BYTES_READ;
static IntCounter* IO_MGR_LOCAL_BYTES_READ;
static IntCounter* IO_MGR_CACHED_BYTES_READ;
+ static IntCounter* IO_MGR_ERASURE_CODED_BYTES_READ;
static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT;
static IntCounter* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 46e8a5a30..d76a25fa6 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -227,6 +227,9 @@ struct THdfsFileSplit {
// The absolute path of the file, it's used only when data files are outside of
// the Iceberg table location (IMPALA-11507).
10: optional string absolute_path
+
+ // Whether the HDFS file is stored with erasure coding.
+ 11: optional bool is_erasure_coded
}
// Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index a5abd6651..6e5bca495 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -409,6 +409,16 @@
"kind": "COUNTER",
"key": "impala-server.io-mgr.cached-bytes-read"
},
+ {
+ "description": "Total number of erasure-coded bytes read by the IO manager.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Impala Server Io Mgr Erasure Coded Bytes Read",
+ "units": "BYTES",
+ "kind": "COUNTER",
+ "key": "impala-server.io-mgr.erasure-coded-bytes-read"
+ },
{
"description": "Total number of local bytes read by the IO manager.",
"contexts": [
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 1958595ad..35cdddc4c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1424,6 +1424,7 @@ public class HdfsScanNode extends ScanNode {
fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
partition.getLocation().hashCode());
hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
+ hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
scanRange.setHdfs_file_split(hdfsFileSplit);
if (fileDesc.getFbFileMetadata() != null) {
scanRange.setFile_metadata(fileDesc.getFbFileMetadata().getByteBuffer());
diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py
new file mode 100644
index 000000000..f4e4d1ef4
--- /dev/null
+++ b/tests/query_test/test_io_metrics.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.filesystem_utils import IS_EC, IS_HDFS
+
+
+class TestIOMetrics(ImpalaTestSuite):
+ @classmethod
+ def get_workload(self):
+ return 'tpch'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestIOMetrics, cls).add_test_dimensions()
+ # Run with num_nodes=1 to make it easy to verify metric changes.
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension(num_nodes=1))
+
+ # Issue a local query and test that read metrics are updated.
+ @pytest.mark.execute_serially
+ def test_local_read(self, vector):
+ # Accumulate metrics that are expected to update from a read, and metrics that are
+ # expected not to change with this configuration. Metrics that shouldn't change for
+ # this test should be 0 throughout the whole test suite so we can just verify they're
+ # 0 after running our query. Omits cached-bytes-read because it has its own test.
+ expect_nonzero_metrics = ["impala-server.io-mgr.bytes-read"]
+ expect_zero_metrics = []
+
+ def append_metric(metric, expect_nonzero):
+ (expect_nonzero_metrics if expect_nonzero else expect_zero_metrics).append(metric)
+
+ append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC)
+ append_metric("impala-server.io-mgr.short-circuit-bytes-read",
+ IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+ # TODO: local-bytes-read should be updated for Ozone too, but its update is guarded
+ # by IsHdfsPath and adding Ozone causes a crash. Plan to debug in IMPALA-11697.
+ append_metric("impala-server.io-mgr.local-bytes-read",
+ IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+
+ nonzero_before = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+ result = self.execute_query("select count(*) from tpch.nation")
+ assert(len(result.data) == 1)
+ assert(result.data[0] == '25')
+ nation_data_file_length = 2199
+
+ nonzero_after = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+ zero_values = self.impalad_test_service.get_metric_values(expect_zero_metrics)
+ assert(len(expect_zero_metrics) == len(zero_values))
+ LOG.info("Verifying %s expect-zero metrics.", len(expect_zero_metrics))
+ for metric, value in zip(expect_zero_metrics, zero_values):
+ LOG.info("%s: %s", metric, value)
+ assert(value == 0)
+
+ assert(len(expect_nonzero_metrics) == len(nonzero_before) == len(nonzero_after))
+ LOG.info("Verifying %s expect-non-zero metrics.", len(expect_nonzero_metrics))
+ for metric, before, after in \
+ zip(expect_nonzero_metrics, nonzero_before, nonzero_after):
+ LOG.info("%s: %s -> %s", metric, before, after)
+ assert(before + nation_data_file_length == after)