You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:59 UTC
[16/16] incubator-impala git commit: IMPALA-4835 (prep only): create
io subfolder and namespace
IMPALA-4835 (prep only): create io subfolder and namespace
Instead of using the DiskIoMgr class as a namespace, which prevents
forward-declaration of inner classes, create an impala::io namespace
and un-nest the inner classes.
This is done in anticipation of DiskIoMgr depending on BufferPool. This
helps avoid a circular dependency between DiskIoMgr, TmpFileMgr and
BufferPool headers that could not be broken with forward declarations.
Testing:
Ran core tests.
Change-Id: If807f93a47d8027a43e56dd80b1b535d0bb74e1b
Reviewed-on: http://gerrit.cloudera.org:8080/8424
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b840137c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b840137c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b840137c
Branch: refs/heads/master
Commit: b840137c940d71af5cec2daf482b523a38b6a9f1
Parents: 2510fe0
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 30 16:34:47 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 17 22:47:34 2017 +0000
----------------------------------------------------------------------
be/CMakeLists.txt | 2 +
be/src/exec/base-sequence-scanner.cc | 9 +-
be/src/exec/hdfs-parquet-scanner.cc | 36 +-
be/src/exec/hdfs-parquet-scanner.h | 4 +-
be/src/exec/hdfs-scan-node-base.cc | 25 +-
be/src/exec/hdfs-scan-node-base.h | 26 +-
be/src/exec/hdfs-scan-node-mt.h | 2 +-
be/src/exec/hdfs-scan-node.cc | 7 +-
be/src/exec/hdfs-scan-node.h | 6 +-
be/src/exec/hdfs-text-scanner.cc | 9 +-
be/src/exec/kudu-scan-node.cc | 2 +-
be/src/exec/scanner-context.cc | 12 +-
be/src/exec/scanner-context.h | 14 +-
be/src/runtime/CMakeLists.txt | 10 +-
be/src/runtime/disk-io-mgr-handle-cache.h | 196 ---
.../runtime/disk-io-mgr-handle-cache.inline.h | 231 ----
be/src/runtime/disk-io-mgr-internal.h | 76 --
be/src/runtime/disk-io-mgr-reader-context.cc | 292 -----
be/src/runtime/disk-io-mgr-reader-context.h | 406 ------
be/src/runtime/disk-io-mgr-scan-range.cc | 591 ---------
be/src/runtime/disk-io-mgr-stress-test.cc | 60 -
be/src/runtime/disk-io-mgr-stress.cc | 246 ----
be/src/runtime/disk-io-mgr-stress.h | 94 --
be/src/runtime/disk-io-mgr-test.cc | 1127 -----------------
be/src/runtime/disk-io-mgr.cc | 1190 -----------------
be/src/runtime/disk-io-mgr.h | 972 --------------
be/src/runtime/exec-env.cc | 4 +-
be/src/runtime/exec-env.h | 9 +-
be/src/runtime/io/CMakeLists.txt | 36 +
be/src/runtime/io/disk-io-mgr-internal.h | 78 ++
be/src/runtime/io/disk-io-mgr-stress-test.cc | 61 +
be/src/runtime/io/disk-io-mgr-stress.cc | 247 ++++
be/src/runtime/io/disk-io-mgr-stress.h | 95 ++
be/src/runtime/io/disk-io-mgr-test.cc | 1129 +++++++++++++++++
be/src/runtime/io/disk-io-mgr.cc | 1191 ++++++++++++++++++
be/src/runtime/io/disk-io-mgr.h | 550 ++++++++
be/src/runtime/io/handle-cache.h | 197 +++
be/src/runtime/io/handle-cache.inline.h | 232 ++++
be/src/runtime/io/request-context.cc | 293 +++++
be/src/runtime/io/request-context.h | 403 ++++++
be/src/runtime/io/request-ranges.h | 471 +++++++
be/src/runtime/io/scan-range.cc | 593 +++++++++
be/src/runtime/row-batch.h | 2 +-
be/src/runtime/runtime-state.cc | 2 +-
be/src/runtime/runtime-state.h | 7 +-
be/src/runtime/test-env.h | 2 +-
be/src/runtime/tmp-file-mgr-test.cc | 10 +-
be/src/runtime/tmp-file-mgr.cc | 20 +-
be/src/runtime/tmp-file-mgr.h | 20 +-
49 files changed, 5702 insertions(+), 5595 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index bf7aa26..163567a 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -361,6 +361,7 @@ set (IMPALA_LINK_LIBS
GlobalFlags
histogram_proto
ImpalaThrift
+ Io
kudu_util
krpc
Rpc
@@ -386,6 +387,7 @@ set (IMPALA_LINK_LIBS
if (BUILD_SHARED_LIBS)
set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
BufferPool
+ Io
Runtime
Exec
CodeGen
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index fcf58c6..7f20e31 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -32,6 +32,7 @@
#include "common/names.h"
using namespace impala;
+using namespace impala::io;
const int BaseSequenceScanner::HEADER_SIZE = 1024;
const int BaseSequenceScanner::SYNC_MARKER = -1;
@@ -48,7 +49,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
// Issue just the header range for each file. When the header is complete,
// we'll issue the splits for that file. Splits cannot be processed until the
// header is parsed (the header object is then shared across splits for that file).
- vector<DiskIoMgr::ScanRange*> header_ranges;
+ vector<ScanRange*> header_ranges;
for (int i = 0; i < files.size(); ++i) {
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
@@ -57,9 +58,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
// it is not cached.
// TODO: add remote disk id and plumb that through to the io mgr. It should have
// 1 queue for each NIC as well?
- DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
+ ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1, false,
- DiskIoMgr::BufferOpts::Uncached());
+ BufferOpts::Uncached());
header_ranges.push_back(header_range);
}
// Issue the header ranges only. GetNextInternal() will issue the files' scan ranges
@@ -310,7 +311,7 @@ void BaseSequenceScanner::CloseFileRanges(const char* filename) {
DCHECK(only_parsing_header_);
HdfsFileDesc* desc = scan_node_->GetFileDesc(
context_->partition_descriptor()->id(), filename);
- const vector<DiskIoMgr::ScanRange*>& splits = desc->splits;
+ const vector<ScanRange*>& splits = desc->splits;
for (int i = 0; i < splits.size(); ++i) {
COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
scan_node_->RangeComplete(file_format(), THdfsCompression::NONE);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 7fae959..f407877 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -27,6 +27,7 @@
#include "exec/parquet-column-stats.h"
#include "exec/scanner-context.inline.h"
#include "runtime/collection-value-builder.h"
+#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
#include "runtime/runtime-filter.inline.h"
#include "rpc/thrift-util.h"
@@ -35,6 +36,7 @@
using std::move;
using namespace impala;
+using namespace impala::io;
DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
"rows rejected by a runtime filter drops below this value, the filter is disabled.");
@@ -67,7 +69,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files) {
- vector<DiskIoMgr::ScanRange*> footer_ranges;
+ vector<ScanRange*> footer_ranges;
for (int i = 0; i < files.size(); ++i) {
// If the file size is less than 12 bytes, it is an invalid Parquet file.
if (files[i]->file_length < 12) {
@@ -80,10 +82,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
DCHECK_GE(footer_start, 0);
// Try to find the split with the footer.
- DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]);
+ ScanRange* footer_split = FindFooterSplit(files[i]);
for (int j = 0; j < files[i]->splits.size(); ++j) {
- DiskIoMgr::ScanRange* split = files[i]->splits[j];
+ ScanRange* split = files[i]->splits[j];
DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
// If there are no materialized slots (such as count(*) over the table), we can
@@ -98,19 +100,19 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
// is done here, followed by scan ranges for the columns of each row group within
// the actual split (in InitColumns()). The original split is stored in the
// metadata associated with the footer range.
- DiskIoMgr::ScanRange* footer_range;
+ ScanRange* footer_range;
if (footer_split != NULL) {
footer_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), footer_size, footer_start,
split_metadata->partition_id, footer_split->disk_id(),
footer_split->expected_local(),
- DiskIoMgr::BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
+ BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
} else {
// If we did not find the last split, we know it is going to be a remote read.
footer_range =
scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
footer_size, footer_start, split_metadata->partition_id, -1, false,
- DiskIoMgr::BufferOpts::Uncached(), split);
+ BufferOpts::Uncached(), split);
}
footer_ranges.push_back(footer_range);
@@ -125,10 +127,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
return Status::OK();
}
-DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
+ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
DCHECK(file != NULL);
for (int i = 0; i < file->splits.size(); ++i) {
- DiskIoMgr::ScanRange* split = file->splits[i];
+ ScanRange* split = file->splits[i];
if (split->offset() + split->len() == file->file_length) return split;
}
return NULL;
@@ -341,7 +343,7 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
// Returns true if 'row_group' overlaps with 'split_range'.
static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
- const DiskIoMgr::ScanRange* split_range) {
+ const ScanRange* split_range) {
int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data);
const parquet::ColumnMetaData& last_column =
@@ -598,7 +600,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
}
Status HdfsParquetScanner::NextRowGroup() {
- const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+ const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
metadata_range_->meta_data())->original_split;
int64_t split_offset = split_range->offset();
int64_t split_length = split_range->len();
@@ -1377,12 +1379,12 @@ Status HdfsParquetScanner::ProcessFooter() {
DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
// Read the header into the metadata buffer.
- DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange(
+ ScanRange* metadata_range = scan_node_->AllocateScanRange(
metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
metadata_range_->disk_id(), metadata_range_->expected_local(),
- DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+ BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
- unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
+ unique_ptr<BufferDescriptor> io_buffer;
RETURN_IF_ERROR(
io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
@@ -1589,7 +1591,7 @@ Status HdfsParquetScanner::InitColumns(
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
// All the scan ranges (one for each column).
- vector<DiskIoMgr::ScanRange*> col_ranges;
+ vector<ScanRange*> col_ranges;
// Used to validate that the number of values in each reader in column_readers_ is the
// same.
int num_values = -1;
@@ -1656,17 +1658,17 @@ Status HdfsParquetScanner::InitColumns(
"filename '$1'", col_chunk.file_path, filename()));
}
- const DiskIoMgr::ScanRange* split_range =
+ const ScanRange* split_range =
static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
// Determine if the column is completely contained within a local split.
bool col_range_local = split_range->expected_local()
&& col_start >= split_range->offset()
&& col_end <= split_range->offset() + split_range->len();
- DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+ ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
filename(), col_len, col_start, partition_id, split_range->disk_id(),
col_range_local,
- DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime));
+ BufferOpts(split_range->try_cache(), file_desc->mtime));
col_ranges.push_back(col_range);
// Get the stream that will be used for this column
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index e4b6ae7..0eea458 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -442,7 +442,7 @@ class HdfsParquetScanner : public HdfsScanner {
ParquetFileVersion file_version_;
/// Scan range for the metadata.
- const DiskIoMgr::ScanRange* metadata_range_;
+ const io::ScanRange* metadata_range_;
/// Pool to copy dictionary page buffer into. This pool is shared across all the
/// pages in a column chunk.
@@ -585,7 +585,7 @@ class HdfsParquetScanner : public HdfsScanner {
/// Find and return the last split in the file if it is assigned to this scan node.
/// Returns NULL otherwise.
- static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file);
+ static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
/// Process the file footer and parse file_metadata_. This should be called with the
/// last FOOTER_SIZE bytes in context_.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 9149097..62dbd6a 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -32,11 +32,12 @@
#include "codegen/llvm-codegen.h"
#include "common/logging.h"
#include "common/object-pool.h"
-#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
#include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr-reader-context.h"
#include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "util/disk-info.h"
@@ -54,6 +55,7 @@ DECLARE_bool(skip_file_runtime_filtering);
namespace filesystem = boost::filesystem;
using namespace impala;
+using namespace impala::io;
using namespace strings;
const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
@@ -236,7 +238,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
file_desc->splits.push_back(
AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
split.offset, split.partition_id, params.volume_id, expected_local,
- DiskIoMgr::BufferOpts(try_cache, file_desc->mtime)));
+ BufferOpts(try_cache, file_desc->mtime)));
}
// Update server wide metrics for number of scan ranges and ranges that have
@@ -485,10 +487,10 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
return true;
}
-DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- const DiskIoMgr::BufferOpts& buffer_opts,
- const DiskIoMgr::ScanRange* original_split) {
+ const BufferOpts& buffer_opts,
+ const ScanRange* original_split) {
DCHECK_GE(disk_id, -1);
// Require that the scan range is within [0, file_length). While this cannot be used
// to guarantee safety (file_length metadata may be stale), it avoids different
@@ -502,21 +504,20 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char*
ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
new ScanRangeMetadata(partition_id, original_split));
- DiskIoMgr::ScanRange* range =
- runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
+ ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
return range;
}
-DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
- bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split) {
+ bool expected_local, int mtime, const ScanRange* original_split) {
return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
- DiskIoMgr::BufferOpts(try_cache, mtime), original_split);
+ BufferOpts(try_cache, mtime), original_split);
}
Status HdfsScanNodeBase::AddDiskIoRanges(
- const vector<DiskIoMgr::ScanRange*>& ranges, int num_files_queued) {
+ const vector<ScanRange*>& ranges, int num_files_queued) {
RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
num_unqueued_files_.Add(-num_files_queued);
DCHECK_GE(num_unqueued_files_.Load(), 0);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e6b2154..923b50a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -31,11 +31,11 @@
#include "exec/filter-context.h"
#include "exec/scan-node.h"
#include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/request-ranges.h"
#include "util/avro-util.h"
+#include "util/container-util.h"
#include "util/progress-updater.h"
#include "util/spinlock.h"
-#include "util/container-util.h"
namespace impala {
@@ -72,7 +72,7 @@ struct HdfsFileDesc {
THdfsCompression::type file_compression;
/// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
- std::vector<DiskIoMgr::ScanRange*> splits;
+ std::vector<io::ScanRange*> splits;
};
/// Struct for additional metadata for scan ranges. This contains the partition id
@@ -84,9 +84,9 @@ struct ScanRangeMetadata {
/// For parquet scan ranges we initially create a request for the file footer for each
/// split; we store a pointer to the actual split so that we can recover its information
/// for the scanner to process.
- const DiskIoMgr::ScanRange* original_split;
+ const io::ScanRange* original_split;
- ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
+ ScanRangeMetadata(int64_t partition_id, const io::ScanRange* original_split)
: partition_id(partition_id), original_split(original_split) { }
};
@@ -154,7 +154,7 @@ class HdfsScanNodeBase : public ScanNode {
const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
int skip_header_line_count() const { return skip_header_line_count_; }
- DiskIoRequestContext* reader_context() const { return reader_context_.get(); }
+ io::RequestContext* reader_context() const { return reader_context_.get(); }
bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; }
int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
@@ -204,22 +204,22 @@ class HdfsScanNodeBase : public ScanNode {
/// If not NULL, the 'original_split' pointer is stored for reference in the scan range
/// metadata of the scan range that is to be allocated.
/// This is thread safe.
- DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+ io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
- const DiskIoMgr::BufferOpts& buffer_opts,
- const DiskIoMgr::ScanRange* original_split = NULL);
+ const io::BufferOpts& buffer_opts,
+ const io::ScanRange* original_split = NULL);
/// Old API for compatibility with text scanners (e.g. LZO text scanner).
- DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+ io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
- bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split = NULL);
+ bool expected_local, int mtime, const io::ScanRange* original_split = NULL);
/// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan
/// ranges have been added completely. A file's scan ranges are added completely if no
/// new scanner threads will be needed to process that file besides the additional
/// threads needed to process those in 'ranges'.
/// Can be overridden to add scan-node specific actions like starting scanner threads.
- virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+ virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
int num_files_queued) WARN_UNUSED_RESULT;
/// Adds all splits for file_desc to the io mgr queue and indicates one file has
@@ -336,7 +336,7 @@ class HdfsScanNodeBase : public ScanNode {
const int parquet_count_star_slot_offset_;
/// RequestContext object to use with the disk-io-mgr for reads.
- std::unique_ptr<DiskIoRequestContext> reader_context_;
+ std::unique_ptr<io::RequestContext> reader_context_;
/// Descriptor for tuples this scan node constructs
const TupleDescriptor* tuple_desc_ = nullptr;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 4ce12fe..3502b18 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -50,7 +50,7 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
private:
/// Current scan range and corresponding scanner.
- DiskIoMgr::ScanRange* scan_range_;
+ io::ScanRange* scan_range_;
boost::scoped_ptr<ScannerContext> scanner_ctx_;
boost::scoped_ptr<HdfsScanner> scanner_;
};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 78f2ffa..2d58c05 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -43,6 +43,7 @@ DECLARE_bool(skip_file_runtime_filtering);
#endif
using namespace impala;
+using namespace impala::io;
// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
// The memory used does not vary considerably between file formats (just a couple of MBs).
@@ -251,7 +252,7 @@ void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
materialized_row_batches_->AddBatch(move(row_batch));
}
-Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
+Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
int num_files_queued) {
RETURN_IF_ERROR(
runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
@@ -420,7 +421,7 @@ void HdfsScanNode::ScannerThread() {
// to return if there's an error.
ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
- DiskIoMgr::ScanRange* scan_range;
+ ScanRange* scan_range;
// Take a snapshot of num_unqueued_files_ before calling GetNextRange().
// We don't want num_unqueued_files_ to go to zero between the return from
// GetNextRange() and the check for when all ranges are complete.
@@ -480,7 +481,7 @@ exit:
}
Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) {
+ MemPool* expr_results_pool, ScanRange* scan_range) {
DCHECK(scan_range != NULL);
ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 30435c2..a1c97cf 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -29,7 +29,7 @@
#include "exec/filter-context.h"
#include "exec/hdfs-scan-node-base.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
#include "util/counting-barrier.h"
#include "util/thread.h"
@@ -79,7 +79,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
bool done() const { return done_; }
/// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
- virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+ virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
int num_files_queued) WARN_UNUSED_RESULT;
/// Adds a materialized row batch for the scan node. This is called from scanner
@@ -166,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
/// in this split.
Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
+ MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
/// Returns true if there is enough memory (against the mem tracker limits) to
/// have a scanner thread.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index d633734..487c6fc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -40,6 +40,7 @@
using boost::algorithm::ends_with;
using boost::algorithm::to_lower;
using namespace impala;
+using namespace impala::io;
using namespace strings;
const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
@@ -74,7 +75,7 @@ HdfsTextScanner::~HdfsTextScanner() {
Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
- vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
+ vector<ScanRange*> compressed_text_scan_ranges;
int compressed_text_files = 0;
vector<HdfsFileDesc*> lzo_text_files;
for (int i = 0; i < files.size(); ++i) {
@@ -95,7 +96,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
// In order to decompress gzip-, snappy- and bzip2-compressed text files, we
// need to read entire files. Only read a file if we're assigned the first split
// to avoid reading multi-block files with multiple scanners.
- DiskIoMgr::ScanRange* split = files[i]->splits[j];
+ ScanRange* split = files[i]->splits[j];
// We only process the split that starts at offset 0.
if (split->offset() != 0) {
@@ -114,10 +115,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
DCHECK_GT(files[i]->file_length, 0);
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
- DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
+ ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), files[i]->file_length, 0,
metadata->partition_id, split->disk_id(), split->expected_local(),
- DiskIoMgr::BufferOpts(split->try_cache(), files[i]->mtime));
+ BufferOpts(split->try_cache(), files[i]->mtime));
compressed_text_scan_ranges.push_back(file_range);
scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 77fac89..6d5e085 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -52,7 +52,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
// This value is built the same way as it assumes that the scan node runs co-located
// with a Kudu tablet server and that the tablet server is using disks similarly as
// a datanode would.
- max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
+ max_row_batches = 10 * (DiskInfo::num_disks() + io::DiskIoMgr::REMOTE_NUM_DISKS);
}
materialized_row_batches_.reset(new RowBatchQueue(max_row_batches));
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 8cb195d..d9de769 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -21,6 +21,7 @@
#include "exec/hdfs-scan-node-base.h"
#include "exec/hdfs-scan-node.h"
+#include "runtime/io/disk-io-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
@@ -32,6 +33,7 @@
#include "common/names.h"
using namespace impala;
+using namespace impala::io;
using namespace strings;
static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
@@ -43,7 +45,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
- HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
+ HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
: state_(state),
scan_node_(scan_node),
@@ -75,7 +77,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent)
boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
-ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
std::unique_ptr<Stream> stream(new Stream(this));
stream->scan_range_ = range;
stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename());
@@ -105,7 +107,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
scan_range_->Cancel(Status::CANCELLED);
}
- for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) {
+ for (unique_ptr<BufferDescriptor>& buffer : completed_io_buffers_) {
ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
}
parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
@@ -164,9 +166,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
return Status::OK();
}
int64_t partition_id = parent_->partition_descriptor()->id();
- DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
+ ScanRange* range = parent_->scan_node_->AllocateScanRange(
scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
- scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached());
+ scan_range_->disk_id(), false, BufferOpts::Uncached());
RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
parent_->scan_node_->reader_context(), range, &io_buffer_));
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 216209f..3ad6753 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,7 +27,7 @@
#include "common/compiler-util.h"
#include "common/status.h"
#include "exec/filter-context.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/request-ranges.h"
namespace impala {
@@ -65,7 +65,7 @@ class ScannerContext {
/// get pushed to) and the scan range to process.
/// This context starts with 1 stream.
ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
- DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+ io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
/// Destructor verifies that all stream objects have been released.
@@ -125,7 +125,7 @@ class ScannerContext {
bool eof() const { return file_offset() == file_len_; }
const char* filename() { return scan_range_->file(); }
- const DiskIoMgr::ScanRange* scan_range() { return scan_range_; }
+ const io::ScanRange* scan_range() { return scan_range_; }
const HdfsFileDesc* file_desc() { return file_desc_; }
/// Returns the buffer's current offset in the file.
@@ -176,7 +176,7 @@ class ScannerContext {
private:
friend class ScannerContext;
ScannerContext* parent_;
- DiskIoMgr::ScanRange* scan_range_;
+ io::ScanRange* scan_range_;
const HdfsFileDesc* file_desc_;
/// Total number of bytes returned from GetBytes()
@@ -195,7 +195,7 @@ class ScannerContext {
int64_t next_read_past_size_bytes_;
/// The current io buffer. This starts as NULL before we've read any bytes.
- std::unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer_;
+ std::unique_ptr<io::BufferDescriptor> io_buffer_;
/// Next byte to read in io_buffer_
uint8_t* io_buffer_pos_;
@@ -227,7 +227,7 @@ class ScannerContext {
/// On the next GetBytes() call, these buffers are released (the caller by calling
/// GetBytes() signals it is done with its previous bytes). At this point the
/// buffers are returned to the I/O manager.
- std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_;
+ std::deque<std::unique_ptr<io::BufferDescriptor>> completed_io_buffers_;
Stream(ScannerContext* parent);
@@ -290,7 +290,7 @@ class ScannerContext {
/// Add a stream to this ScannerContext for 'range'. Returns the added stream.
/// The stream is created in the runtime state's object pool
- Stream* AddStream(DiskIoMgr::ScanRange* range);
+ Stream* AddStream(io::ScanRange* range);
/// Returns false if scan_node_ is multi-threaded and has been cancelled.
/// Always returns false if the scan_node_ is not multi-threaded.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 41805af..0d4b61c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -16,6 +16,7 @@
# under the License.
add_subdirectory(bufferpool)
+add_subdirectory(io)
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
@@ -36,10 +37,6 @@ add_library(Runtime
data-stream-sender.cc
debug-options.cc
descriptors.cc
- disk-io-mgr.cc
- disk-io-mgr-reader-context.cc
- disk-io-mgr-scan-range.cc
- disk-io-mgr-stress.cc
exec-env.cc
fragment-instance-state.cc
hbase-table.cc
@@ -78,16 +75,11 @@ add_library(Runtime
)
add_dependencies(Runtime gen-deps)
-# This test runs forever so should not be part of 'make test'
-add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
-target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
-
ADD_BE_TEST(mem-pool-test)
ADD_BE_TEST(free-pool-test)
ADD_BE_TEST(string-buffer-test)
ADD_BE_TEST(data-stream-test)
ADD_BE_TEST(timestamp-test)
-ADD_BE_TEST(disk-io-mgr-test)
ADD_BE_TEST(raw-value-test)
ADD_BE_TEST(string-compare-test)
ADD_BE_TEST(string-search-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
deleted file mode 100644
index 4ba2342..0000000
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ /dev/null
@@ -1,196 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
-
-#include <array>
-#include <list>
-#include <map>
-#include <memory>
-
-#include <boost/thread/mutex.hpp>
-
-#include "common/hdfs.h"
-#include "common/status.h"
-#include "util/aligned-new.h"
-#include "util/impalad-metrics.h"
-#include "util/spinlock.h"
-#include "util/thread.h"
-
-namespace impala {
-
-/// This class is a small wrapper around the hdfsFile handle and the file system
-/// instance which is needed to close the file handle. The handle incorporates
-/// the last modified time of the file when it was opened. This is used to distinguish
-/// between file handles for files that can be updated or overwritten.
-class HdfsFileHandle {
- public:
-
- /// Constructor will open the file
- HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
-
- /// Destructor will close the file handle
- ~HdfsFileHandle();
-
- hdfsFile file() const { return hdfs_file_; }
- int64_t mtime() const { return mtime_; }
- bool ok() const { return hdfs_file_ != nullptr; }
-
- private:
- hdfsFS fs_;
- hdfsFile hdfs_file_;
- int64_t mtime_;
-};
-
-/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
-/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
-/// Each partition operates independently with its own locks, reducing contention
-/// between concurrent threads. The `capacity` is split between the partitions and is
-/// enforced independently.
-///
-/// Threads check out a file handle for exclusive access and return it when finished.
-/// If the file handle is not already present in the cache or all file handles for this
-/// file are checked out, the file handle is constructed and added to the cache.
-/// The cache can contain multiple file handles for the same file. If a file handle
-/// is checked out, it cannot be evicted from the cache. In this case, a cache can
-/// exceed the specified capacity.
-///
-/// The file handle cache is currently not suitable for remote files that maintain a
-/// connection as part of the handle. Most remote systems have a limit on the number
-/// of concurrent connections, and file handles in the cache would be counted towards
-/// that limit.
-///
-/// If there is a file handle in the cache and the underlying file is deleted,
-/// the file handle might keep the file from being deleted at the OS level. This can
-/// take up disk space and impact correctness. To avoid this, the cache will evict any
-/// file handle that has been unused for longer than threshold specified by
-/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
-///
-/// TODO: The cache should also evict file handles more aggressively if the file handle's
-/// mtime is older than the file's current mtime.
-template <size_t NUM_PARTITIONS>
-class FileHandleCache {
- public:
- /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
- /// partitions. If the capacity does not split evenly, then the capacity is rounded
- /// up. The cache will age out any file handle that is unused for
- /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero.
- FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
-
- /// Destructor is only called for backend tests
- ~FileHandleCache();
-
- /// Starts up a thread that monitors the age of file handles and evicts any that
- /// exceed the limit.
- Status Init() WARN_UNUSED_RESULT;
-
- /// Get a file handle from the cache for the specified filename (fname) and
- /// last modification time (mtime). This will hash the filename to determine
- /// which partition to use for this file handle.
- ///
- /// If 'require_new_handle' is false and the partition contains an available handle,
- /// the handle is returned and cache_hit is set to true. Otherwise, the partition will
- /// try to construct a file handle and add it to the partition. On success, the new
- /// file handle will be returned with cache_hit set to false. On failure, nullptr will
- /// be returned. In either case, the partition may evict a file handle to make room
- /// for the new file handle.
- ///
- /// This obtains exclusive control over the returned file handle. It must be paired
- /// with a call to ReleaseFileHandle to release exclusive control.
- HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
- bool require_new_handle, bool* cache_hit);
-
- /// Release the exclusive hold on the specified file handle (which was obtained
- /// by calling GetFileHandle). The cache may evict a file handle if the cache is
- /// above capacity. If 'destroy_handle' is true, immediately remove this handle
- /// from the cache.
- void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
-
- private:
- struct FileHandleEntry;
- typedef std::multimap<std::string, FileHandleEntry> MapType;
-
- struct LruListEntry {
- LruListEntry(typename MapType::iterator map_entry_in);
- typename MapType::iterator map_entry;
- uint64_t timestamp_seconds;
- };
- typedef std::list<LruListEntry> LruListType;
-
- struct FileHandleEntry {
- FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
- : fh(fh_in), lru_entry(lru_list.end()) {}
- std::unique_ptr<HdfsFileHandle> fh;
-
- /// in_use is true for a file handle checked out via GetFileHandle() that has not
- /// been returned via ReleaseFileHandle().
- bool in_use = false;
-
- /// Iterator to this element's location in the LRU list. This only points to a
- /// valid location when in_use is true. For error-checking, this is set to
- /// lru_list.end() when in_use is false.
- typename LruListType::iterator lru_entry;
- };
-
- /// Each partition operates independently, and thus has its own cache, LRU list,
- /// and corresponding lock. To avoid contention on the lock_ due to false sharing
- /// the partitions are aligned to cache line boundaries.
- struct FileHandleCachePartition : public CacheLineAligned {
- /// Protects access to cache and lru_list.
- SpinLock lock;
-
- /// Multimap from the file name to the file handles for that file. The cache
- /// can contain multiple file handles for the same file and some may have
- /// different mtimes if the file is being modified. All file handles are always
- /// owned by the cache.
- MapType cache;
-
- /// The LRU list only contains file handles that are not in use.
- LruListType lru_list;
-
- /// Maximum number of file handles in cache without evicting unused file handles.
- /// It is not a strict limit, and can be exceeded if all file handles are in use.
- size_t capacity;
-
- /// Current number of file handles in the cache
- size_t size;
- };
-
- /// Periodic check to evict unused file handles. Only executed by eviction_thread_.
- void EvictHandlesLoop();
- static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
-
- /// If the partition is above its capacity, evict the oldest unused file handles to
- /// enforce the capacity.
- void EvictHandles(FileHandleCachePartition& p);
-
- std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
-
- /// Maximum time before an unused file handle is aged out of the cache.
- /// Aging out is disabled if this is set to 0.
- uint64_t unused_handle_timeout_secs_;
-
- /// Thread to check for unused file handles to evict. This thread will exit when
- /// the shut_down_promise_ is set.
- std::unique_ptr<Thread> eviction_thread_;
- Promise<bool> shut_down_promise_;
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
deleted file mode 100644
index 3068971..0000000
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ /dev/null
@@ -1,231 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <tuple>
-
-#include "runtime/disk-io-mgr-handle-cache.h"
-#include "util/hash-util.h"
-#include "util/time.h"
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
-
-namespace impala {
-
-HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname,
- int64_t mtime)
- : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
- ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
- VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
-}
-
-HdfsFileHandle::~HdfsFileHandle() {
- if (hdfs_file_ != nullptr && fs_ != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
- VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
- hdfsCloseFile(fs_, hdfs_file_);
- }
- fs_ = nullptr;
- hdfs_file_ = nullptr;
-}
-
-template <size_t NUM_PARTITIONS>
- FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
- uint64_t unused_handle_timeout_secs)
- : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
- DCHECK_GT(NUM_PARTITIONS, 0);
- size_t remainder = capacity % NUM_PARTITIONS;
- size_t base_capacity = capacity / NUM_PARTITIONS;
- size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
- for (FileHandleCachePartition& p : cache_partitions_) {
- p.size = 0;
- p.capacity = partition_capacity;
- }
-}
-
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
- typename MapType::iterator map_entry_in)
- : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
-
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
- shut_down_promise_.Set(true);
- if (eviction_thread_ != nullptr) eviction_thread_->Join();
-}
-
-template <size_t NUM_PARTITIONS>
-Status FileHandleCache<NUM_PARTITIONS>::Init() {
- return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
- &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
-}
-
-template <size_t NUM_PARTITIONS>
-HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
- const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
- bool* cache_hit) {
- // Hash the key and get appropriate partition
- int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
- FileHandleCachePartition& p = cache_partitions_[index];
- boost::lock_guard<SpinLock> g(p.lock);
- pair<typename MapType::iterator, typename MapType::iterator> range =
- p.cache.equal_range(*fname);
-
- // If this requires a new handle, skip to the creation codepath. Otherwise,
- // find an unused entry with the same mtime
- FileHandleEntry* ret_elem = nullptr;
- if (!require_new_handle) {
- while (range.first != range.second) {
- FileHandleEntry* elem = &range.first->second;
- if (!elem->in_use && elem->fh->mtime() == mtime) {
- // This element is currently in the lru_list, which means that lru_entry must
- // be an iterator pointing into the lru_list.
- DCHECK(elem->lru_entry != p.lru_list.end());
- // Remove the element from the lru_list and designate that it is not on
- // the lru_list by resetting its iterator to point to the end of the list.
- p.lru_list.erase(elem->lru_entry);
- elem->lru_entry = p.lru_list.end();
- ret_elem = elem;
- *cache_hit = true;
- break;
- }
- ++range.first;
- }
- }
-
- // There was no entry that was free or caller asked for a new handle
- if (!ret_elem) {
- *cache_hit = false;
- // Create a new entry and move it into the map
- HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
- if (!new_fh->ok()) {
- delete new_fh;
- return nullptr;
- }
- FileHandleEntry entry(new_fh, p.lru_list);
- typename MapType::iterator new_it = p.cache.emplace_hint(range.second,
- *fname, std::move(entry));
- ret_elem = &new_it->second;
- ++p.size;
- if (p.size > p.capacity) EvictHandles(p);
- }
-
- DCHECK(ret_elem->fh.get() != nullptr);
- DCHECK(!ret_elem->in_use);
- ret_elem->in_use = true;
- ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
- return ret_elem->fh.get();
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
- HdfsFileHandle* fh, bool destroy_handle) {
- DCHECK(fh != nullptr);
- // Hash the key and get appropriate partition
- int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
- FileHandleCachePartition& p = cache_partitions_[index];
- boost::lock_guard<SpinLock> g(p.lock);
- pair<typename MapType::iterator, typename MapType::iterator> range =
- p.cache.equal_range(*fname);
-
- // TODO: This can be optimized by maintaining some state in the file handle about
- // its location in the map.
- typename MapType::iterator release_it = range.first;
- while (release_it != range.second) {
- FileHandleEntry* elem = &release_it->second;
- if (elem->fh.get() == fh) break;
- ++release_it;
- }
- DCHECK(release_it != range.second);
-
- // This file handle is no longer referenced
- FileHandleEntry* release_elem = &release_it->second;
- DCHECK(release_elem->in_use);
- release_elem->in_use = false;
- ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
- if (destroy_handle) {
- --p.size;
- p.cache.erase(release_it);
- return;
- }
- // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces
- // this buffering so that the file handle takes up less memory when in the cache.
- // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
- // return code, and we close the file handle and remove it from the cache.
- if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
- // This FileHandleEntry must not be in the lru list already, because it was
- // in use. Verify this by checking that the lru_entry is pointing to the end,
- // which cannot be true for any element in the lru list.
- DCHECK(release_elem->lru_entry == p.lru_list.end());
- // Add this to the lru list, establishing links in both directions.
- // The FileHandleEntry has an iterator to the LruListEntry and the
- // LruListEntry has an iterator to the location of the FileHandleEntry in
- // the cache.
- release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
- if (p.size > p.capacity) EvictHandles(p);
- } else {
- VLOG_FILE << "FS does not support file handle unbuffering, closing file="
- << fname;
- --p.size;
- p.cache.erase(release_it);
- }
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
- while (true) {
- for (FileHandleCachePartition& p : cache_partitions_) {
- boost::lock_guard<SpinLock> g(p.lock);
- EvictHandles(p);
- }
- // This Get() will time out until shutdown, when the promise is set.
- bool timed_out;
- shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
- if (!timed_out) break;
- }
- // The promise must be set to true.
- DCHECK(shut_down_promise_.IsSet());
- DCHECK(shut_down_promise_.Get());
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
- FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
- uint64_t now = MonotonicSeconds();
- uint64_t oldest_allowed_timestamp =
- now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
- while (p.lru_list.size() > 0) {
- // Peek at the oldest element
- LruListEntry oldest_entry = p.lru_list.front();
- typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
- uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
- // If the oldest element does not need to be aged out and the cache is not over
- // capacity, then we are done and there is nothing to evict.
- if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
- oldest_entry_timestamp >= oldest_allowed_timestamp)) {
- return;
- }
- // Evict the oldest element
- DCHECK(!oldest_entry_map_it->second.in_use);
- p.cache.erase(oldest_entry_map_it);
- p.lru_list.pop_front();
- --p.size;
- }
-}
-
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
deleted file mode 100644
index cc50af7..0000000
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
-
-#include <unistd.h>
-#include <queue>
-#include <boost/thread/locks.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "common/logging.h"
-#include "runtime/disk-io-mgr-reader-context.h"
-#include "runtime/disk-io-mgr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/thread-resource-mgr.h"
-#include "util/condition-variable.h"
-#include "util/cpu-info.h"
-#include "util/debug-util.h"
-#include "util/disk-info.h"
-#include "util/filesystem-util.h"
-#include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
-
-/// This file contains internal structures shared between submodules of the IoMgr. Users
-/// of the IoMgr do not need to include this file.
-namespace impala {
-
-/// Per disk state
-struct DiskIoMgr::DiskQueue {
- /// Disk id (0-based)
- int disk_id;
-
- /// Lock that protects access to 'request_contexts' and 'work_available'
- boost::mutex lock;
-
- /// Condition variable to signal the disk threads that there is work to do or the
- /// thread should shut down. A disk thread will be woken up when there is a reader
- /// added to the queue. A reader is only on the queue when it has at least one
- /// scan range that is not blocked on available buffers.
- ConditionVariable work_available;
-
- /// list of all request contexts that have work queued on this disk
- std::list<DiskIoRequestContext*> request_contexts;
-
- /// Enqueue the request context to the disk queue. The DiskQueue lock must not be taken.
- inline void EnqueueContext(DiskIoRequestContext* worker) {
- {
- boost::unique_lock<boost::mutex> disk_lock(lock);
- /// Check that the reader is not already on the queue
- DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) ==
- request_contexts.end());
- request_contexts.push_back(worker);
- }
- work_available.NotifyAll();
- }
-
- DiskQueue(int id) : disk_id(id) {}
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
deleted file mode 100644
index d62545b..0000000
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr-internal.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-void DiskIoRequestContext::Cancel(const Status& status) {
- DCHECK(!status.ok());
-
- // Callbacks are collected in this vector and invoked while no lock is held.
- vector<WriteRange::WriteDoneCallback> write_callbacks;
- {
- lock_guard<mutex> lock(lock_);
- DCHECK(Validate()) << endl << DebugString();
-
- // Already being cancelled
- if (state_ == DiskIoRequestContext::Cancelled) return;
-
- DCHECK(status_.ok());
- status_ = status;
-
- // The reader will be put into a cancelled state until call cleanup is complete.
- state_ = DiskIoRequestContext::Cancelled;
-
- // Cancel all scan ranges for this reader. Each range could be one one of
- // four queues.
- for (int i = 0; i < disk_states_.size(); ++i) {
- DiskIoRequestContext::PerDiskState& state = disk_states_[i];
- RequestRange* range = NULL;
- while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
- if (range->request_type() == RequestType::READ) {
- static_cast<ScanRange*>(range)->Cancel(status);
- } else {
- DCHECK(range->request_type() == RequestType::WRITE);
- write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
- }
- }
-
- ScanRange* scan_range;
- while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
- scan_range->Cancel(status);
- }
- WriteRange* write_range;
- while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
- write_callbacks.push_back(write_range->callback_);
- }
- }
-
- ScanRange* range = NULL;
- while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
- range->Cancel(status);
- }
- while ((range = blocked_ranges_.Dequeue()) != NULL) {
- range->Cancel(status);
- }
- while ((range = cached_ranges_.Dequeue()) != NULL) {
- range->Cancel(status);
- }
-
- // Schedule reader on all disks. The disks will notice it is cancelled and do any
- // required cleanup
- for (int i = 0; i < disk_states_.size(); ++i) {
- DiskIoRequestContext::PerDiskState& state = disk_states_[i];
- state.ScheduleContext(this, i);
- }
- }
-
- for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
- write_callback(status_);
- }
-
- // Signal reader and unblock the GetNext/Read thread. That read will fail with
- // a cancelled status.
- ready_to_start_ranges_cv_.NotifyAll();
-}
-
-void DiskIoRequestContext::CancelAndMarkInactive() {
- Cancel(Status::CANCELLED);
-
- boost::unique_lock<boost::mutex> l(lock_);
- DCHECK_NE(state_, Inactive);
- DCHECK(Validate()) << endl << DebugString();
-
- // Wait until the ranges finish up.
- while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
-
- // Validate that no buffers were leaked from this context.
- DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
- DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
- DCHECK(Validate()) << endl << DebugString();
- state_ = Inactive;
-}
-
-void DiskIoRequestContext::AddRequestRange(
- DiskIoMgr::RequestRange* range, bool schedule_immediately) {
- // DCHECK(lock_.is_locked()); // TODO: boost should have this API
- DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
- if (state.done()) {
- DCHECK_EQ(state.num_remaining_ranges(), 0);
- state.set_done(false);
- ++num_disks_with_ranges_;
- }
-
- bool schedule_context;
- if (range->request_type() == RequestType::READ) {
- DiskIoMgr::ScanRange* scan_range = static_cast<DiskIoMgr::ScanRange*>(range);
- if (schedule_immediately) {
- ScheduleScanRange(scan_range);
- } else {
- state.unstarted_scan_ranges()->Enqueue(scan_range);
- num_unstarted_scan_ranges_.Add(1);
- }
- // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
- // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
- // invoked.
- schedule_context = state.next_scan_range_to_start() == NULL;
- } else {
- DCHECK(range->request_type() == RequestType::WRITE);
- DCHECK(!schedule_immediately);
- DiskIoMgr::WriteRange* write_range = static_cast<DiskIoMgr::WriteRange*>(range);
- state.unstarted_write_ranges()->Enqueue(write_range);
-
- // ScheduleContext() has no effect if the context is already scheduled,
- // so this is safe.
- schedule_context = true;
- }
-
- if (schedule_context) state.ScheduleContext(this, range->disk_id());
- ++state.num_remaining_ranges();
-}
-
-DiskIoRequestContext::DiskIoRequestContext(
- DiskIoMgr* parent, int num_disks, MemTracker* tracker)
- : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
-
-// Dumps out request context information. Lock should be taken by caller
-string DiskIoRequestContext::DebugString() const {
- stringstream ss;
- ss << endl << " DiskIoRequestContext: " << (void*)this << " (state=";
- if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
- if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
- if (state_ == DiskIoRequestContext::Active) ss << "Active";
- if (state_ != DiskIoRequestContext::Inactive) {
- ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
- << " #ready_buffers=" << num_ready_buffers_.Load()
- << " #used_buffers=" << num_used_buffers_.Load()
- << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
- << " #finished_scan_ranges=" << num_finished_ranges_.Load()
- << " #disk_with_ranges=" << num_disks_with_ranges_
- << " #disks=" << num_disks_with_ranges_;
- for (int i = 0; i < disk_states_.size(); ++i) {
- ss << endl << " " << i << ": "
- << "is_on_queue=" << disk_states_[i].is_on_queue()
- << " done=" << disk_states_[i].done()
- << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges()
- << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
- << " #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size()
- << " #unstarted_write_ranges="
- << disk_states_[i].unstarted_write_ranges()->size()
- << " #reading_threads=" << disk_states_[i].num_threads_in_op();
- }
- }
- ss << ")";
- return ss.str();
-}
-
-bool DiskIoRequestContext::Validate() const {
- if (state_ == DiskIoRequestContext::Inactive) {
- LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
- return false;
- }
-
- if (num_used_buffers_.Load() < 0) {
- LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
- return false;
- }
-
- if (num_ready_buffers_.Load() < 0) {
- LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
- return false;
- }
-
- int total_unstarted_ranges = 0;
- for (int i = 0; i < disk_states_.size(); ++i) {
- const PerDiskState& state = disk_states_[i];
- bool on_queue = state.is_on_queue();
- int num_reading_threads = state.num_threads_in_op();
-
- total_unstarted_ranges += state.unstarted_scan_ranges()->size();
-
- if (num_reading_threads < 0) {
- LOG(WARNING) << "disk_id=" << i
- << "state.num_threads_in_read < 0: #threads="
- << num_reading_threads;
- return false;
- }
-
- if (state_ != DiskIoRequestContext::Cancelled) {
- if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
- state.num_remaining_ranges()) {
- LOG(WARNING) << "disk_id=" << i
- << " state.unstarted_ranges.size() + state.in_flight_ranges.size()"
- << " > state.num_remaining_ranges:"
- << " #unscheduled=" << state.unstarted_scan_ranges()->size()
- << " #in_flight=" << state.in_flight_ranges()->size()
- << " #remaining=" << state.num_remaining_ranges();
- return false;
- }
-
- // If we have an in_flight range, the reader must be on the queue or have a
- // thread actively reading for it.
- if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) {
- LOG(WARNING) << "disk_id=" << i
- << " reader has inflight ranges but is not on the disk queue."
- << " #in_flight_ranges=" << state.in_flight_ranges()->size()
- << " #reading_threads=" << num_reading_threads
- << " on_queue=" << on_queue;
- return false;
- }
-
- if (state.done() && num_reading_threads > 0) {
- LOG(WARNING) << "disk_id=" << i
- << " state set to done but there are still threads working."
- << " #reading_threads=" << num_reading_threads;
- return false;
- }
- } else {
- // Is Cancelled
- if (!state.in_flight_ranges()->empty()) {
- LOG(WARNING) << "disk_id=" << i
- << "Reader cancelled but has in flight ranges.";
- return false;
- }
- if (!state.unstarted_scan_ranges()->empty()) {
- LOG(WARNING) << "disk_id=" << i
- << "Reader cancelled but has unstarted ranges.";
- return false;
- }
- }
-
- if (state.done() && on_queue) {
- LOG(WARNING) << "disk_id=" << i
- << " state set to done but the reader is still on the disk queue."
- << " state.done=true and state.is_on_queue=true";
- return false;
- }
- }
-
- if (state_ != DiskIoRequestContext::Cancelled) {
- if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
- LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
- << " sum_in_states=" << num_unstarted_scan_ranges_.Load();
- return false;
- }
- } else {
- if (!ready_to_start_ranges_.empty()) {
- LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
- return false;
- }
- if (!blocked_ranges_.empty()) {
- LOG(WARNING) << "Reader cancelled but has blocked ranges.";
- return false;
- }
- }
-
- return true;
-}
-
-void DiskIoRequestContext::PerDiskState::ScheduleContext(
- DiskIoRequestContext* context, int disk_id) {
- if (!is_on_queue_ && !done_) {
- is_on_queue_ = true;
- context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
- }
-}