You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:42:03 UTC
[12/15] impala git commit: IMPALA-4835: switch I/O buffers to buffer
pool
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 0ede5b5..8933fec 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#include "common/global-flags.h"
#include "runtime/io/disk-io-mgr.h"
+
+#include "common/global-flags.h"
+#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/handle-cache.inline.h"
#include "runtime/io/error-converter.h"
@@ -53,6 +55,8 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
static const int THREADS_PER_ROTATIONAL_DISK = 1;
static const int THREADS_PER_SOLID_STATE_DISK = 8;
+const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
+
// The maximum number of the threads per rotational disk is also the max queue depth per
// rotational disk.
static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
@@ -123,13 +127,6 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds,
DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used by the "
"file handle cache.");
-// The IoMgr is able to run with a wide range of memory usage. If a query has memory
-// remaining less than this value, the IoMgr will stop all buffering regardless of the
-// current queue size.
-static const int LOW_MEMORY = 64 * 1024 * 1024;
-
-const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
-
AtomicInt32 DiskIoMgr::next_disk_id_;
namespace detail {
@@ -156,34 +153,6 @@ string DiskIoMgr::DebugString() {
return ss.str();
}
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
- RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
- int64_t buffer_len, MemTracker* mem_tracker)
- : io_mgr_(io_mgr),
- reader_(reader),
- mem_tracker_(mem_tracker),
- scan_range_(scan_range),
- buffer_(buffer),
- buffer_len_(buffer_len) {
- DCHECK(io_mgr != nullptr);
- DCHECK(scan_range != nullptr);
- DCHECK(buffer != nullptr);
- DCHECK_GE(buffer_len, 0);
- DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
- mem_tracker == nullptr);
-}
-
-void BufferDescriptor::TransferOwnership(MemTracker* dst) {
- DCHECK(dst != nullptr);
- DCHECK(!is_client_buffer());
- // Memory of cached buffers is not tracked against a tracker.
- if (is_cached()) return;
- DCHECK(mem_tracker_ != nullptr);
- dst->Consume(buffer_len_);
- mem_tracker_->Release(buffer_len_);
- mem_tracker_ = dst;
-}
-
WriteRange::WriteRange(
const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
: RequestRange(RequestType::WRITE), callback_(callback) {
@@ -224,8 +193,8 @@ DiskIoMgr::DiskIoMgr() :
num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
THREADS_PER_SOLID_STATE_DISK)),
- max_buffer_size_(FLAGS_read_size),
- min_buffer_size_(FLAGS_min_buffer_size),
+ max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
+ min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
shut_down_(false),
total_bytes_read_counter_(TUnit::BYTES),
read_timer_(TUnit::TIME_NS),
@@ -234,8 +203,6 @@ DiskIoMgr::DiskIoMgr() :
FLAGS_num_file_handle_cache_partitions,
FLAGS_unused_file_handle_timeout_sec) {
DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
- int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
- free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
int num_local_disks = DiskInfo::num_disks();
if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
@@ -250,11 +217,11 @@ DiskIoMgr::DiskIoMgr() :
}
DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
- int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+ int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) :
num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
- max_buffer_size_(max_buffer_size),
- min_buffer_size_(min_buffer_size),
+ max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
+ min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
shut_down_(false),
total_bytes_read_counter_(TUnit::BYTES),
read_timer_(TUnit::TIME_NS),
@@ -262,8 +229,6 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
FileSystemUtil::MaxNumFileHandles()),
FLAGS_num_file_handle_cache_partitions,
FLAGS_unused_file_handle_timeout_sec) {
- int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
- free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
CheckSseSupport();
@@ -288,37 +253,22 @@ DiskIoMgr::~DiskIoMgr() {
for (int i = 0; i < disk_queues_.size(); ++i) {
if (disk_queues_[i] == nullptr) continue;
int disk_id = disk_queues_[i]->disk_id;
- for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
- it != disk_queues_[i]->request_contexts.end(); ++it) {
- DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
- DCHECK((*it)->disk_states_[disk_id].done());
- (*it)->DecrementDiskRefCount();
+ for (RequestContext* context : disk_queues_[i]->request_contexts) {
+ unique_lock<mutex> context_lock(context->lock_);
+ DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
+ DCHECK(context->disk_states_[disk_id].done());
+ context->DecrementDiskRefCount(context_lock);
}
}
- DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
-
- // Delete all allocated buffers
- int num_free_buffers = 0;
- for (int idx = 0; idx < free_buffers_.size(); ++idx) {
- num_free_buffers += free_buffers_[idx].size();
- }
- DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
- GcIoBuffers();
-
for (int i = 0; i < disk_queues_.size(); ++i) {
delete disk_queues_[i];
}
- if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
}
-Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
- DCHECK(process_mem_tracker != nullptr);
- free_buffer_mem_tracker_.reset(
- new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-
+Status DiskIoMgr::Init() {
for (int i = 0; i < disk_queues_.size(); ++i) {
disk_queues_[i] = new DiskQueue(i);
int num_threads_per_disk;
@@ -364,101 +314,14 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
return Status::OK();
}
-unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
- return unique_ptr<RequestContext>(
- new RequestContext(this, num_total_disks(), mem_tracker));
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext() {
+ return unique_ptr<RequestContext>(new RequestContext(this, num_total_disks()));
}
void DiskIoMgr::UnregisterContext(RequestContext* reader) {
reader->CancelAndMarkInactive();
}
-// Cancellation requires coordination from multiple threads. Each thread that currently
-// has a reference to the request context must notice the cancel and remove it from its
-// tracking structures. The last thread to touch the context should deallocate (aka
-// recycle) the request context object. Potential threads are:
-// 1. Disk threads that are currently reading for this reader.
-// 2. Caller threads that are waiting in GetNext.
-//
-// The steps are:
-// 1. Cancel will immediately set the context in the Cancelled state. This prevents any
-// other thread from adding more ready buffers to the context (they all take a lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
-// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
-// also invokes the callback for the WriteRanges with the cancelled state.
-// 3. Disk threads notice the context is cancelled either when picking the next context
-// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
-// state, removes the context from the disk queue. The last thread per disk with an
-// outstanding reference to the context decrements the number of disk queues the context
-// is on.
-void DiskIoMgr::CancelContext(RequestContext* context) {
- context->Cancel(Status::CANCELLED);
-}
-
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
- r->read_timer_ = c;
-}
-
-void DiskIoMgr::set_open_file_timer(RequestContext* r, RuntimeProfile::Counter* c) {
- r->open_file_timer_ = c;
-}
-
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
- r->bytes_read_counter_ = c;
-}
-
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
- RuntimeProfile::Counter* c) {
- r->active_read_thread_counter_ = c;
-}
-
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
- RuntimeProfile::Counter* c) {
- r->disks_accessed_bitmap_ = c;
-}
-
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
- return reader->num_ready_buffers_.Load();
-}
-
-Status DiskIoMgr::context_status(RequestContext* context) const {
- unique_lock<mutex> lock(context->lock_);
- return context->status_;
-}
-
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
- return reader->bytes_read_local_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
- return reader->bytes_read_short_circuit_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
- return reader->bytes_read_dn_cache_.Load();
-}
-
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
- return reader->num_remote_ranges_.Load();
-}
-
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
- return reader->unexpected_remote_bytes_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
- return reader->cached_file_handles_hit_count_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
- return reader->cached_file_handles_miss_count_.Load();
-}
-
-int64_t DiskIoMgr::GetReadThroughput() {
- return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
-}
-
Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
int disk_id = range->disk_id_;
if (disk_id < 0 || disk_id >= disk_queues_.size()) {
@@ -469,84 +332,91 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
return Status(TErrorCode::DISK_IO_ERROR,
Substitute("Invalid scan range. Negative offset $0", range->offset_));
}
- if (range->len_ < 0) {
+ if (range->len_ <= 0) {
return Status(TErrorCode::DISK_IO_ERROR,
- Substitute("Invalid scan range. Negative length $0", range->len_));
+ Substitute("Invalid scan range. Non-positive length $0", range->len_));
}
return Status::OK();
}
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
- const vector<ScanRange*>& ranges, bool schedule_immediately) {
- if (ranges.empty()) return Status::OK();
-
+Status DiskIoMgr::AddScanRanges(
+ RequestContext* reader, const vector<ScanRange*>& ranges) {
+ DCHECK_GT(ranges.size(), 0);
// Validate and initialize all ranges
for (int i = 0; i < ranges.size(); ++i) {
RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
ranges[i]->InitInternal(this, reader);
}
- // disks that this reader needs to be scheduled on.
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(reader->Validate()) << endl << reader->DebugString();
- if (reader->state_ == RequestContext::Cancelled) {
- DCHECK(!reader->status_.ok());
- return reader->status_;
- }
+ if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
// Add each range to the queue of the disk the range is on
- for (int i = 0; i < ranges.size(); ++i) {
+ for (ScanRange* range : ranges) {
// Don't add empty ranges.
- DCHECK_NE(ranges[i]->len(), 0);
- ScanRange* range = ranges[i];
-
+ DCHECK_NE(range->len(), 0);
+ reader->AddActiveScanRangeLocked(reader_lock, range);
if (range->try_cache_) {
- if (schedule_immediately) {
- bool cached_read_succeeded;
- RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
- if (cached_read_succeeded) continue;
- // Cached read failed, fall back to AddRequestRange() below.
- } else {
- reader->cached_ranges_.Enqueue(range);
- continue;
- }
+ reader->cached_ranges_.Enqueue(range);
+ } else {
+ reader->AddRangeToDisk(reader_lock, range, ScheduleMode::UPON_GETNEXT);
}
- reader->AddRequestRange(range, schedule_immediately);
}
DCHECK(reader->Validate()) << endl << reader->DebugString();
-
return Status::OK();
}
-Status DiskIoMgr::AddScanRange(
- RequestContext* reader, ScanRange* range, bool schedule_immediately) {
- return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
+Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
+ bool* needs_buffers) {
+ RETURN_IF_ERROR(ValidateScanRange(range));
+ range->InitInternal(this, reader);
+
+ unique_lock<mutex> reader_lock(reader->lock_);
+ DCHECK(reader->Validate()) << endl << reader->DebugString();
+ if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+
+ DCHECK_NE(range->len(), 0);
+ if (range->try_cache_) {
+ bool cached_read_succeeded;
+ RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
+ if (cached_read_succeeded) {
+ DCHECK(reader->Validate()) << endl << reader->DebugString();
+ *needs_buffers = false;
+ return Status::OK();
+ }
+ // Cached read failed, fall back to normal read path.
+ }
+ // If we don't have a buffer yet, the caller must allocate buffers for the range.
+ *needs_buffers = range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER;
+ if (*needs_buffers) range->SetBlockedOnBuffer();
+ reader->AddActiveScanRangeLocked(reader_lock, range);
+ reader->AddRangeToDisk(reader_lock, range,
+ *needs_buffers ? ScheduleMode::BY_CALLER : ScheduleMode::IMMEDIATELY);
+ DCHECK(reader->Validate()) << endl << reader->DebugString();
+ return Status::OK();
}
// This function returns the next scan range the reader should work on, checking
// for eos and error cases. If there isn't already a cached scan range or a scan
// range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+ bool* needs_buffers) {
DCHECK(reader != nullptr);
DCHECK(range != nullptr);
*range = nullptr;
- Status status = Status::OK();
+ *needs_buffers = false;
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(reader->Validate()) << endl << reader->DebugString();
-
while (true) {
- if (reader->state_ == RequestContext::Cancelled) {
- DCHECK(!reader->status_.ok());
- status = reader->status_;
- break;
- }
+ if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
// All ranges are done, just return.
- break;
+ return Status::OK();
}
if (!reader->cached_ranges_.empty()) {
@@ -558,7 +428,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
if (cached_read_succeeded) return Status::OK();
// This range ended up not being cached. Loop again and pick up a new range.
- reader->AddRequestRange(*range, false);
+ reader->AddRangeToDisk(reader_lock, *range, ScheduleMode::UPON_GETNEXT);
DCHECK(reader->Validate()) << endl << reader->DebugString();
*range = nullptr;
continue;
@@ -574,183 +444,72 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
// Set this to nullptr, the next time this disk runs for this reader, it will
// get another range ready.
reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
- reader->ScheduleScanRange(*range);
- break;
+ ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag_;
+ if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
+ // We can't schedule this range until the client gives us buffers. The context
+ // must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
+ // refilled.
+ reader->disk_states_[disk_id].ScheduleContext(reader_lock, reader, disk_id);
+ (*range)->SetBlockedOnBuffer();
+ *needs_buffers = true;
+ } else {
+ reader->ScheduleScanRange(reader_lock, *range);
+ }
+ return Status::OK();
}
}
- return status;
}
-Status DiskIoMgr::Read(RequestContext* reader,
- ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
- DCHECK(range != nullptr);
- DCHECK(buffer != nullptr);
- *buffer = nullptr;
-
- if (range->len() > max_buffer_size_
- && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
- return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
- "perform sync read of '$0' bytes that is larger than the max read buffer size "
- "'$1'.", range->len(), max_buffer_size_));
- }
-
- vector<ScanRange*> ranges;
- ranges.push_back(range);
- RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
- RETURN_IF_ERROR(range->GetNext(buffer));
- DCHECK((*buffer) != nullptr);
- DCHECK((*buffer)->eosr());
+Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
+ BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
+ DCHECK_GE(max_bytes, min_buffer_size_);
+ DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+ << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
+ << "when already reading into an external buffer";
+ BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
+ Status status;
+ vector<unique_ptr<BufferDescriptor>> buffers;
+ for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+ BufferPool::BufferHandle handle;
+ status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
+ if (!status.ok()) goto error;
+ buffers.emplace_back(new BufferDescriptor(
+ this, reader, range, bp_client, move(handle)));
+ }
+ range->AddUnusedBuffers(move(buffers), false);
return Status::OK();
+ error:
+ DCHECK(!status.ok());
+ range->CleanUpBuffers(move(buffers));
+ return status;
}
-void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
- DCHECK(buffer_desc != nullptr);
- if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
-
- RequestContext* reader = buffer_desc->reader_;
- if (buffer_desc->buffer_ != nullptr) {
- if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
- // Buffers the were not allocated by DiskIoMgr don't need to be freed.
- FreeBufferMemory(buffer_desc.get());
- }
- buffer_desc->buffer_ = nullptr;
- num_buffers_in_readers_.Add(-1);
- reader->num_buffers_in_reader_.Add(-1);
- } else {
- // A nullptr buffer means there was an error in which case there is no buffer
- // to return.
- }
-
- if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
- // Need to close the scan range if returning the last buffer or the scan range
- // has been cancelled (and the caller might never get the last buffer).
- // Close() is idempotent so multiple cancelled buffers is okay.
- buffer_desc->scan_range_->Close();
- }
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
- RequestContext* reader, ScanRange* range, int64_t buffer_size) {
- DCHECK_LE(buffer_size, max_buffer_size_);
- DCHECK_GT(buffer_size, 0);
- buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
- int idx = free_buffers_idx(buffer_size);
- // Quantize buffer size to nearest power of 2 greater than the specified buffer size and
- // convert to bytes
- buffer_size = (1LL << idx) * min_buffer_size_;
-
- // Track memory against the reader. This is checked the next time we start
- // a read for the next reader in DiskIoMgr::GetNextScanRange().
- DCHECK(reader->mem_tracker_ != nullptr);
- reader->mem_tracker_->Consume(buffer_size);
-
- uint8_t* buffer = nullptr;
- {
- unique_lock<mutex> lock(free_buffers_lock_);
- if (free_buffers_[idx].empty()) {
- num_allocated_buffers_.Add(1);
- if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
- }
- if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
- ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
- }
- // We already tracked this memory against the reader's MemTracker.
- buffer = new uint8_t[buffer_size];
+vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) {
+ DCHECK_GE(max_bytes, min_buffer_size_);
+ vector<int64_t> buffer_sizes;
+ int64_t bytes_allocated = 0;
+ while (bytes_allocated < scan_range_len) {
+ int64_t bytes_remaining = scan_range_len - bytes_allocated;
+ // Either allocate a max-sized buffer or a smaller buffer to fit the rest of the
+ // range.
+ int64_t next_buffer_size;
+ if (bytes_remaining >= max_buffer_size_) {
+ next_buffer_size = max_buffer_size_;
} else {
- if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
- }
- buffer = free_buffers_[idx].front();
- free_buffers_[idx].pop_front();
- free_buffer_mem_tracker_->Release(buffer_size);
- ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
+ next_buffer_size =
+ max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
}
- }
-
- // Validate more invariants.
- DCHECK(range != nullptr);
- DCHECK(reader != nullptr);
- DCHECK(buffer != nullptr);
- return unique_ptr<BufferDescriptor>(new BufferDescriptor(
- this, reader, range, buffer, buffer_size, reader->mem_tracker_));
-}
-
-void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
- unique_lock<mutex> lock(free_buffers_lock_);
- int buffers_freed = 0;
- int bytes_freed = 0;
- // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
- for (int idx = 0; idx < free_buffers_.size(); ++idx) {
- deque<uint8_t*>* free_buffers = &free_buffers_[idx];
- while (
- !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
- uint8_t* buffer = free_buffers->front();
- free_buffers->pop_front();
- int64_t buffer_size = (1LL << idx) * min_buffer_size_;
- ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
- delete[] buffer;
- free_buffer_mem_tracker_->Release(buffer_size);
- num_allocated_buffers_.Add(-1);
-
- ++buffers_freed;
- bytes_freed += buffer_size;
- }
- if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
- }
-
- if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
- }
- if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
- ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
- }
- if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
- }
-}
-
-void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
- DCHECK(!desc->is_cached());
- DCHECK(!desc->is_client_buffer());
- uint8_t* buffer = desc->buffer_;
- int64_t buffer_size = desc->buffer_len_;
- int idx = free_buffers_idx(buffer_size);
- DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
- << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
- << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-
- {
- unique_lock<mutex> lock(free_buffers_lock_);
- if (!FLAGS_disable_mem_pools &&
- free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
- // Poison buffers stored in cache.
- ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
- free_buffers_[idx].push_back(buffer);
- if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
- }
- // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
- // with a Release() call for the same buffer that could make consumption negative.
- // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
- // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
- // pool's free lists, and remove these free lists.
- free_buffer_mem_tracker_->Consume(buffer_size);
- } else {
- num_allocated_buffers_.Add(-1);
- delete[] buffer;
- if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
- }
- if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
- ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
- }
+ if (next_buffer_size + bytes_allocated > max_bytes) {
+ // Can't allocate the desired buffer size. Make sure to allocate at least one
+ // buffer.
+ if (bytes_allocated > 0) break;
+ next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
}
+ DCHECK(BitUtil::IsPowerOf2(next_buffer_size)) << next_buffer_size;
+ buffer_sizes.push_back(next_buffer_size);
+ bytes_allocated += next_buffer_size;
}
-
- // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
- desc->mem_tracker_->Release(buffer_size);
- desc->buffer_ = nullptr;
+ return buffer_sizes;
}
// This function gets the next RequestRange to work on for this disk. It checks for
@@ -790,7 +549,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
disk_queue->request_contexts.pop_front();
DCHECK(*request_context != nullptr);
request_disk_state = &((*request_context)->disk_states_[disk_id]);
- request_disk_state->IncrementRequestThreadAndDequeue();
+ request_disk_state->IncrementDiskThreadAndDequeue();
}
// NOTE: no locks were taken in between. We need to be careful about what state
@@ -798,27 +557,13 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
// There are some invariants here. Only one disk thread can have the
// same reader here (the reader is removed from the queue). There can be
// other disk threads operating on this reader in other functions though.
-
- // We just picked a reader. Before we may allocate a buffer on its behalf, check that
- // it has not exceeded any memory limits (e.g. the query or process limit).
- // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
- // move these memory limit checks to GetFreeBuffer().
- // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
- // TODO: IMPALA-3209: we should not force a reader over its memory limit by
- // pushing more buffers to it. Most readers can make progress and operate within
- // a fixed memory limit.
- if ((*request_context)->mem_tracker_ != nullptr
- && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
- (*request_context)->Cancel(Status::MemLimitExceeded());
- }
-
unique_lock<mutex> request_lock((*request_context)->lock_);
VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
<< (*request_context)->DebugString();
// Check if reader has been cancelled
if ((*request_context)->state_ == RequestContext::Cancelled) {
- request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+ request_disk_state->DecrementDiskThread(request_lock, *request_context);
continue;
}
@@ -829,16 +574,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
!request_disk_state->unstarted_scan_ranges()->empty()) {
// We don't have a range queued for this disk for what the caller should
// read next. Populate that. We want to have one range waiting to minimize
- // wait time in GetNextRange.
+ // wait time in GetNextUnstartedRange().
ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
(*request_context)->num_unstarted_scan_ranges_.Add(-1);
(*request_context)->ready_to_start_ranges_.Enqueue(new_range);
request_disk_state->set_next_scan_range_to_start(new_range);
if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
- // All the ranges have been started, notify everyone blocked on GetNextRange.
- // Only one of them will get work so make sure to return nullptr to the other
- // caller threads.
+ // All the ranges have been started, notify everyone blocked on
+ // GetNextUnstartedRange(). Only one of them will get work so make sure to return
+ // nullptr to the other caller threads.
(*request_context)->ready_to_start_ranges_cv_.NotifyAll();
} else {
(*request_context)->ready_to_start_ranges_cv_.NotifyOne();
@@ -861,7 +606,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
// There are no inflight ranges, nothing to do.
if (request_disk_state->in_flight_ranges()->empty()) {
- request_disk_state->DecrementRequestThread();
+ request_disk_state->DecrementDiskThread(request_lock, *request_context);
continue;
}
DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
@@ -870,7 +615,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
// Now that we've picked a request range, put the context back on the queue so
// another thread can pick up another request range for this context.
- request_disk_state->ScheduleContext(*request_context, disk_id);
+ request_disk_state->ScheduleContext(request_lock, *request_context, disk_id);
DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
return true;
}
@@ -884,81 +629,62 @@ void DiskIoMgr::HandleWriteFinished(
// Copy disk_id before running callback: the callback may modify write_range.
int disk_id = write_range->disk_id_;
- // Execute the callback before decrementing the thread count. Otherwise CancelContext()
- // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
- // IMPALA-1890.
+ // Execute the callback before decrementing the thread count. Otherwise
+ // RequestContext::Cancel() that waits for the disk ref count to be 0 will
+ // return, creating a race, e.g. see IMPALA-1890.
// The status of the write does not affect the status of the writer context.
write_range->callback_(write_status);
{
unique_lock<mutex> writer_lock(writer->lock_);
DCHECK(writer->Validate()) << endl << writer->DebugString();
RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
- if (writer->state_ == RequestContext::Cancelled) {
- state.DecrementRequestThreadAndCheckDone(writer);
- } else {
- state.DecrementRequestThread();
- }
+ state.DecrementDiskThread(writer_lock, writer);
--state.num_remaining_ranges();
}
}
void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
- unique_ptr<BufferDescriptor> buffer) {
+ Status read_status, unique_ptr<BufferDescriptor> buffer) {
unique_lock<mutex> reader_lock(reader->lock_);
- RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+ RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
DCHECK(reader->Validate()) << endl << reader->DebugString();
- DCHECK_GT(state.num_threads_in_op(), 0);
- DCHECK(buffer->buffer_ != nullptr);
-
- if (reader->state_ == RequestContext::Cancelled) {
- state.DecrementRequestThreadAndCheckDone(reader);
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
- buffer->buffer_ = nullptr;
- ScanRange* scan_range = buffer->scan_range_;
- scan_range->Cancel(reader->status_);
- // Enqueue the buffer to use the scan range's buffer cleanup path.
- scan_range->EnqueueBuffer(reader_lock, move(buffer));
- return;
- }
-
- DCHECK_EQ(reader->state_, RequestContext::Active);
+ DCHECK_GT(disk_state->num_threads_in_op(), 0);
DCHECK(buffer->buffer_ != nullptr);
+ DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
- // Update the reader's scan ranges. There are a three cases here:
- // 1. Read error
- // 2. End of scan range
- // 3. Middle of scan range
- if (!buffer->status_.ok()) {
- // Error case
- if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
- buffer->buffer_ = nullptr;
- buffer->eosr_ = true;
- --state.num_remaining_ranges();
- buffer->scan_range_->Cancel(buffer->status_);
- } else if (buffer->eosr_) {
- --state.num_remaining_ranges();
- }
-
- // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
- // Store the state we need before calling EnqueueBuffer().
+ // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer.
+ // Store the state we need before calling EnqueueReadyBuffer().
bool eosr = buffer->eosr_;
+
+ // TODO: IMPALA-4249: it is safe to touch 'scan_range' until DecrementDiskThread() is
+ // called because all clients of DiskIoMgr keep ScanRange objects alive until they
+ // unregister their RequestContext.
ScanRange* scan_range = buffer->scan_range_;
- bool is_cached = buffer->is_cached();
- bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
- if (eosr) {
- // For cached buffers, we can't close the range until the cached buffer is returned.
- // Close() is called from DiskIoMgr::ReturnBuffer().
- if (!is_cached) scan_range->Close();
+ bool scan_range_done = eosr;
+ if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
+ DCHECK_EQ(reader->state_, RequestContext::Active);
+ // Read successfully - update the reader's scan ranges. There are two cases here:
+ // 1. End of scan range or cancelled scan range - don't need to reschedule.
+ // 2. Middle of scan range - need to schedule to read next buffer.
+ bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
+ if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
} else {
- if (queue_full) {
- reader->blocked_ranges_.Enqueue(scan_range);
- } else {
- reader->ScheduleScanRange(scan_range);
- }
+ // The scan range will be cancelled, either because we hit an error or because the
+ // request context was cancelled. The buffer is not needed - we must free it.
+ reader->FreeBuffer(buffer.get());
+ // Propagate 'read_status' to the scan range. If we are here because the context
+ // was cancelled, the scan range is already cancelled so we do not need to re-cancel
+ // it.
+ if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status);
+ scan_range_done = true;
+ }
+ if (scan_range_done) {
+ scan_range->Close();
+ --disk_state->num_remaining_ranges();
}
- state.DecrementRequestThread();
+ DCHECK(reader->Validate()) << endl << reader->DebugString();
+ disk_state->DecrementDiskThread(reader_lock, reader);
}
void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
@@ -974,14 +700,12 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
// 3. Perform the read or write as specified.
// Cancellation checking needs to happen in both steps 1 and 3.
while (true) {
- RequestContext* worker_context = nullptr;;
+ RequestContext* worker_context = nullptr;
RequestRange* range = nullptr;
-
if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
DCHECK(shut_down_);
- break;
+ return;
}
-
if (range->request_type() == RequestType::READ) {
ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
} else {
@@ -989,12 +713,8 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
Write(worker_context, static_cast<WriteRange*>(range));
}
}
-
- DCHECK(shut_down_);
}
-// This function reads the specified scan range associated with the
-// specified reader context and disk queue.
void DiskIoMgr::ReadRange(
DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1002,86 +722,40 @@ void DiskIoMgr::ReadRange(
unique_ptr<BufferDescriptor> buffer_desc;
if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
- range->client_buffer_.data, range->client_buffer_.len, nullptr));
+ range->client_buffer_.data, range->client_buffer_.len));
} else {
- // Need to allocate a buffer to read into.
- int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
- buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
- if (buffer_desc == nullptr) return;
+ DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+ << "This code path does not handle other buffer types, i.e. HDFS cache"
+ << static_cast<int>(range->external_buffer_tag_);
+ buffer_desc = range->GetNextUnusedBufferForRange();
+ if (buffer_desc == nullptr) {
+ // No buffer available - the range will be rescheduled when a buffer is added.
+ unique_lock<mutex> reader_lock(reader->lock_);
+ reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader);
+ DCHECK(reader->Validate()) << endl << reader->DebugString();
+ return;
+ }
}
- reader->num_used_buffers_.Add(1);
// No locks in this section. Only working on local vars. We don't want to hold a
// lock across the read call.
- buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
- if (buffer_desc->status_.ok()) {
+ Status read_status = range->Open(detail::is_file_handle_caching_enabled());
+ if (read_status.ok()) {
// Update counters.
- if (reader->active_read_thread_counter_) {
- reader->active_read_thread_counter_->Add(1L);
- }
- if (reader->disks_accessed_bitmap_) {
- int64_t disk_bit = 1LL << disk_queue->disk_id;
- reader->disks_accessed_bitmap_->BitOr(disk_bit);
- }
+ COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
+ COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
- buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+ read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
&buffer_desc->len_, &buffer_desc->eosr_);
buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
- if (reader->bytes_read_counter_ != nullptr) {
- COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
- }
-
+ COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
- if (reader->active_read_thread_counter_) {
- reader->active_read_thread_counter_->Add(-1L);
- }
+ COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
}
// Finished read, update reader/disk based on the results
- HandleReadFinished(disk_queue, reader, move(buffer_desc));
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
- DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
- int64_t buffer_size) {
- DCHECK(reader->mem_tracker_ != nullptr);
- bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
- if (!enough_memory) {
- // Low memory, GC all the buffers and try again.
- GcIoBuffers();
- enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
- }
-
- if (!enough_memory) {
- RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
- unique_lock<mutex> reader_lock(reader->lock_);
-
- // Just grabbed the reader lock, check for cancellation.
- if (reader->state_ == RequestContext::Cancelled) {
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- state.DecrementRequestThreadAndCheckDone(reader);
- range->Cancel(reader->status_);
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- return nullptr;
- }
-
- if (!range->ready_buffers_.empty()) {
- // We have memory pressure and this range doesn't need another buffer
- // (it already has one queued). Skip this range and pick it up later.
- range->blocked_on_queue_ = true;
- reader->blocked_ranges_.Enqueue(range);
- state.DecrementRequestThread();
- return nullptr;
- } else {
- // We need to get a buffer anyway since there are none queued. The query
- // is likely to fail due to mem limits but there's nothing we can do about that
- // now.
- }
- }
- unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size);
- DCHECK(buffer_desc != nullptr);
- return buffer_desc;
+ HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
}
void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
@@ -1113,30 +787,14 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
#endif
RETURN_IF_ERROR(local_file_system_->Fwrite(file_handle, write_range));
- if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
- ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
- }
-
+ ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
return Status::OK();
}
-int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
- int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
- int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
- DCHECK_GE(idx, 0);
- DCHECK_LT(idx, free_buffers_.size());
- return idx;
-}
-
Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
unique_lock<mutex> writer_lock(writer->lock_);
-
- if (writer->state_ == RequestContext::Cancelled) {
- DCHECK(!writer->status_.ok());
- return writer->status_;
- }
-
- writer->AddRequestRange(write_range, false);
+ if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+ writer->AddRangeToDisk(writer_lock, write_range, ScheduleMode::IMMEDIATELY);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 52d6993..17aa211 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -30,6 +30,7 @@
#include "common/hdfs.h"
#include "common/object-pool.h"
#include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/handle-cache.h"
#include "runtime/io/local-file-system.h"
#include "runtime/io/request-ranges.h"
@@ -42,21 +43,19 @@
namespace impala {
-class MemTracker;
-
namespace io {
/// Manager object that schedules IO for all queries on all disks and remote filesystems
/// (such as S3). Each query maps to one or more RequestContext objects, each of which
/// has its own queue of scan ranges and/or write ranges.
-//
+///
/// The API splits up requesting scan/write ranges (non-blocking) and reading the data
/// (blocking). The DiskIoMgr has worker threads that will read from and write to
/// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to
/// keep all disks and all cores as busy as possible.
-//
+///
/// All public APIs are thread-safe. It is not valid to call any of the APIs after
/// UnregisterContext() returns.
-//
+///
/// For Readers:
/// We can model this problem as a multiple producer (threads for each disk), multiple
/// consumer (scan ranges) problem. There are multiple queues that need to be
@@ -68,84 +67,102 @@ namespace io {
/// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller
/// asks the IoMgr for the next range to process. The IoMgr then selects the best range
/// to read based on disk activity and begins reading and queuing buffers for that range.
-/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries
-/// that have multiple scan nodes shouldn't have more 'turns'.
-//
+///
/// For Writers:
/// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a
/// per-disk queue. After the write is complete, a callback in WriteRange is invoked.
/// No memory is allocated within IoMgr for writes and no copies are made. It is the
/// responsibility of the client to ensure that the data to be written is valid and that
/// the file to be written to exists until the callback is invoked.
-//
-/// The IoMgr provides three key APIs.
-/// 1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that
-/// will eventually need to be read.
-/// 2. GetNextRange: returns to the caller the next scan range it should process.
-/// This is based on disk load. This also begins reading the data in this scan
-/// range. This is blocking.
-/// 3. ScanRange::GetNext: returns the next buffer for this range. This is blocking.
-//
+///
+/// There are several key methods for scanning data with the IoMgr.
+/// 1. StartScanRange(): adds range to the IoMgr to start immediately.
+/// 2. AddScanRanges(): adds ranges to the IoMgr that the reader wants to scan, but does
+/// not start them until GetNextUnstartedRange() is called.
+/// 3. GetNextUnstartedRange(): returns to the caller the next scan range it should
+/// process.
+/// 4. ScanRange::GetNext(): returns the next buffer for this range, blocking until
+/// data is available.
+///
/// The disk threads do not synchronize with each other. The readers and writers don't
/// synchronize with each other. There is a lock and condition variable for each request
/// context queue and each disk queue.
/// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock
/// before the disk lock.
-//
+///
/// Scheduling: If there are multiple request contexts with work for a single disk, the
/// request contexts are scheduled in round-robin order. Multiple disk threads can
/// operate on the same request context. Exactly one request range is processed by a
-/// disk thread at a time. If there are multiple scan ranges scheduled via
-/// GetNextRange() for a single context, these are processed in round-robin order.
+/// disk thread at a time. If there are multiple scan ranges scheduled for a single
+/// context, these are processed in round-robin order.
/// If there are multiple scan and write ranges for a disk, a read is always followed
/// by a write, and a write is followed by a read, i.e. reads and writes alternate.
/// If multiple write ranges are enqueued for a single disk, they will be processed
/// by the disk threads in order, but may complete in any order. No guarantees are made
/// on ordering of writes across disks.
-//
-/// Resource Management: effective resource management in the IoMgr is key to good
-/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU,
-/// spinning up too many threads causes thrashing.
-/// Memory usage in the IoMgr comes from queued read buffers. If we queue the minimum
-/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't
-/// limit the queue, then it possible we end up queueing the entire data set (i.e. CPU
-/// is slower than disks) and run out of memory.
-/// For both CPU and memory, we want to model the machine as having a fixed amount of
-/// resources. If a single query is running, it should saturate either CPU or Disk
-/// as well as using as little memory as possible. With multiple queries, each query
-/// should get less CPU. In that case each query will need fewer queued buffers and
-/// therefore have less memory usage.
-//
-/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange
-/// API which will return the next scan range the caller should process. The caller
-/// can call this from the desired number of reading threads. Once a scan range
-/// has been returned via GetNextRange, the IoMgr will start to buffer reads for
-/// that range and it is expected the caller will pull those buffers promptly. For
-/// example, if the caller would like to have 1 scanner thread, the read loop
-/// would look like:
+///
+/// Resource Management: the IoMgr is designed to share the available disk I/O capacity
+/// between many clients and to help use the available I/O capacity efficiently. The IoMgr
+/// interfaces are designed to let clients manage their own CPU and memory usage while the
+/// IoMgr manages the allocation of the I/O capacity of different I/O devices to scan
+/// ranges of different clients.
+///
+/// IoMgr clients may want to work on multiple scan ranges at a time to maximize CPU and
+/// I/O utilization. Clients can call GetNextUnstartedRange() to start as many concurrent
+/// scan ranges as required, e.g. from each parallel scanner thread. Once a scan range has
+/// been returned via GetNextUnstartedRange(), the caller must allocate any memory needed
+/// for buffering reads, after which the IoMgr will start to fill the buffers with data
+/// while the caller concurrently consumes and processes the data. For example, the logic
+/// in a scanner thread might look like:
/// while (more_ranges)
-/// range = GetNextRange()
+/// range = GetNextUnstartedRange()
/// while (!range.eosr)
/// buffer = range.GetNext()
-/// To have multiple reading threads, the caller would simply spin up the threads
-/// and each would process the loops above.
-//
-/// To control the number of IO buffers, each scan range has a limit of two queued
-/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
-/// the IoMgr will no longer read for that scan range until the caller has processed
-/// a buffer. Assuming the client returns each buffer before requesting the next one
-/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
-/// scan range.
-//
+///
+/// Note that the IoMgr rather than the client is responsible for choosing which scan
+/// range to process next, which allows optimizations like distributing load across disks.
+///
/// Buffer Management:
-/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
-/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
-/// caller when constructing the scan range.
+/// Buffers for reads are either a) allocated on behalf of the caller with
+/// AllocateBuffersForRange() ("IoMgr-allocated"), b) cached HDFS buffers if the scan
+/// range was read from the HDFS cache, or c) a client buffer, large enough to fit the
+/// whole scan range's data, that is provided by the caller when constructing the
+/// scan range.
+///
+/// All three kinds of buffers are wrapped in BufferDescriptors before returning to the
+/// caller. The caller must always call ReturnBuffer() on the buffer descriptor to allow
+/// recycling of the buffer memory and to release any resources associated with the buffer
+/// or scan range.
///
-/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
-/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
-/// descriptor to allow recycling of the associated buffer (if there is an
-/// IoMgr-allocated or HDFS cached buffer).
+/// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to return again if
+/// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, each buffer
+/// must be returned multiple times. Callers must be careful to call ReturnBuffer() with
+/// the previous buffer returned from the range before calling GetNext() so that
+/// at least one buffer is available for the I/O mgr to read data into. Calling GetNext()
+/// when the scan range has no buffers to read data into causes a resource deadlock.
+/// NB: if the scan range was allocated N buffers, then it's always ok for the caller
+/// to hold onto N - 1 buffers, but currently the IoMgr doesn't give the caller a way
+/// to determine the value of N.
+///
+/// If the caller wants to maximize I/O throughput, it can give the range enough memory
+/// for 3 max-sized buffers per scan range. Having two queued buffers (plus the buffer
+/// that is currently being processed by the client) gives good performance in most
+/// scenarios:
+/// 1. If the consumer is consuming data faster than we can read from disk, then the
+/// queue will be empty most of the time because the buffer will be immediately
+/// pulled off the queue as soon as it is added. There will always be an I/O request
+/// in the disk queue to maximize I/O throughput, which is the bottleneck in this
+/// case.
+/// 2. If we can read from disk faster than the consumer is consuming data, the queue
+/// will fill up and there will always be a buffer available for the consumer to
+/// read, so the consumer will not block and we maximize consumer throughput, which
+/// is the bottleneck in this case.
+/// 3. If the consumer is consuming data at approximately the same rate as we are
+/// reading from disk, then the steady state is that the consumer is processing one
+/// buffer and one buffer is in the disk queue. The additional buffer can absorb
+/// bursts where the producer runs faster than the consumer or the consumer runs
+/// faster than the producer without blocking either the producer or consumer.
+/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE.
///
/// Caching support:
/// Scan ranges contain metadata on whether or not it is cached on the DN. In that
@@ -161,13 +178,13 @@ namespace io {
/// - HDFS will time us out if we hold onto the mlock for too long
/// - Holding the lock prevents uncaching this file due to a caching policy change.
/// Therefore, we only issue the cached read when the caller is ready to process the
-/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that
-/// there will be a CPU available to process the buffer and any throttling we do with
+/// range (GetNextUnstartedRange()) instead of when the ranges are issued. This guarantees
+/// that there will be a CPU available to process the buffer and any throttling we do with
/// the number of scanner threads properly controls the amount of files we mlock.
/// With cached scan ranges, we cannot close the scan range until the cached buffer
/// is returned (HDFS does not allow this). We therefore need to defer the close until
/// the cached buffer is returned (ReturnBuffer()).
-//
+///
/// Remote filesystem support (e.g. S3):
/// Remote filesystems are modeled as "remote disks". That is, there is a seperate disk
/// queue for each supported remote filesystem type. In order to maximize throughput,
@@ -176,12 +193,13 @@ namespace io {
/// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can
/// be CPU bottlenecked especially if not enough I/O threads for these queues are
/// started.
-//
+///
+/// TODO: We should implement more sophisticated resource management. Currently readers
+/// are the unit of scheduling and we attempt to distribute IOPS between them. Instead
+/// it would be better to have policies based on queries, resource pools, etc.
/// TODO: IoMgr should be able to request additional scan ranges from the coordinator
/// to help deal with stragglers.
-/// TODO: look into using a lock free queue
-/// TODO: simplify the common path (less locking, memory allocations).
-//
+///
/// Structure of the Implementation:
/// - All client APIs are defined in this file, request-ranges.h and request-context.h.
/// Clients can include only the files that they need.
@@ -204,10 +222,12 @@ class DiskIoMgr : public CacheLineAligned {
/// disk. This is also the max queue depth.
/// - threads_per_solid_state_disk: number of read threads to create per solid state
/// disk. This is also the max queue depth.
- /// - min_buffer_size: minimum io buffer size (in bytes)
- /// - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
+ /// - min_buffer_size: minimum io buffer size (in bytes). Will be rounded down to the
+ /// nearest power-of-two.
+ /// - max_buffer_size: maximum io buffer size (in bytes). Will be rounded up to the
+ /// nearest power-of-two. Also the max read size.
DiskIoMgr(int num_disks, int threads_per_rotational_disk,
- int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
+ int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size);
/// Create DiskIoMgr with default configs.
DiskIoMgr();
@@ -217,16 +237,13 @@ class DiskIoMgr : public CacheLineAligned {
virtual ~DiskIoMgr();
/// Initialize the IoMgr. Must be called once before any of the other APIs.
- Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
+ Status Init() WARN_UNUSED_RESULT;
+
/// Allocates tracking structure for a request context.
/// Register a new request context and return it to the caller. The caller must call
/// UnregisterContext() for each context.
- /// reader_mem_tracker: Is non-null only for readers. IO buffers
- /// used for this reader will be tracked by this. If the limit is exceeded
- /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
- /// GetNext().
- std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker);
+ std::unique_ptr<RequestContext> RegisterContext();
/// Unregisters context from the disk IoMgr by first cancelling it then blocking until
/// all references to the context are removed from I/O manager internal data structures.
@@ -236,50 +253,60 @@ class DiskIoMgr : public CacheLineAligned {
/// up.
void UnregisterContext(RequestContext* context);
- /// This function cancels the context asychronously. All outstanding requests
- /// are aborted and tracking structures cleaned up. This does not need to be
- /// called if the context finishes normally.
- /// This will also fail any outstanding GetNext()/Read requests.
- void CancelContext(RequestContext* context);
-
- /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
- /// not deallocate the scan range pointers before UnregisterContext().
- /// If schedule_immediately, the ranges are immediately put on the read queue
- /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
- /// This can be used to do synchronous reads as well as schedule dependent ranges,
- /// as in the case for columnar formats.
- Status AddScanRanges(RequestContext* reader,
- const std::vector<ScanRange*>& ranges,
- bool schedule_immediately = false) WARN_UNUSED_RESULT;
- Status AddScanRange(RequestContext* reader, ScanRange* range,
- bool schedule_immediately = false) WARN_UNUSED_RESULT;
+ /// Adds the scan ranges to reader's queues, but does not start scheduling them. The
+ /// ranges can be scheduled by a thread calling GetNextUnstartedRange(). This call is
+ /// non-blocking. The caller must not deallocate the scan range pointers before
+ /// UnregisterContext(). 'ranges' must not be empty.
+ Status AddScanRanges(
+ RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT;
+
+ /// Adds the scan range to the queues, as with AddScanRanges(), but immediately
+ /// starts scheduling the scan range. This can be used to do synchronous reads as well
+ /// as schedule dependent ranges, e.g. for columnar formats. This call is non-blocking.
+ /// The caller must not deallocate the scan range pointers before UnregisterContext().
+ ///
+ /// If this returns true in '*needs_buffers', the caller must then call
+ /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+ /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr will
+ /// asynchronously read the data for the range and the caller can call
+ /// ScanRange::GetNext() to read the data.
+ Status StartScanRange(
+ RequestContext* reader, ScanRange* range, bool* needs_buffers) WARN_UNUSED_RESULT;
/// Add a WriteRange for the writer. This is non-blocking and schedules the context
/// on the IoMgr disk queue. Does not create any files.
Status AddWriteRange(
RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;
- /// Returns the next unstarted scan range for this reader. When the range is returned,
- /// the disk threads in the IoMgr will already have started reading from it. The
- /// caller is expected to call ScanRange::GetNext on the returned range.
- /// If there are no more unstarted ranges, nullptr is returned.
- /// This call is blocking.
- Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
-
- /// Reads the range and returns the result in buffer.
- /// This behaves like the typical synchronous read() api, blocking until the data
- /// is read. This can be called while there are outstanding ScanRanges and is
- /// thread safe. Multiple threads can be calling Read() per reader at a time.
- /// range *cannot* have already been added via AddScanRanges.
- /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
- /// than max_read_buffer_size()) or if reading into a client-provided buffer.
- Status Read(RequestContext* reader, ScanRange* range,
- std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
- /// Returns the buffer to the IoMgr. This must be called for every buffer
- /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
- /// After calling this, the buffer descriptor is invalid and cannot be accessed.
- void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
+ /// Tries to get an unstarted scan range that was added to 'reader' with
+ /// AddScanRanges(). On success, returns OK and returns the range in '*range'.
+ /// If 'reader' was cancelled, returns CANCELLED. If another error is encountered,
+ /// an error status is returned. Otherwise, if error or cancellation wasn't encountered
+ /// and there are no unstarted ranges for 'reader', returns OK and sets '*range' to
+ /// nullptr.
+ ///
+ /// If '*needs_buffers' is returned as true, the caller must call
+ /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+ /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr will
+ /// asynchronously read the data for the range and the caller can call
+ /// ScanRange::GetNext() to read the data.
+ Status GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+ bool* needs_buffers) WARN_UNUSED_RESULT;
+
+ /// Allocates up to 'max_bytes' buffers to read the data from 'range' into and schedules
+ /// the range. Called after StartScanRange() or GetNextUnstartedRange() returns
+ /// *needs_buffers=true.
+ ///
+ /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
+ /// min_read_buffer_size() so that at least one buffer can be allocated. The caller
+ /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns ok
+ /// if the buffers were successfully allocated and the range was scheduled.
+ ///
+ /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size()
+ /// will typically maximize I/O throughput. See the "Buffer Management" section of
+ /// the class comment for explanation.
+ Status AllocateBuffersForRange(RequestContext* reader,
+ BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes);
/// Determine which disk queue this file should be assigned to. Returns an index into
/// disk_queues_. The disk_id is the volume ID for the local disk that holds the
@@ -287,32 +314,8 @@ class DiskIoMgr : public CacheLineAligned {
/// co-located with the datanode for this file.
int AssignQueue(const char* file, int disk_id, bool expected_local);
- /// TODO: The functions below can be moved to RequestContext.
- /// Returns the current status of the context.
- Status context_status(RequestContext* context) const WARN_UNUSED_RESULT;
-
- void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
- void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
- void set_open_file_timer(RequestContext*, RuntimeProfile::Counter*);
- void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
- void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
-
- int64_t queue_size(RequestContext* reader) const;
- int64_t bytes_read_local(RequestContext* reader) const;
- int64_t bytes_read_short_circuit(RequestContext* reader) const;
- int64_t bytes_read_dn_cache(RequestContext* reader) const;
- int num_remote_ranges(RequestContext* reader) const;
- int64_t unexpected_remote_bytes(RequestContext* reader) const;
- int cached_file_handles_hit_count(RequestContext* reader) const;
- int cached_file_handles_miss_count(RequestContext* reader) const;
-
- /// Returns the read throughput across all readers.
- /// TODO: should this be a sliding window? This should report metrics for the
- /// last minute, hour and since the beginning.
- int64_t GetReadThroughput();
-
- /// Returns the maximum read buffer size
- int max_read_buffer_size() const { return max_buffer_size_; }
+ int64_t min_buffer_size() const { return min_buffer_size_; }
+ int64_t max_buffer_size() const { return max_buffer_size_; }
/// Returns the total number of disk queues (both local and remote).
int num_total_disks() const { return disk_queues_.size(); }
@@ -365,10 +368,6 @@ class DiskIoMgr : public CacheLineAligned {
Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
RequestContext* reader, CachedHdfsFileHandle** fid);
- /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
- /// 'bytes_to_free' is -1.
- void GcIoBuffers(int64_t bytes_to_free = -1);
-
// Changes the underlying LocalFileSystem object used for disk I/O.
// DiskIoMgr takes ownership of the received LocalFileSystem pointer.
// Intended only for tests, which use it to install a fault-injected LocalFileSystem.
@@ -376,25 +375,6 @@ class DiskIoMgr : public CacheLineAligned {
local_file_system_ = std::move(fs);
}
- /// The maximum number of ready buffers that can be queued in a scan range. Having two
- /// queued buffers (plus the buffer that is returned to the client) gives good
- /// performance in most scenarios:
- /// 1. If the consumer is consuming data faster than we can read from disk, then the
- /// queue will be empty most of the time because the buffer will be immediately
- /// pulled off the queue as soon as it is added. There will always be an I/O request
- /// in the disk queue to maximize I/O throughput, which is the bottleneck in this
- /// case.
- /// 2. If we can read from disk faster than the consumer is consuming data, the queue
- /// will fill up and there will always be a buffer available for the consumer to
- /// read, so the consumer will not block and we maximize consumer throughput, which
- /// is the bottleneck in this case.
- /// 3. If the consumer is consuming data at approximately the same rate as we are
- /// reading from disk, then the steady state is that the consumer is processing one
- /// buffer and one buffer is in the disk queue. The additional buffer can absorb
- /// bursts where the producer runs faster than the consumer or the consumer runs
- /// faster than the producer without blocking either the producer or consumer.
- static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
-
/// "Disk" queue offsets for remote accesses. Offset 0 corresponds to
/// disk ID (i.e. disk_queue_ index) of num_local_disks().
enum {
@@ -404,6 +384,10 @@ class DiskIoMgr : public CacheLineAligned {
REMOTE_NUM_DISKS
};
+ /// The ideal number of max-sized buffers per scan range to maximise throughput.
+ /// See "Buffer Management" in the class comment for explanation.
+ static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
+
private:
friend class BufferDescriptor;
friend class RequestContext;
@@ -412,16 +396,9 @@ class DiskIoMgr : public CacheLineAligned {
struct DiskQueue;
friend class DiskIoMgrTest_Buffers_Test;
+ friend class DiskIoMgrTest_BufferSizeSelection_Test;
friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
- /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
- boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
-
- /// Memory tracker for I/O buffers where the RequestContext has no MemTracker.
- /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
- /// provide a MemTracker.
- boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
-
// Handles the low level I/O functionality.
std::unique_ptr<LocalFileSystem> local_file_system_;
@@ -434,10 +411,10 @@ class DiskIoMgr : public CacheLineAligned {
const int num_io_threads_per_solid_state_disk_;
/// Maximum read size. This is also the maximum size of each allocated buffer.
- const int max_buffer_size_;
+ const int64_t max_buffer_size_;
- /// The minimum size of each read buffer.
- const int min_buffer_size_;
+ /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len().
+ const int64_t min_buffer_size_;
/// Thread group containing all the worker threads.
ThreadGroup disk_thread_group_;
@@ -455,28 +432,6 @@ class DiskIoMgr : public CacheLineAligned {
/// Total time spent in hdfs reading
RuntimeProfile::Counter read_timer_;
- /// Protects free_buffers_
- boost::mutex free_buffers_lock_;
-
- /// Free buffers that can be handed out to clients. There is one list for each buffer
- /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The
- /// maximum buffer size is max_buffer_size_, so the maximum index is
- /// Log2(max_buffer_size_ / min_buffer_size_).
- //
- /// E.g. if min_buffer_size_ = 1024 bytes:
- /// free_buffers_[0] => list of free buffers with size 1024 B
- /// free_buffers_[1] => list of free buffers with size 2048 B
- /// free_buffers_[10] => list of free buffers with size 1 MB
- /// free_buffers_[13] => list of free buffers with size 8 MB
- /// free_buffers_[n] => list of free buffers with size 2^n * 1024 B
- std::vector<std::deque<uint8_t*>> free_buffers_;
-
- /// Total number of allocated buffers, used for debugging.
- AtomicInt32 num_allocated_buffers_;
-
- /// Total number of buffers in readers
- AtomicInt32 num_buffers_in_readers_;
-
/// Per disk queues. This is static and created once at Init() time. One queue is
/// allocated for each local disk on the system and for each remote filesystem type.
/// It is indexed by disk id.
@@ -492,23 +447,6 @@ class DiskIoMgr : public CacheLineAligned {
// handles are closed.
FileHandleCache file_handle_cache_;
- /// Returns the index into free_buffers_ for a given buffer size
- int free_buffers_idx(int64_t buffer_size);
-
- /// Returns a buffer to read into with size between 'buffer_size' and
- /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
- /// 'free_buffers_', that is returned, otherwise a new one is allocated.
- /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
- /// The buffer memory is tracked against reader's mem tracker, or
- /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
- std::unique_ptr<BufferDescriptor> GetFreeBuffer(
- RequestContext* reader, ScanRange* range, int64_t buffer_size);
-
- /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either
- /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
- /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
- void FreeBufferMemory(BufferDescriptor* desc);
-
/// Disk worker thread loop. This function retrieves the next range to process on
/// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
/// There can be multiple threads per disk running this loop.
@@ -522,10 +460,12 @@ class DiskIoMgr : public CacheLineAligned {
bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
RequestContext** request_context);
- /// Updates disk queue and reader state after a read is complete. The read result
- /// is captured in the buffer descriptor.
+ /// Updates disk queue and reader state after a read is complete. If the read
+ /// was successful, 'read_status' is ok and 'buffer' contains the result of the
+ /// read. If the read failed with an error, 'read_status' contains the error and
+ /// 'buffer' has the buffer that was meant to hold the result of the read.
void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
- std::unique_ptr<BufferDescriptor> buffer);
+ Status read_status, std::unique_ptr<BufferDescriptor> buffer);
/// Invokes write_range->callback_ after the range has been written and
/// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -548,15 +488,14 @@ class DiskIoMgr : public CacheLineAligned {
/// Does not open or close the file that is written.
Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
- /// Reads the specified scan range and calls HandleReadFinished when done.
+ /// Reads the specified scan range and calls HandleReadFinished() when done. If no
+ /// buffer is available to read the range's data into, the read cannot proceed: the
+ /// range becomes blocked and this function returns without doing I/O.
void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
- /// Try to allocate the next buffer for the scan range, returning the new buffer
- /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
- /// If there is memory pressure and buffers are already queued, adds the range
- /// to the blocked ranges and returns nullptr.
- std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
- RequestContext* reader, ScanRange* range, int64_t buffer_size);
+ /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
+ /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
+ std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);
};
}
}