Posted to commits@impala.apache.org by lv...@apache.org on 2018/03/03 22:33:33 UTC
[5/9] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index dec6aa6..287f53a 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -17,122 +17,74 @@
#include "runtime/io/disk-io-mgr-internal.h"
-#include "runtime/exec-env.h"
-
#include "common/names.h"
using namespace impala;
using namespace impala::io;
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
- RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
- int64_t buffer_len)
- : io_mgr_(io_mgr),
- reader_(reader),
- scan_range_(scan_range),
- buffer_(buffer),
- buffer_len_(buffer_len) {
- DCHECK(io_mgr != nullptr);
- DCHECK(scan_range != nullptr);
- DCHECK(buffer != nullptr);
- DCHECK_GE(buffer_len, 0);
-}
+void RequestContext::Cancel(const Status& status) {
+ DCHECK(!status.ok());
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
- ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
- BufferPool::BufferHandle handle) :
- io_mgr_(io_mgr),
- reader_(reader),
- scan_range_(scan_range),
- buffer_(handle.data()),
- buffer_len_(handle.len()),
- bp_client_(bp_client),
- handle_(move(handle)) {
- DCHECK(io_mgr != nullptr);
- DCHECK(scan_range != nullptr);
- DCHECK(bp_client_->is_registered());
- DCHECK(handle_.is_open());
-}
-
-void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
- DCHECK(buffer->buffer_ != nullptr);
- if (!buffer->is_cached() && !buffer->is_client_buffer()) {
- // Only buffers that were allocated by DiskIoMgr need to be freed.
- ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
- buffer->bp_client_, &buffer->handle_);
- }
- buffer->buffer_ = nullptr;
-}
-
-// Cancellation of a RequestContext requires coordination from multiple threads that may
-// hold references to the context:
-// 1. Disk threads that are currently processing a range for this context.
-// 2. Caller threads that are waiting in GetNext().
-//
-// Each thread that currently has a reference to the request context must notice the
// cancel, cancel any pending operations involving the context and remove the context from
-// tracking structures. Once no more operations are pending on the context and no more
-// I/O mgr threads hold references to the context, the context can be marked inactive
-// (see CancelAndMarkInactive()), after which the owner of the context object can free
-// it.
-//
-// The steps are:
-// 1. Cancel() will immediately set the context in the Cancelled state. This prevents any
-// other thread from adding more ready buffers to the context (they all take a lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel() will call Cancel() on each ScanRange that is not yet complete, unblocking
-// any threads in GetNext(). If there was no prior error for a scan range, any reads from
-// that scan range will return a CANCELLED Status. Cancel() also invokes callbacks for
-// all WriteRanges with a CANCELLED Status.
-// 3. Disk threads notice the context is cancelled either when picking the next context
-// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
-// state, a disk thread removes the context from the disk queue. The last thread per disk then calls
-// DecrementDiskRefCount(). After the last disk thread has called DecrementDiskRefCount(),
-// cancellation is done and it is safe to unregister the context.
-void RequestContext::Cancel() {
// Callbacks are collected in this vector and invoked while no lock is held.
vector<WriteRange::WriteDoneCallback> write_callbacks;
{
- unique_lock<mutex> lock(lock_);
+ lock_guard<mutex> lock(lock_);
DCHECK(Validate()) << endl << DebugString();
// Already being cancelled
if (state_ == RequestContext::Cancelled) return;
+ DCHECK(status_.ok());
+ status_ = status;
+
// The reader will be put into a cancelled state until all cleanup is complete.
state_ = RequestContext::Cancelled;
- // Clear out all request ranges from queues for this reader. Cancel the scan
- // ranges and invoke the write range callbacks to propagate the cancellation.
- for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
- active_scan_ranges_.clear();
- for (PerDiskState& disk_state : disk_states_) {
- RequestRange* range;
- while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
- if (range->request_type() == RequestType::WRITE) {
+    // Cancel all scan ranges for this reader. Each range could be on one of
+    // five queues.
+ for (int i = 0; i < disk_states_.size(); ++i) {
+ RequestContext::PerDiskState& state = disk_states_[i];
+ RequestRange* range = NULL;
+ while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
+ if (range->request_type() == RequestType::READ) {
+ static_cast<ScanRange*>(range)->Cancel(status);
+ } else {
+ DCHECK(range->request_type() == RequestType::WRITE);
write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
}
}
- while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
+
+ ScanRange* scan_range;
+ while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
+ scan_range->Cancel(status);
+ }
WriteRange* write_range;
- while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) {
+ while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
write_callbacks.push_back(write_range->callback_);
}
}
- // Clear out the lists of scan ranges.
- while (ready_to_start_ranges_.Dequeue() != nullptr);
- while (cached_ranges_.Dequeue() != nullptr);
- // Ensure that the reader is scheduled on all disks (it may already be scheduled on
- // some). The disk threads will notice that the context is cancelled and do any
- // required cleanup for the disk state.
+ ScanRange* range = NULL;
+ while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
+ range->Cancel(status);
+ }
+ while ((range = blocked_ranges_.Dequeue()) != NULL) {
+ range->Cancel(status);
+ }
+ while ((range = cached_ranges_.Dequeue()) != NULL) {
+ range->Cancel(status);
+ }
+
+ // Schedule reader on all disks. The disks will notice it is cancelled and do any
+ // required cleanup
for (int i = 0; i < disk_states_.size(); ++i) {
- disk_states_[i].ScheduleContext(lock, this, i);
+ RequestContext::PerDiskState& state = disk_states_[i];
+ state.ScheduleContext(this, i);
}
}
for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
- write_callback(Status::CANCELLED);
+ write_callback(status_);
}
// Signal reader and unblock the GetNext/Read thread. That read will fail with
@@ -141,7 +93,7 @@ void RequestContext::Cancel() {
}
void RequestContext::CancelAndMarkInactive() {
- Cancel();
+ Cancel(Status::CANCELLED);
boost::unique_lock<boost::mutex> l(lock_);
DCHECK_NE(state_, Inactive);
@@ -150,76 +102,54 @@ void RequestContext::CancelAndMarkInactive() {
// Wait until the ranges finish up.
while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
- // Validate that no ranges are active.
- DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString();
-
- // Validate that no threads are active and the context is not queued.
- for (const PerDiskState& disk_state : disk_states_) {
- DCHECK_EQ(0, disk_state.in_flight_ranges()->size()) << endl << DebugString();
- DCHECK_EQ(0, disk_state.unstarted_scan_ranges()->size()) << endl << DebugString();
- DCHECK_EQ(0, disk_state.num_threads_in_op()) << endl << DebugString();
- DCHECK(!disk_state.is_on_queue()) << endl << DebugString();
- }
+ // Validate that no buffers were leaked from this context.
+ DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
+ DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
DCHECK(Validate()) << endl << DebugString();
state_ = Inactive;
}
-void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
- RequestRange* range, ScheduleMode schedule_mode) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- DCHECK_EQ(state_, Active) << DebugString();
- PerDiskState* disk_state = &disk_states_[range->disk_id()];
- if (disk_state->done()) {
- DCHECK_EQ(disk_state->num_remaining_ranges(), 0);
- disk_state->set_done(false);
+void RequestContext::AddRequestRange(
+ RequestRange* range, bool schedule_immediately) {
+ // DCHECK(lock_.is_locked()); // TODO: boost should have this API
+ RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+ if (state.done()) {
+ DCHECK_EQ(state.num_remaining_ranges(), 0);
+ state.set_done(false);
++num_disks_with_ranges_;
}
+
+ bool schedule_context;
if (range->request_type() == RequestType::READ) {
ScanRange* scan_range = static_cast<ScanRange*>(range);
- if (schedule_mode == ScheduleMode::IMMEDIATELY) {
- ScheduleScanRange(lock, scan_range);
- } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) {
- disk_state->unstarted_scan_ranges()->Enqueue(scan_range);
+ if (schedule_immediately) {
+ ScheduleScanRange(scan_range);
+ } else {
+ state.unstarted_scan_ranges()->Enqueue(scan_range);
num_unstarted_scan_ranges_.Add(1);
- // If there's no 'next_scan_range_to_start', schedule this RequestContext so that
- // one of the 'unstarted_scan_ranges' will become the 'next_scan_range_to_start'.
- if (disk_state->next_scan_range_to_start() == nullptr) {
- disk_state->ScheduleContext(lock, this, range->disk_id());
- }
}
+ // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+ // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
+ // invoked.
+ schedule_context = state.next_scan_range_to_start() == NULL;
} else {
DCHECK(range->request_type() == RequestType::WRITE);
- DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
+ DCHECK(!schedule_immediately);
WriteRange* write_range = static_cast<WriteRange*>(range);
- disk_state->unstarted_write_ranges()->Enqueue(write_range);
+ state.unstarted_write_ranges()->Enqueue(write_range);
- // Ensure that the context is scheduled so that the write range gets picked up.
- // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
- disk_state->ScheduleContext(lock, this, range->disk_id());
+ // ScheduleContext() has no effect if the context is already scheduled,
+ // so this is safe.
+ schedule_context = true;
}
- ++disk_state->num_remaining_ranges();
-}
-
-void RequestContext::AddActiveScanRangeLocked(
- const unique_lock<mutex>& lock, ScanRange* range) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- DCHECK(state_ == Active);
- active_scan_ranges_.insert(range);
-}
-void RequestContext::RemoveActiveScanRange(ScanRange* range) {
- unique_lock<mutex> lock(lock_);
- RemoveActiveScanRangeLocked(lock, range);
+ if (schedule_context) state.ScheduleContext(this, range->disk_id());
+ ++state.num_remaining_ranges();
}
-void RequestContext::RemoveActiveScanRangeLocked(
- const unique_lock<mutex>& lock, ScanRange* range) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- active_scan_ranges_.erase(range);
-}
-
-RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
- : parent_(parent), disk_states_(num_disks) {}
+RequestContext::RequestContext(
+ DiskIoMgr* parent, int num_disks, MemTracker* tracker)
+ : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
// Dumps out request context information. Lock should be taken by caller
string RequestContext::DebugString() const {
@@ -229,9 +159,13 @@ string RequestContext::DebugString() const {
if (state_ == RequestContext::Cancelled) ss << "Cancelled";
if (state_ == RequestContext::Active) ss << "Active";
if (state_ != RequestContext::Inactive) {
- ss << " #disk_with_ranges=" << num_disks_with_ranges_
- << " #disks=" << num_disks_with_ranges_
- << " #active scan ranges=" << active_scan_ranges_.size();
+ ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
+ << " #ready_buffers=" << num_ready_buffers_.Load()
+ << " #used_buffers=" << num_used_buffers_.Load()
+ << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
+ << " #finished_scan_ranges=" << num_finished_ranges_.Load()
+ << " #disk_with_ranges=" << num_disks_with_ranges_
+ << " #disks=" << num_disks_with_ranges_;
for (int i = 0; i < disk_states_.size(); ++i) {
ss << endl << " " << i << ": "
<< "is_on_queue=" << disk_states_[i].is_on_queue()
@@ -254,6 +188,16 @@ bool RequestContext::Validate() const {
return false;
}
+ if (num_used_buffers_.Load() < 0) {
+ LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
+ return false;
+ }
+
+ if (num_ready_buffers_.Load() < 0) {
+ LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
+ return false;
+ }
+
int total_unstarted_ranges = 0;
for (int i = 0; i < disk_states_.size(); ++i) {
const PerDiskState& state = disk_states_[i];
@@ -331,8 +275,8 @@ bool RequestContext::Validate() const {
LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
return false;
}
- if (!active_scan_ranges_.empty()) {
- LOG(WARNING) << "Reader cancelled but has active ranges.";
+ if (!blocked_ranges_.empty()) {
+ LOG(WARNING) << "Reader cancelled but has blocked ranges.";
return false;
}
}
@@ -340,11 +284,10 @@ bool RequestContext::Validate() const {
return true;
}
-void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& context_lock,
+void RequestContext::PerDiskState::ScheduleContext(
RequestContext* context, int disk_id) {
- DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
- if (is_on_queue_.Load() == 0 && !done_) {
- is_on_queue_.Store(1);
+ if (!is_on_queue_ && !done_) {
+ is_on_queue_ = true;
context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
}
}
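
A note on the Cancel() implementation restored above: it follows a two-phase
pattern. Under 'lock_' it flips the state to Cancelled and drains every queue,
collecting the write callbacks, and only after releasing the lock does it
invoke them, since a callback may re-enter the I/O manager or take other
locks. Below is a minimal, self-contained sketch of that pattern; the Context
and Callback names are illustrative, not Impala's classes.

#include <functional>
#include <mutex>
#include <utility>
#include <vector>

class Context {
 public:
  using Callback = std::function<void(int /* status code */)>;

  // Queue a callback to be fired with the cancel status on Cancel().
  void AddCallback(Callback cb) {
    std::lock_guard<std::mutex> lock(lock_);
    pending_.push_back(std::move(cb));
  }

  void Cancel(int status) {
    std::vector<Callback> to_invoke;
    {
      std::lock_guard<std::mutex> lock(lock_);
      if (cancelled_) return;    // already being cancelled
      cancelled_ = true;
      status_ = status;
      to_invoke.swap(pending_);  // drain the queue while holding the lock
    }
    // Invoke callbacks with no lock held: they may call back into this
    // object or acquire other locks, which would deadlock under 'lock_'.
    for (Callback& cb : to_invoke) cb(status);
  }

 private:
  std::mutex lock_;
  bool cancelled_ = false;
  int status_ = 0;
  std::vector<Callback> pending_;
};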
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index b028596..fd68669 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -23,13 +23,10 @@
namespace impala {
namespace io {
-
-// Mode argument for AddRangeToDisk().
-enum class ScheduleMode {
- IMMEDIATELY, UPON_GETNEXT, BY_CALLER
-};
/// A request context is used to group together I/O requests belonging to a client of the
-/// I/O manager for management and scheduling.
+/// I/O manager for management and scheduling. For most I/O manager clients it is an
+/// opaque pointer, but some clients may need to include this header, e.g. to make the
+/// unique_ptr<DiskIoRequestContext> destructor work correctly.
///
/// Implementation Details
/// ======================
@@ -37,109 +34,56 @@ enum class ScheduleMode {
/// maintains state across all disks as well as per disk state.
/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
/// WriteRange.
-/// A scan range for the reader is in one of six states:
-/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued
+/// A scan range for the reader is in one of five states:
+/// 1) PerDiskState's unstarted_ranges: This range has only been queued
/// and nothing has been read from it.
-/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be started.
-/// As soon as the reader picks it up, it will move to the 'in_flight_ranges_'
+/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
+/// As soon as the reader picks it up, it will move to the in_flight_ranges
/// queue.
-/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and will
+/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
/// be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) The ScanRange is blocked waiting for buffers because it does not have any unused
-/// buffers to read data into. It is unblocked when a client adds new buffers via
-/// AllocateBuffersForRange() or returns existing buffers via ReturnBuffer().
-/// ScanRanges in this state are identified by 'blocked_on_buffer_' == true.
-/// 5) ScanRange is cached and in the 'cached_ranges_' queue.
-/// 6) Inactive - either all the data for the range was returned or the range was
-/// cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ != OK.
-///
-/// If the scan range is read and does not get blocked waiting for buffers, the
+/// 4) ScanRange's outgoing ready-buffer queue is full. We can't read from this range
+///    anymore. We need the caller to pull a buffer off, which will put this range
+///    back in the in_flight_ranges queue. These ranges are in the RequestContext's
+/// blocked_ranges_ queue.
+/// 5) ScanRange is cached and in the cached_ranges_ queue.
+//
+/// If the scan range is read and does not get blocked on the outgoing queue, the
/// transitions are: 1 -> 2 -> 3.
/// If the scan range does get blocked, the transitions are
/// 1 -> 2 -> 3 -> (4 -> 3)*
-///
-/// In the case of a cached scan range, the range is immediately put in 'cached_ranges_'.
+//
+/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
/// When the caller asks for the next range to process, we first pull ranges from
-/// the 'cache_ranges_' queue. If the range was cached, the range is removed and
+/// the cache_ranges_ queue. If the range was cached, the range is removed and
/// done (ranges are either entirely cached or not at all). If the cached read attempt
/// fails, we put the range in state 1.
-///
-/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that they can be
-/// cancelled when the RequestContext is cancelled. Scan ranges are removed from
-/// 'active_scan_ranges_' during their transition to state 6.
-///
-/// A write range for a context may be in one of two queues:
-/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not processed.
-/// 2) 'in_flight_ranges_': The write range is ready to be processed by the next disk
-/// thread that picks it up in GetNextRequestRange().
-///
+//
+/// A write range for a context may be in one of two lists:
+/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
+/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
+/// that picks it up in GetNextRequestRange().
+//
/// AddWriteRange() adds WriteRanges for a disk.
/// It is the responsibility of the client to pin the data to be written via a WriteRange
/// in memory. After a WriteRange has been written, a callback is invoked to inform the
/// client that the write has completed.
-///
+//
/// An important assumption is that a write does not exceed the maximum read size and that
/// the entire range is written when the write request is handled. (In other words, writes
/// are not broken up.)
-///
+//
/// When a RequestContext is processed by a disk thread in GetNextRequestRange(),
/// a write range is always removed from the list of unstarted write ranges and appended
/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextUnstartedRange()) is always followed by a write
-/// (if one exists). And since at most one WriteRange can be present in in_flight_ranges_
-/// at any time (once a write range is returned from GetNextRequestRange() it is completed
-/// and not re-enqueued), a scan range scheduled via a call to GetNextUnstartedRange() can
-/// be queued up behind at most one write range.
+/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
+/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any
+/// time (once a write range is returned from GetNextRequestRange() it is completed and
+/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
+/// behind at most one write range.
class RequestContext {
public:
- ~RequestContext() {
- DCHECK_EQ(state_, Inactive) << "Must be unregistered. " << DebugString();
- }
-
- /// Cancel the context asynchronously. All outstanding requests are cancelled
- /// asynchronously. This does not need to be called if the context finishes normally.
- /// Calling GetNext() on any scan ranges belonging to this RequestContext will return
- /// CANCELLED (or another error, if an error was encountered for that scan range before
- /// it is cancelled).
- void Cancel();
-
- bool IsCancelled() {
- boost::unique_lock<boost::mutex> lock(lock_);
- return state_ == Cancelled;
- }
-
- int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
- int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
- int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
- int num_remote_ranges() const { return num_remote_ranges_.Load(); }
- int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
-
- int cached_file_handles_hit_count() const {
- return cached_file_handles_hit_count_.Load();
- }
-
- int cached_file_handles_miss_count() const {
- return cached_file_handles_miss_count_.Load();
- }
-
- void set_bytes_read_counter(RuntimeProfile::Counter* bytes_read_counter) {
- bytes_read_counter_ = bytes_read_counter;
- }
-
- void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
-
- void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
- open_file_timer_ = open_file_timer;
- }
-
- void set_active_read_thread_counter(
- RuntimeProfile::Counter* active_read_thread_counter) {
- active_read_thread_counter_ = active_read_thread_counter;
- }
-
- void set_disks_accessed_bitmap(RuntimeProfile::Counter* disks_accessed_bitmap) {
- disks_accessed_bitmap_ = disks_accessed_bitmap;
- }
+ ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
private:
DISALLOW_COPY_AND_ASSIGN(RequestContext);
@@ -162,19 +106,13 @@ class RequestContext {
Inactive,
};
- RequestContext(DiskIoMgr* parent, int num_disks);
-
- /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(),
- /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just prepares the
- /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL.
- /// Does not acquire 'lock_'.
- void FreeBuffer(BufferDescriptor* buffer);
+ RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
/// Decrements the number of active disks for this reader. If the disk count
/// goes to 0, the disk complete condition variable is signaled.
- /// 'lock_' must be held via 'lock'.
- void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ /// Reader lock must be taken before this call.
+ void DecrementDiskRefCount() {
+ // boost doesn't let us dcheck that the reader lock is taken
DCHECK_GT(num_disks_with_ranges_, 0);
if (--num_disks_with_ranges_ == 0) {
disks_complete_cond_var_.NotifyAll();
@@ -191,48 +129,25 @@ class RequestContext {
/// Adds range to in_flight_ranges, scheduling this reader on the disk threads
/// if necessary.
- /// 'lock_' must be held via 'lock'. Only valid to call if this context is active.
- void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ /// Reader lock must be taken before this.
+ void ScheduleScanRange(ScanRange* range) {
DCHECK_EQ(state_, Active);
- DCHECK(range != nullptr);
+ DCHECK(range != NULL);
RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
state.in_flight_ranges()->Enqueue(range);
- state.ScheduleContext(lock, this, range->disk_id());
+ state.ScheduleContext(this, range->disk_id());
}
+ /// Cancels the context with status code 'status'
+ void Cancel(const Status& status);
+
/// Cancel the context if not already cancelled, wait for all scan ranges to finish
/// and mark the context as inactive, after which it cannot be used.
void CancelAndMarkInactive();
- /// Adds a request range to the appropriate disk state. 'schedule_mode' controls which
- /// queue the range is placed in. This RequestContext is scheduled on the disk state
- /// if required by 'schedule_mode'.
- ///
- /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added to the
- /// 'unstarted_write_ranges_' queue, from which they will be asynchronously moved to the
- /// 'in_flight_ranges_' queue.
- ///
- /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, the range is
- /// immediately added to the 'in_flight_ranges_' queue where it will be processed
- /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to the
- /// 'unstarted_ranges_' queue, from which it can be returned to a client by
- /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not added to
- /// any queues. The range will be scheduled later as a separate step, e.g. when it is
- /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'.
- void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, RequestRange* range,
- ScheduleMode schedule_mode);
-
- /// Adds an active range to 'active_scan_ranges_'
- void AddActiveScanRangeLocked(
- const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
-
- /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after eos or
- /// cancellation. If calling the Locked version, the caller must hold
- /// 'lock_'. Otherwise the function will acquire 'lock_'.
- void RemoveActiveScanRange(ScanRange* range);
- void RemoveActiveScanRangeLocked(
- const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+ /// Adds request range to disk queue for this request context. Currently,
+  /// schedule_immediately must be false if the RequestRange is a write range.
+ void AddRequestRange(RequestRange* range, bool schedule_immediately);
/// Validates invariants of reader. Reader lock must be taken beforehand.
bool Validate() const;
@@ -243,6 +158,9 @@ class RequestContext {
/// Parent object
DiskIoMgr* const parent_;
+ /// Memory used for this reader. This is unowned by this object.
+ MemTracker* const mem_tracker_;
+
/// Total bytes read for this reader
RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
@@ -272,6 +190,13 @@ class RequestContext {
/// Total number of bytes from remote reads that were expected to be local.
AtomicInt64 unexpected_remote_bytes_{0};
+ /// The number of buffers that have been returned to the reader (via GetNext) that the
+  /// reader has not yet returned to the I/O manager. Only included for debugging
+  /// and diagnostics.
+ AtomicInt32 num_buffers_in_reader_{0};
+
+ /// The number of scan ranges that have been completed for this reader.
+ AtomicInt32 num_finished_ranges_{0};
+
/// The number of scan ranges that required a remote read, updated at the end of each
/// range scan. Only used for diagnostics.
AtomicInt32 num_remote_ranges_{0};
@@ -286,6 +211,17 @@ class RequestContext {
/// Total number of file handle opens where the file handle was not in the cache
AtomicInt32 cached_file_handles_miss_count_{0};
+ /// The number of buffers that are being used for this reader. This is the sum
+ /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
+ /// to be queued). This includes both IOMgr-allocated buffers and client-provided
+ /// buffers.
+ AtomicInt32 num_used_buffers_{0};
+
+ /// The total number of ready buffers across all ranges. Ready buffers are buffers
+ /// that have been read from disk but not retrieved by the caller.
+ /// This is the sum of all queued buffers in all ranges for this reader context.
+ AtomicInt32 num_ready_buffers_{0};
+
/// All fields below are accessed by multiple threads and the lock needs to be
/// taken before accessing them. Must be acquired before ScanRange::lock_ if both
/// are held simultaneously.
@@ -294,16 +230,8 @@ class RequestContext {
/// Current state of the reader
State state_ = Active;
- /// Scan ranges that have been added to the IO mgr for this context. Ranges can only
- /// be added when 'state_' is Active. When this context is cancelled, Cancel() is
- /// called for all the active ranges. If a client attempts to add a range while
- /// 'state_' is Cancelled, the range is not added to this list and Status::CANCELLED
- /// is returned to the client. This ensures that all active ranges are cancelled as a
- /// result of RequestContext cancellation.
- /// Ranges can be cancelled or hit eos non-atomically with their removal from this set,
- /// so eos or cancelled ranges may be temporarily present here. Cancelling these ranges
- /// a second time or cancelling after eos is safe and has no effect.
- boost::unordered_set<ScanRange*> active_scan_ranges_;
+ /// Status of this reader. Set to non-ok if cancelled.
+ Status status_;
/// The number of disks with scan ranges remaining (always equal to the sum of
/// disks with ranges).
@@ -312,18 +240,21 @@ class RequestContext {
/// This is the list of ranges that are expected to be cached on the DN.
/// When the reader asks for a new range (GetNextScanRange()), we first
/// return ranges from this list.
- InternalList<ScanRange> cached_ranges_;
+ InternalQueue<ScanRange> cached_ranges_;
/// A list of ranges that should be returned in subsequent calls to
- /// GetNextUnstartedRange().
+  /// GetNextRange().
/// There is a trade-off with when to populate this list. Populating it on
- /// demand means consumers need to wait (happens in DiskIoMgr::GetNextUnstartedRange()).
+ /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
/// Populating it preemptively means we make worse scheduling decisions.
/// We currently populate one range per disk.
/// TODO: think about this some more.
- InternalList<ScanRange> ready_to_start_ranges_;
+ InternalQueue<ScanRange> ready_to_start_ranges_;
ConditionVariable ready_to_start_ranges_cv_; // used with lock_
+ /// Ranges that are blocked due to back pressure on outgoing buffers.
+ InternalQueue<ScanRange> blocked_ranges_;
+
/// Condition variable for UnregisterContext() to wait for all disks to complete
ConditionVariable disks_complete_cond_var_;
@@ -342,9 +273,21 @@ class RequestContext {
next_scan_range_to_start_ = range;
}
- bool is_on_queue() const { return is_on_queue_.Load() != 0; }
+ /// We need to have a memory barrier to prevent this load from being reordered
+ /// with num_threads_in_op(), since these variables are set without the reader
+ /// lock taken
+ bool is_on_queue() const {
+ bool b = is_on_queue_;
+ __sync_synchronize();
+ return b;
+ }
- int num_threads_in_op() const { return num_threads_in_op_.Load(); }
+ int num_threads_in_op() const {
+ int v = num_threads_in_op_.Load();
+ // TODO: determine whether this barrier is necessary for any callsites.
+ AtomicUtil::MemoryBarrier();
+ return v;
+ }
const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
return &unstarted_scan_ranges_;
@@ -363,41 +306,26 @@ class RequestContext {
InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
/// Schedules the request context on this disk if it's not already on the queue.
- /// context->lock_ must be held by the caller via 'context_lock'.
- void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
- RequestContext* context, int disk_id);
-
- /// Increment the count of disk threads that have a reference to this context. These
- /// threads do not hold any locks while reading from HDFS, so we need to prevent the
- /// RequestContext from being destroyed underneath them.
- ///
- /// The caller does not need to hold 'lock_', so this can execute concurrently with
- /// itself and DecrementDiskThread().
- void IncrementDiskThreadAndDequeue() {
- /// Incrementing 'num_threads_in_op_' first so that there is no window when other
- /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
- /// references left to this context.
+ /// Context lock must be taken before this.
+ void ScheduleContext(RequestContext* context, int disk_id);
+
+ /// Increment the ref count on reader. We need to track the number of threads per
+ /// reader per disk that are in the unlocked hdfs read code section. This is updated
+ /// by multiple threads without a lock so we need to use an atomic int.
+ void IncrementRequestThreadAndDequeue() {
num_threads_in_op_.Add(1);
- is_on_queue_.Store(0);
+ is_on_queue_ = false;
}
- /// Decrement the count of disks threads with a reference to this context. Does final
- /// cleanup if the context is cancelled and this is the last thread for the disk.
- /// context->lock_ must be held by the caller via 'context_lock'.
- void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock,
- RequestContext* context) {
- DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
- num_threads_in_op_.Add(-1);
-
- if (context->state_ != Cancelled) {
- DCHECK_EQ(context->state_, Active);
- return;
- }
- // The state is cancelled, check to see if we're the last thread to touch the
- // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
- // in this order to avoid a race with IncrementDiskThreadAndDequeue().
- if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
- context->DecrementDiskRefCount(context_lock);
+ void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
+
+ /// Decrement request thread count and do final cleanup if this is the last
+ /// thread. RequestContext lock must be taken before this.
+ void DecrementRequestThreadAndCheckDone(RequestContext* context) {
+ num_threads_in_op_.Add(-1); // Also acts as a barrier.
+ if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
+ // This thread is the last one for this reader on this disk, do final cleanup
+ context->DecrementDiskRefCount();
done_ = true;
}
}
@@ -410,12 +338,7 @@ class RequestContext {
bool done_ = true;
/// For each disk, keeps track if the context is on this disk's queue, indicating
- /// the disk must do some work for this context. 1 means that the context is on the
-  /// disk queue, 0 means that it's not on the queue (either because it has no ranges
- /// active for the disk or because a disk thread dequeued the context and is
- /// currently processing a request).
- ///
- /// The disk needs to do work in 4 cases:
+ /// the disk must do some work for this context. The disk needs to do work in 4 cases:
/// 1) in_flight_ranges is not empty, the disk needs to read for this reader.
/// 2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
/// read next.
@@ -426,15 +349,7 @@ class RequestContext {
/// useful that can be done. If there's nothing useful, the disk queue will wake up
/// and then remove the reader from the queue. Doing this causes thrashing of the
/// threads.
- ///
- /// This variable is important during context cancellation because it indicates
- /// whether a queue has a reference to the context that must be released before
- /// the context is considered unregistered. Atomically set to false after
- /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window
- /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there
- /// are no references left to this context.
- /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount.
- AtomicInt32 is_on_queue_{0};
+ bool is_on_queue_ = false;
/// For each disks, the number of request ranges that have not been fully read.
/// In the non-cancellation path, this will hit 0, and done will be set to true
@@ -448,7 +363,7 @@ class RequestContext {
/// Queue of pending IO requests for this disk in the order that they will be
/// processed. A ScanRange is added to this queue when it is returned in
- /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
+ /// GetNextRange(), or when it is added with schedule_immediately = true.
/// A WriteRange is added to this queue from unstarted_write_ranges_ for each
/// invocation of GetNextRequestRange() in WorkLoop().
/// The size of this queue is always less than or equal to num_remaining_ranges.
@@ -464,11 +379,11 @@ class RequestContext {
/// range to ready_to_start_ranges_.
ScanRange* next_scan_range_to_start_ = nullptr;
- /// For each disk, the number of disk threads issuing the underlying read/write on
- /// behalf of this context. There are a few places where we release the context lock,
- /// do some work, and then grab the lock again. Because we don't hold the lock for
- /// the entire operation, we need this ref count to keep track of which thread should
- /// do final resource cleanup during cancellation.
+ /// For each disk, the number of threads issuing the underlying read/write on behalf
+ /// of this context. There are a few places where we release the context lock, do some
+ /// work, and then grab the lock again. Because we don't hold the lock for the
+ /// entire operation, we need this ref count to keep track of which thread should do
+ /// final resource cleanup during cancellation.
/// Only the thread that sees the count at 0 should do the final cleanup.
AtomicInt32 num_threads_in_op_{0};
@@ -477,8 +392,7 @@ class RequestContext {
/// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
/// and writes. (Otherwise, since next_scan_range_to_start is set
/// in GetNextRequestRange() whenever it is null, repeated calls to
- /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
- /// processed)
+ /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
InternalQueue<WriteRange> unstarted_write_ranges_;
};
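
The revert above also turns PerDiskState's AtomicInt32 is_on_queue_ back into
a plain bool with hand-rolled barriers. The invariant both versions maintain
is the same: a disk thread must increment num_threads_in_op_ before clearing
is_on_queue_, so that no observer ever sees both at zero while a reference to
the context is still live. A sketch of that ordering using std::atomic
(illustrative names, not the actual members above):

#include <atomic>

struct PerDiskRef {
  std::atomic<int> num_threads_in_op{0};
  std::atomic<bool> is_on_queue{false};

  // Disk thread dequeues the context: increment the in-op count *before*
  // clearing the queue flag, so no other thread can observe both at zero
  // while this thread still holds a reference.
  void IncrementThreadAndDequeue() {
    num_threads_in_op.fetch_add(1);
    is_on_queue.store(false);
  }

  void DecrementThread() { num_threads_in_op.fetch_sub(1); }

  // Cleanup check: loads the queue flag first. With the default
  // sequentially consistent ordering the two loads cannot be reordered,
  // which is what the explicit barriers in the code above stand in for.
  bool NoRemainingReferences() const {
    return !is_on_queue.load() && num_threads_in_op.load() == 0;
  }
};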
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 0b234ac..222f847 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -23,15 +23,14 @@
#include <boost/thread/mutex.hpp>
-#include "common/atomic.h"
#include "common/hdfs.h"
#include "common/status.h"
-#include "runtime/bufferpool/buffer-pool.h"
#include "util/condition-variable.h"
#include "util/internal-queue.h"
-#include "util/mem-range.h"
namespace impala {
+class MemTracker;
+
namespace io {
class DiskIoMgr;
class RequestContext;
@@ -56,19 +55,24 @@ class BufferDescriptor {
/// Returns the offset within the scan range that this buffer starts at
int64_t scan_range_offset() const { return scan_range_offset_; }
+ /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
+ /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
+ /// check memory limits on 'dst': the caller should check the memory limit if a
+ /// different memory limit may apply to 'dst'. If the buffer was a client-provided
+ /// buffer, transferring is not allowed.
+ /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
+ void TransferOwnership(MemTracker* dst);
+
private:
friend class DiskIoMgr;
friend class ScanRange;
friend class RequestContext;
- /// Create a buffer descriptor for a new reader, range and data buffer.
- BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
- ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
-
- /// Create a buffer descriptor allocated from the buffer pool.
+ /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
+ /// memory should already be accounted against 'mem_tracker'.
BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
- ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
- BufferPool::BufferHandle handle);
+ ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
+ MemTracker* mem_tracker);
/// Return true if this is a cached buffer owned by HDFS.
bool is_cached() const;
@@ -82,11 +86,14 @@ class BufferDescriptor {
/// Reader that this buffer is for.
RequestContext* const reader_;
+ /// The current tracker this buffer is associated with. After initialisation,
+ /// NULL for cached buffers and non-NULL for all other buffers.
+ MemTracker* mem_tracker_;
+
/// Scan range that this buffer is for. Non-NULL when initialised.
ScanRange* const scan_range_;
- /// Buffer for the read contents. Must be set to NULL in RequestContext::FreeBuffer()
- /// before destruction of the descriptor.
+  /// Buffer with the read contents.
uint8_t* buffer_;
/// length of buffer_. For buffers from cached reads, the length is 0.
@@ -98,12 +105,10 @@ class BufferDescriptor {
/// true if the current scan range is complete
bool eosr_ = false;
- int64_t scan_range_offset_ = 0;
+  /// Status of the read into this buffer. If the status is not ok, 'buffer' is nullptr.
+ Status status_;
- // Handle to an allocated buffer and the client used to allocate it buffer. Only used
- // for non-external buffers.
- BufferPool::ClientHandle* bp_client_ = nullptr;
- BufferPool::BufferHandle handle_;
+ int64_t scan_range_offset_ = 0;
};
/// The request type, read or write associated with a request range.
@@ -211,11 +216,11 @@ class ScanRange : public RequestRange {
/// Resets this scan range object with the scan range description. The scan range
/// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
- /// local filesystem). The scan range must be non-empty and fall within the file bounds
- /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
- /// queue to add the range to. If 'expected_local' is true, a warning is generated if
- /// the read did not come from a local disk. 'buffer_opts' specifies buffer management
- /// options - see the DiskIoMgr class comment and the BufferOpts comments for details.
+ /// local filesystem). The scan range must fall within the file bounds (offset >= 0
+ /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
+ /// to. If 'expected_local' is true, a warning is generated if the read did not
+ /// come from a local disk. 'buffer_opts' specifies buffer management options -
+ /// see the DiskIoMgr class comment and the BufferOpts comments for details.
/// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
@@ -231,17 +236,10 @@ class ScanRange : public RequestRange {
/// Only one thread can be in GetNext() at any time.
Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
- /// Returns the buffer to the scan range. This must be called for every buffer
- /// returned by GetNext(). After calling this, the buffer descriptor is invalid
- /// and cannot be accessed.
- void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
- /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads
- /// blocked on GetNext(). Status is a non-ok status with the reason the range was
- /// cancelled, e.g. CANCELLED if the range was cancelled because it was not needed, or
- /// another error if an error was encountered while scanning the range. Status is
- /// returned to the any callers of GetNext(). If a thread is currently blocked in
- /// GetNext(), it is woken up.
+ /// Cancel this scan range. This cleans up all queued buffers and
+ /// wakes up any threads blocked on GetNext().
+ /// Status is the reason the range was cancelled. Must not be ok().
+ /// Status is returned to the user in GetNext().
void Cancel(const Status& status);
/// return a descriptive string for debug.
@@ -257,13 +255,18 @@ class ScanRange : public RequestRange {
/// Initialize internal fields
void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
- /// Enqueues a ready buffer with valid data for this range. This does not block.
+ /// Enqueues a buffer for this range. This does not block.
+ /// Returns true if this scan range has hit the queue capacity, false otherwise.
/// The caller passes ownership of buffer to the scan range and it is not
/// valid to access buffer after this call. The reader lock must be held by the
- /// caller. Returns false if the scan range was cancelled.
- bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+ /// caller.
+ bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
std::unique_ptr<BufferDescriptor> buffer);
+  /// Clean up any queued buffers (e.g. due to cancellation). This cannot
+ /// be called with any locks taken.
+ void CleanupQueuedBuffers();
+
/// Validates the internal state of this range. lock_ must be taken
/// before calling this.
bool Validate();
@@ -280,10 +283,6 @@ class ScanRange : public RequestRange {
/// exclusive use by this scan range. The scan range is the exclusive owner of the
/// file handle, and the file handle is destroyed in Close().
/// All local OS files are opened using normal OS file APIs.
- ///
- /// If an error is encountered during opening, returns a status describing the error.
- /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
- /// success, returns OK.
Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
/// Closes the file for this range. This function only modifies state in this range.
@@ -291,10 +290,6 @@ class ScanRange : public RequestRange {
/// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
/// the number of bytes read. The read position in this scan range is updated.
- ///
- /// If an error is encountered during reading, returns a status describing the error.
- /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
- /// success, returns OK.
Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
bool* eosr) WARN_UNUSED_RESULT;
@@ -312,58 +307,6 @@ class ScanRange : public RequestRange {
Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
bool* read_succeeded) WARN_UNUSED_RESULT;
- /// Add buffers for the range to read data into and schedule the range if blocked.
- /// If 'returned' is true, the buffers returned from GetNext() that are being recycled
- /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added.
- void AddUnusedBuffers(
- std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
-
- /// Remove a buffer from 'unused_iomgr_buffers_' and update
- /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL.
- /// 'lock_' must be held by the caller via 'scan_range_lock'.
- std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
- const boost::unique_lock<boost::mutex>& scan_range_lock);
-
- /// Get the next buffer for this scan range for a disk thread to read into. Returns
- /// the new buffer if successful. If no buffers are available, marks the range
-  /// as blocked and returns nullptr. Caller must not hold 'lock_'.
- std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
-
- /// Cleans up a buffer that was not returned to the client.
- /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
- /// The caller must hold 'lock_' via 'scan_range_lock'.
- /// This function may acquire 'hdfs_lock_'
- void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
- std::unique_ptr<BufferDescriptor> buffer);
-
- /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
- /// hold 'lock_'.
- void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
-
- /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan
- /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
- void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
-
- /// Same as Cancel() except reader_->lock must be held by the caller.
- void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
- const Status& status);
-
- /// Same as Cancel() except doesn't remove the scan range from
- /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
- /// which removes the range itself to avoid invalidating its active_scan_ranges_
- /// iterator.
- void CancelInternal(const Status& status);
-
- /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
- void SetBlockedOnBuffer();
-
- /// Returns true if no more buffers will be returned to clients in the future,
- /// either because of hitting eosr or cancellation.
- bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
- }
-
/// Pointer to caller specified metadata. This is untouched by the io manager
/// and the caller can put whatever auxiliary data in here.
void* meta_data_ = nullptr;
@@ -380,9 +323,6 @@ class ScanRange : public RequestRange {
/// TODO: we can do more with this
bool expected_local_ = false;
- /// Last modified time of the file associated with the scan range. Set in Reset().
- int64_t mtime_;
-
/// Total number of bytes read remotely. This is necessary to maintain a count of
/// the number of remote scan ranges. Since IO statistics can be collected multiple
/// times for a scan range, it is necessary to keep some state about whether this
@@ -429,10 +369,6 @@ class ScanRange : public RequestRange {
struct hadoopRzBuffer* cached_buffer_ = nullptr;
};
- /// The number of buffers that have been returned to a client via GetNext() that have
- /// not yet been returned with ReturnBuffer().
- AtomicInt32 num_buffers_in_reader_{0};
-
/// Lock protecting fields below.
/// This lock should not be taken during Open()/Read()/Close().
/// If RequestContext::lock_ and this lock need to be held simultaneously,
@@ -442,40 +378,25 @@ class ScanRange : public RequestRange {
/// Number of bytes read so far for this scan range
int bytes_read_;
- /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are
- /// initially populated when the client calls AllocateBuffersForRange() and
- /// and are used to read scanned data into. Buffers are taken from this vector for
- /// every read and added back, if needed, when the client calls ReturnBuffer().
- std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_;
-
- /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
- int64_t unused_iomgr_buffer_bytes_ = 0;
-
- /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
- /// infer how many bytes of buffers need to be held onto to read the rest of the scan
- /// range.
- int64_t iomgr_buffer_bytes_returned_ = 0;
+ /// Status for this range. This is non-ok if is_cancelled_ is true.
+ /// Note: an individual range can fail without the RequestContext being
+ /// cancelled. This allows us to skip individual ranges.
+ Status status_;
/// If true, the last buffer for this scan range has been queued.
- /// If this is true and 'ready_buffers_' is empty, then no more buffers will be
- /// returned to the caller by this scan range.
bool eosr_queued_ = false;
- /// If true, this scan range is not scheduled because a buffer is not available for
- /// the next I/O in the range. This can happen when the scan range is initially created
-    /// or if the buffers added to the range have all been filled with data and not yet
- /// returned.
- bool blocked_on_buffer_ = false;
+ /// If true, the last buffer for this scan range has been returned.
+ bool eosr_returned_ = false;
- /// IO buffers that are queued for this scan range. When Cancel() is called
- /// this is drained by the cancelling thread. I.e. this is always empty if
- /// 'cancel_status_' is not OK.
- std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
+ /// If true, this scan range has been removed from the reader's in_flight_ranges
+ /// queue because the ready_buffers_ queue is full.
+ bool blocked_on_queue_ = false;
- /// Condition variable for threads in GetNext() that are waiting for the next buffer.
- /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is
- /// cancelled.
+ /// IO buffers that are queued for this scan range.
+ /// Condition variable for GetNext
ConditionVariable buffer_ready_cv_;
+ std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
/// Lock that should be taken during hdfs calls. Only one thread (the disk reading
/// thread) calls into hdfs at a time so this lock does not have performance impact.
@@ -485,16 +406,11 @@ class ScanRange : public RequestRange {
/// If this lock and lock_ need to be taken, lock_ must be taken first.
boost::mutex hdfs_lock_;
- /// If non-OK, this scan range has been cancelled. This status is the reason for
- /// cancellation - CANCELLED if cancelled without error, or another status if an
- /// error caused cancellation. Note that a range can be cancelled without cancelling
- /// the owning context. This means that ranges can be cancelled or hit errors without
- /// aborting all scan ranges.
- //
- /// Writers must hold both 'lock_' and 'hdfs_lock_'. Readers must hold either 'lock_'
- /// or 'hdfs_lock_'. This prevents the range from being cancelled while any thread
- /// is inside a critical section.
- Status cancel_status_;
+ /// If true, this scan range has been cancelled.
+ bool is_cancelled_ = false;
+
+ /// Last modified time of the file associated with the scan range
+ int64_t mtime_;
};
/// Used to specify data to be written to a file and offset.
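
The BufferDescriptor changes above reintroduce 'mem_tracker_' and the
TransferOwnership() method. Its contract reduces to re-accounting the
buffer's bytes from the current tracker to the destination and repointing
the descriptor. A toy sketch under that assumption; this MemTracker is a
bare counter standing in for Impala's hierarchical, limit-enforcing one.

#include <cstdint>

class MemTracker {
 public:
  void Consume(int64_t bytes) { consumed_ += bytes; }
  void Release(int64_t bytes) { consumed_ -= bytes; }
  int64_t consumed() const { return consumed_; }

 private:
  int64_t consumed_ = 0;
};

struct Buffer {
  MemTracker* tracker;  // current owner of the accounted bytes
  int64_t len;

  // Move the accounting for 'len' bytes from 'tracker' to 'dst'.
  // (The real method also disallows this for client-provided buffers.)
  void TransferOwnership(MemTracker* dst) {
    tracker->Release(len);
    dst->Consume(len);
    tracker = dst;
  }
};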
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 4f7c38b..bd89846 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -42,27 +42,35 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
// any time and only one thread will remove from the queue. This is to guarantee
// that buffers are queued and read in file order.
-bool ScanRange::EnqueueReadyBuffer(
+bool ScanRange::EnqueueBuffer(
const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
- DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
+ DCHECK(!eosr_returned_);
DCHECK(!eosr_queued_);
- if (!cancel_status_.ok()) {
- // This range has been cancelled, no need to enqueue the buffer.
- CleanUpBuffer(scan_range_lock, move(buffer));
+ if (is_cancelled_) {
+ // Return the buffer, this range has been cancelled
+ if (buffer->buffer_ != nullptr) {
+ io_mgr_->num_buffers_in_readers_.Add(1);
+ reader_->num_buffers_in_reader_.Add(1);
+ }
+ reader_->num_used_buffers_.Add(-1);
+ io_mgr_->ReturnBuffer(move(buffer));
return false;
}
- // Clean up any surplus buffers. E.g. we may have allocated too many if the file was
- // shorter than expected.
- if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock);
+ reader_->num_ready_buffers_.Add(1);
eosr_queued_ = buffer->eosr();
ready_buffers_.emplace_back(move(buffer));
+
+ DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
+ blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
}
+
buffer_ready_cv_.NotifyOne();
- return true;
+
+ return blocked_on_queue_;
}
Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
@@ -70,225 +78,123 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
bool eosr;
{
unique_lock<mutex> scan_range_lock(lock_);
+ if (eosr_returned_) return Status::OK();
DCHECK(Validate()) << DebugString();
- // No more buffers to return - return the cancel status or OK if not cancelled.
- if (all_buffers_returned(scan_range_lock)) return cancel_status_;
- while (ready_buffers_.empty() && cancel_status_.ok()) {
+ while (ready_buffers_.empty() && !is_cancelled_) {
buffer_ready_cv_.Wait(scan_range_lock);
}
- /// Propagate cancellation to the client if it happened while we were waiting.
- RETURN_IF_ERROR(cancel_status_);
+
+ if (is_cancelled_) {
+ DCHECK(!status_.ok());
+ return status_;
+ }
// Remove the first ready buffer from the queue and return it
DCHECK(!ready_buffers_.empty());
+ DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
*buffer = move(ready_buffers_.front());
ready_buffers_.pop_front();
+ eosr_returned_ = (*buffer)->eosr();
eosr = (*buffer)->eosr();
- DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString();
- }
-
- // Update tracking counters. The buffer has now moved from the IoMgr to the caller.
- if (eosr) reader_->RemoveActiveScanRange(this);
- num_buffers_in_reader_.Add(1);
- return Status::OK();
-}
-
-void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
- vector<unique_ptr<BufferDescriptor>> buffers;
- buffers.emplace_back(move(buffer_desc));
- AddUnusedBuffers(move(buffers), true);
-}
-
-void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
- bool returned) {
- DCHECK_GT(buffers.size(), 0);
- /// Keep track of whether the range was unblocked in this function. If so, we need
- /// to schedule it so it resumes progress.
- bool unblocked = false;
- {
- unique_lock<mutex> scan_range_lock(lock_);
- if (returned) {
- // Buffers were in reader but now aren't.
- num_buffers_in_reader_.Add(-buffers.size());
- }
-
- for (unique_ptr<BufferDescriptor>& buffer : buffers) {
- // We should not hold onto the buffers in the following cases:
- // 1. the scan range is using external buffers, e.g. cached buffers.
- // 2. the scan range is cancelled
- // 3. the scan range already hit eosr
- // 4. we already have enough buffers to read the remainder of the scan range.
- if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
- || !cancel_status_.ok()
- || eosr_queued_
- || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
- CleanUpBuffer(scan_range_lock, move(buffer));
- } else {
- unused_iomgr_buffer_bytes_ += buffer->buffer_len();
- unused_iomgr_buffers_.emplace_back(move(buffer));
- if (blocked_on_buffer_) {
- blocked_on_buffer_ = false;
- unblocked = true;
- }
- }
- }
- }
- // Must drop the ScanRange lock before acquiring the RequestContext lock.
- if (unblocked) {
- unique_lock<mutex> reader_lock(reader_->lock_);
- // Reader may have been cancelled after we dropped 'scan_range_lock' above.
- if (reader_->state_ == RequestContext::Cancelled) {
- DCHECK(!cancel_status_.ok());
- } else {
- reader_->ScheduleScanRange(reader_lock, this);
- }
}
-}
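
The comment in the removed AddUnusedBuffers() states the lock-ordering rule
that runs through this file: RequestContext::lock_ is acquired before
ScanRange::lock_ (as EnqueueBuffer() does), so a thread holding only the range
lock must drop it before taking the reader lock. A sketch of the discipline
(illustrative globals, not Impala's types):

  #include <mutex>

  std::mutex reader_lock;  // RequestContext::lock_ (outer)
  std::mutex range_lock;   // ScanRange::lock_ (inner)

  void UnblockRange() {
    bool unblocked = false;
    {
      std::lock_guard<std::mutex> r(range_lock);
      unblocked = true;  // ...update range-local state under the inner lock...
    }  // drop the inner lock first
    if (unblocked) {
      std::lock_guard<std::mutex> c(reader_lock);  // then take the outer lock
      // ...reschedule the range with the reader...
    }
  }

Two threads that took the locks in opposite orders could deadlock, which is
exactly what the drop-then-reacquire step avoids.
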
-
-unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
- const unique_lock<mutex>& scan_range_lock) {
- DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
- if (unused_iomgr_buffers_.empty()) return nullptr;
- unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
- unused_iomgr_buffers_.pop_back();
- unused_iomgr_buffer_bytes_ -= result->buffer_len();
- return result;
-}
-unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
- unique_lock<mutex> lock(lock_);
- unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
- if (buffer_desc == nullptr) {
- blocked_on_buffer_ = true;
- } else {
- iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
+ // Update tracking counters. The buffer has now moved from the IoMgr to the
+ // caller.
+ io_mgr_->num_buffers_in_readers_.Add(1);
+ reader_->num_buffers_in_reader_.Add(1);
+ reader_->num_ready_buffers_.Add(-1);
+ reader_->num_used_buffers_.Add(-1);
+ if (eosr) reader_->num_finished_ranges_.Add(1);
+
+ Status status = (*buffer)->status_;
+ if (!status.ok()) {
+ io_mgr_->ReturnBuffer(move(*buffer));
+ return status;
}
- return buffer_desc;
-}
-void ScanRange::SetBlockedOnBuffer() {
- unique_lock<mutex> lock(lock_);
- blocked_on_buffer_ = true;
-}
+ unique_lock<mutex> reader_lock(reader_->lock_);
-void ScanRange::CleanUpBuffer(
- const boost::unique_lock<boost::mutex>& scan_range_lock,
- unique_ptr<BufferDescriptor> buffer_desc) {
- DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
- DCHECK(buffer_desc != nullptr);
- DCHECK_EQ(this, buffer_desc->scan_range_);
- buffer_desc->reader_->FreeBuffer(buffer_desc.get());
-
- if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
- // Close the scan range if there are no more buffers in the reader and no more buffers
- // will be returned to readers in future. Close() is idempotent so it is ok to call
- // multiple times during cleanup so long as the range is actually finished.
- Close();
+ DCHECK(reader_->Validate()) << endl << reader_->DebugString();
+ if (reader_->state_ == RequestContext::Cancelled) {
+ reader_->blocked_ranges_.Remove(this);
+ Cancel(reader_->status_);
+ io_mgr_->ReturnBuffer(move(*buffer));
+ return status_;
}
-}
-
-void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) {
- unique_lock<mutex> lock(lock_);
- for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer));
-}
-void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) {
- while (!unused_iomgr_buffers_.empty()) {
- CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
+ {
+ // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
+ // may have been called after we released 'lock_' above so we need to re-check
+ // whether the queue is full.
+ unique_lock<mutex> scan_range_lock(lock_);
+ if (blocked_on_queue_
+ && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
+ && !eosr_queued_) {
+ blocked_on_queue_ = false;
+ // This scan range was blocked and is no longer; add it to the reader
+ // queue again.
+ reader_->blocked_ranges_.Remove(this);
+ reader_->ScheduleScanRange(this);
+ }
}
+ return Status::OK();
}
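
Putting GetNext() in context, a caller in the reverted API drains a scan range
roughly as follows (a sketch assuming the signatures visible in this diff, with
hypothetical 'scan_range' and 'io_mgr' pointers; processing and error paths
elided):

  // Sketch, not a verbatim excerpt: pull buffers until eosr, returning
  // each one to the DiskIoMgr so its memory can be recycled.
  std::unique_ptr<BufferDescriptor> buffer;
  bool eosr = false;
  while (!eosr) {
    RETURN_IF_ERROR(scan_range->GetNext(&buffer));  // blocks until ready
    eosr = buffer->eosr();
    // ...consume the buffer's bytes here...
    io_mgr->ReturnBuffer(std::move(buffer));
  }

Note that once the eosr buffer has been handed out, further GetNext() calls
return OK immediately via the 'eosr_returned_' check above.
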
void ScanRange::Cancel(const Status& status) {
// Cancelling a range that was never started; ignore it.
if (io_mgr_ == nullptr) return;
- CancelInternal(status);
- reader_->RemoveActiveScanRange(this);
-}
-void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
- const Status& status) {
- DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
- CancelInternal(status);
- reader_->RemoveActiveScanRangeLocked(reader_lock, this);
-}
-
-void ScanRange::CancelInternal(const Status& status) {
- DCHECK(io_mgr_ != nullptr);
DCHECK(!status.ok());
{
- // Grab both locks to make sure that we don't change 'cancel_status_' while other
- // threads are in critical sections.
+ // Grab both locks to make sure that all working threads see is_cancelled_.
unique_lock<mutex> scan_range_lock(lock_);
- {
- unique_lock<mutex> hdfs_lock(hdfs_lock_);
- DCHECK(Validate()) << DebugString();
- // If already cancelled, preserve the original reason for cancellation. The first
- // thread to set 'cancel_status_' does the cleanup below.
- RETURN_VOID_IF_ERROR(cancel_status_);
- cancel_status_ = status;
- }
-
- /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
- /// from seeing inconsistent state.
- while (!ready_buffers_.empty()) {
- CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
- ready_buffers_.pop_front();
- }
-
- /// Clean up buffers that we don't need any more because we won't read any more data.
- CleanUpUnusedBuffers(scan_range_lock);
+ unique_lock<mutex> hdfs_lock(hdfs_lock_);
+ DCHECK(Validate()) << DebugString();
+ if (is_cancelled_) return;
+ is_cancelled_ = true;
+ status_ = status;
}
buffer_ready_cv_.NotifyAll();
+ CleanupQueuedBuffers();
// For cached buffers, we can't close the range until the cached buffer is returned.
- // Close() is called from ScanRange::CleanUpBufferLocked().
+ // Close() is called from DiskIoMgr::ReturnBuffer().
if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
}
+void ScanRange::CleanupQueuedBuffers() {
+ DCHECK(is_cancelled_);
+ io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
+ reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
+ reader_->num_used_buffers_.Add(-ready_buffers_.size());
+ reader_->num_ready_buffers_.Add(-ready_buffers_.size());
+
+ while (!ready_buffers_.empty()) {
+ io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
+ ready_buffers_.pop_front();
+ }
+}
+
string ScanRange::DebugString() const {
stringstream ss;
ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
<< " len=" << len_ << " bytes_read=" << bytes_read_
- << " cancel_status=" << cancel_status_.GetDetail()
<< " buffer_queue=" << ready_buffers_.size()
- << " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
- << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
- << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
- << " blocked_on_buffer=" << blocked_on_buffer_
<< " hdfs_file=" << exclusive_hdfs_fh_;
return ss.str();
}
bool ScanRange::Validate() {
if (bytes_read_ > len_) {
- LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
+ LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
<< " bytes_read_=" << bytes_read_ << " len_=" << len_;
return false;
}
- if (!cancel_status_.ok() && !ready_buffers_.empty()) {
- LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
- return false;
- }
- int64_t unused_iomgr_buffer_bytes = 0;
- for (auto& buffer : unused_iomgr_buffers_)
- unused_iomgr_buffer_bytes += buffer->buffer_len();
- if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
- LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
- << unused_iomgr_buffer_bytes_
- << " vs. expected: " << unused_iomgr_buffer_bytes;
- return false;
- }
- bool is_finished = !cancel_status_.ok() || eosr_queued_;
- if (is_finished && !unused_iomgr_buffers_.empty()) {
- LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size()
- << " bytes: " << unused_iomgr_buffer_bytes_
- << " cancel_status: " << cancel_status_.GetDetail()
- << " eosr_queued: " << eosr_queued_;
- return false;
- }
- if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) {
- LOG(ERROR) << "Blocked despite having buffers: " << DebugString();
+ if (eosr_returned_ && !eosr_queued_) {
+ LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
+ << " eosr_returned_=" << eosr_returned_
+ << " eosr_queued_=" << eosr_queued_;
return false;
}
return true;
@@ -297,14 +203,13 @@ bool ScanRange::Validate() {
ScanRange::ScanRange()
: RequestRange(RequestType::READ),
num_remote_bytes_(0),
- external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
+ external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
+ mtime_(-1) {}
ScanRange::~ScanRange() {
DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
<< "Cached buffer was not released.";
- DCHECK_EQ(0, ready_buffers_.size());
- DCHECK_EQ(0, num_buffers_in_reader_.Load());
}
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
@@ -340,22 +245,24 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
DCHECK(exclusive_hdfs_fh_ == nullptr);
DCHECK(local_file_ == nullptr);
+ // Reader must provide a MemTracker or a buffer.
+ DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
+ || reader->mem_tracker_ != nullptr);
io_mgr_ = io_mgr;
reader_ = reader;
local_file_ = nullptr;
exclusive_hdfs_fh_ = nullptr;
bytes_read_ = 0;
- unused_iomgr_buffer_bytes_ = 0;
- iomgr_buffer_bytes_returned_ = 0;
- cancel_status_ = Status::OK();
- eosr_queued_ = false;
- blocked_on_buffer_ = false;
+ is_cancelled_ = false;
+ eosr_queued_ = false;
+ eosr_returned_ = false;
+ blocked_on_queue_ = false;
DCHECK(Validate()) << DebugString();
}
Status ScanRange::Open(bool use_file_handle_cache) {
unique_lock<mutex> hdfs_lock(hdfs_lock_);
- RETURN_IF_ERROR(cancel_status_);
+ if (is_cancelled_) return Status::CANCELLED;
if (fs_ != nullptr) {
if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
@@ -395,7 +302,9 @@ Status ScanRange::Open(bool use_file_handle_cache) {
"for file: $1: $2", offset_, file_, GetStrErrMsg()));
}
}
- ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
+ if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+ ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
+ }
return Status::OK();
}
@@ -447,7 +356,9 @@ void ScanRange::Close() {
local_file_ = nullptr;
closed_file = true;
}
- if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
+ if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+ ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
+ }
}
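
Both Open() and Close() now null-check the gauge because ImpaladMetrics may
never have been created (e.g. in backend tests that skip CreateMetrics()). If
the pattern spread further it could be hoisted into a small helper; a
hypothetical sketch, assuming the gauge is Impala's IntGauge type:

  // Hypothetical helper, not part of this patch: centralizes the
  // "metric may be null in tests" guard used at both call sites above.
  static inline void IncrementIfNotNull(IntGauge* metric, int64_t delta) {
    if (metric != nullptr) metric->Increment(delta);
  }
  // Usage: IncrementIfNotNull(ImpaladMetrics::IO_MGR_NUM_OPEN_FILES, 1L);
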
int64_t ScanRange::MaxReadChunkSize() const {
@@ -475,7 +386,7 @@ int64_t ScanRange::MaxReadChunkSize() const {
Status ScanRange::Read(
uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
unique_lock<mutex> hdfs_lock(hdfs_lock_);
- RETURN_IF_ERROR(cancel_status_);
+ if (is_cancelled_) return Status::CANCELLED;
*eosr = false;
*bytes_read = 0;
@@ -622,7 +533,7 @@ Status ScanRange::ReadFromCache(
{
unique_lock<mutex> hdfs_lock(hdfs_lock_);
- RETURN_IF_ERROR(cancel_status_);
+ if (is_cancelled_) return Status::CANCELLED;
DCHECK(exclusive_hdfs_fh_ != nullptr);
DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
@@ -657,16 +568,20 @@ Status ScanRange::ReadFromCache(
}
// Create a single buffer desc for the entire scan range and enqueue that.
- // The memory is owned by the HDFS java client, not the Impala backend.
+ // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
+ // not the Impala backend.
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
- io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
+ io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
desc->len_ = bytes_read;
desc->scan_range_offset_ = 0;
desc->eosr_ = true;
bytes_read_ = bytes_read;
- EnqueueReadyBuffer(reader_lock, move(desc));
- COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
+ EnqueueBuffer(reader_lock, move(desc));
+ if (reader_->bytes_read_counter_ != nullptr) {
+ COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
+ }
*read_succeeded = true;
+ reader_->num_used_buffers_.Add(1);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 8d28f8f..10a3424 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -107,6 +107,7 @@ class MemTracker {
/// destruction to prevent other threads from getting a reference to the MemTracker
/// via its parent. Only used to deregister the query-level MemTracker from the
/// global hierarchy.
+ /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be deleted.
void CloseAndUnregisterFromParent();
/// Include counters from a ReservationTracker in logs and other diagnostics.
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 0d6c6f0..770eaba 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -49,7 +49,7 @@ Status TestEnv::Init() {
exec_env_.reset(new ExecEnv);
// Populate the ExecEnv state that the backend tests need.
exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
- RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
+ RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
if (have_tmp_file_mgr_args_) {
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 9161b63..3091c58 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -25,7 +25,6 @@
#include <gtest/gtest.h>
#include "common/init.h"
-#include "runtime/io/request-context.h"
#include "runtime/test-env.h"
#include "runtime/tmp-file-mgr-internal.h"
#include "runtime/tmp-file-mgr.h"
@@ -135,7 +134,7 @@ class TmpFileMgrTest : public ::testing::Test {
/// Helper to cancel the FileGroup RequestContext.
static void CancelIoContext(TmpFileMgr::FileGroup* group) {
- group->io_ctx_->Cancel();
+ group->io_mgr_->CancelContext(group->io_ctx_.get());
}
/// Helper to get the # of bytes allocated by the group. Validates that the sum across
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index e0c58d4..3807670 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -243,7 +243,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
next_allocation_index_(0),
free_ranges_(64) {
DCHECK(tmp_file_mgr != nullptr);
- io_ctx_ = io_mgr_->RegisterContext();
+ io_ctx_ = io_mgr_->RegisterContext(nullptr);
}
TmpFileMgr::FileGroup::~FileGroup() {
@@ -396,10 +396,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
BufferOpts::ReadInto(buffer.data(), buffer.len()));
read_counter_->Add(1);
bytes_read_counter_->Add(buffer.len());
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr_->StartScanRange(
- io_ctx_.get(), handle->read_range_, &needs_buffers));
- DCHECK(!needs_buffers) << "Already provided a buffer";
+ RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
return Status::OK();
}
@@ -429,7 +426,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf
}
exit:
// Always return the buffer before exiting to avoid leaking it.
- if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
+ if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer));
handle->read_range_ = nullptr;
return status;
}
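
For orientation, the reverted scratch read path boils down to this sequence (a
sketch stitched together from the calls visible in these hunks; the trailing
'true' appears to request immediate scheduling of the range):

  // Sketch of the reverted read-back flow, not a verbatim excerpt.
  // 1. Schedule the read range against the FileGroup's context:
  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
  // 2. Block for the single result buffer:
  std::unique_ptr<BufferDescriptor> io_mgr_buffer;
  RETURN_IF_ERROR(handle->read_range_->GetNext(&io_mgr_buffer));
  // 3. ...copy or validate the data into the caller's MemRange...
  // 4. Always hand the buffer back so the IoMgr can recycle it:
  io_mgr_->ReturnBuffer(std::move(io_mgr_buffer));
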
@@ -525,20 +522,11 @@ TmpFileMgr::WriteHandle::WriteHandle(
is_cancelled_(false),
write_in_flight_(false) {}
-TmpFileMgr::WriteHandle::~WriteHandle() {
- DCHECK(!write_in_flight_);
- DCHECK(read_range_ == nullptr);
-}
-
string TmpFileMgr::WriteHandle::TmpFilePath() const {
if (file_ == nullptr) return "";
return file_->path();
}
-int64_t TmpFileMgr::WriteHandle::len() const {
- return write_range_->len();
-}
-
Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx,
File* file, int64_t offset, MemRange buffer,
WriteRange::WriteDoneCallback callback) {
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 55901f4..95072ae 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,6 +28,7 @@
#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/io/request-ranges.h"
#include "util/collection-metrics.h"
#include "util/condition-variable.h"
#include "util/mem-range.h"
@@ -36,12 +37,6 @@
#include "util/spinlock.h"
namespace impala {
-namespace io {
- class DiskIoMgr;
- class RequestContext;
- class ScanRange;
- class WriteRange;
-}
/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
@@ -89,7 +84,6 @@ class TmpFileMgr {
/// Needs to be public for TmpFileMgrTest.
typedef int DeviceId;
- /// Same typedef as io::WriteRange::WriteDoneCallback.
typedef std::function<void(const Status&)> WriteDoneCallback;
/// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -283,7 +277,10 @@ class TmpFileMgr {
public:
/// The write must be destroyed by passing it to FileGroup - destroying it before
/// the write completes is an error.
- ~WriteHandle();
+ ~WriteHandle() {
+ DCHECK(!write_in_flight_);
+ DCHECK(read_range_ == nullptr);
+ }
/// Cancel any in-flight read synchronously.
void CancelRead();
@@ -293,7 +290,7 @@ class TmpFileMgr {
std::string TmpFilePath() const;
/// The length of the write range in bytes.
- int64_t len() const;
+ int64_t len() const { return write_range_->len(); }
std::string DebugString();
@@ -308,7 +305,7 @@ class TmpFileMgr {
/// failure and 'is_cancelled_' is set to true on failure.
Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
int64_t offset, MemRange buffer,
- WriteDoneCallback callback) WARN_UNUSED_RESULT;
+ io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
/// Retry the write after the initial write failed with an error, instead writing to
/// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 6f70727..5f8d443 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -258,17 +258,6 @@ TEST(BitUtil, Log2) {
EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64);
}
-TEST(BitUtil, RoundToPowerOfTwo) {
- EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9));
- EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15));
- EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16));
- EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17));
- EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9));
- EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15));
- EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16));
- EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17));
-}
-
TEST(BitUtil, RoundUpToPowerOf2) {
EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8);
EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8);
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 8a65509..5c9a29b 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -98,12 +98,6 @@ class BitUtil {
return v;
}
- /// Returns the largest power of two <= v.
- static inline int64_t RoundDownToPowerOfTwo(int64_t v) {
- int64_t v_rounded_up = RoundUpToPowerOfTwo(v);
- return v_rounded_up == v ? v : v_rounded_up / 2;
- }
-
/// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is
/// a power of two.
static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
@@ -111,7 +105,7 @@ class BitUtil {
return (value + (factor - 1)) & ~(factor - 1);
}
- static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
+ static inline int RoundDownToPowerOf2(int value, int factor) {
DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
return value & ~(factor - 1);
}
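
The surviving RoundDownToPowerOf2(value, factor) works because 'factor - 1'
has all bits below 'factor' set, so '& ~(factor - 1)' clears them. A quick
standalone check of the arithmetic, and of how it differs from the
RoundDownToPowerOfTwo() deleted above, which rounded to the nearest power of
two itself:

  #include <cassert>

  static int RoundDownToPowerOf2(int value, int factor) {
    return value & ~(factor - 1);  // factor must be a power of two
  }

  int main() {
    assert(RoundDownToPowerOf2(13, 8) == 8);   // mask 0b...11111000
    assert(RoundDownToPowerOf2(16, 8) == 16);  // already a multiple
    assert(RoundDownToPowerOf2(7, 8) == 0);    // rounds to zero
    // Contrast: the deleted RoundDownToPowerOfTwo(9) returned 8, the
    // largest power of two <= 9 (see the removed test above).
    return 0;
  }
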
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 815e4af..c2a2644 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -46,6 +46,12 @@ const char* ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES =
"impala-server.hash-table.total-bytes";
const char* ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES =
"impala-server.io-mgr.num-open-files";
+const char* ImpaladMetricKeys::IO_MGR_NUM_BUFFERS =
+ "impala-server.io-mgr.num-buffers";
+const char* ImpaladMetricKeys::IO_MGR_TOTAL_BYTES =
+ "impala-server.io-mgr.total-bytes";
+const char* ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS =
+ "impala-server.io-mgr.num-unused-buffers";
const char* ImpaladMetricKeys::IO_MGR_BYTES_READ =
"impala-server.io-mgr.bytes-read";
const char* ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ =
@@ -205,8 +211,11 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
// Initialize IO mgr metrics
- IO_MGR_NUM_OPEN_FILES = m->AddGauge(
- ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+ IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+ IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
+ IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
+ IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
+ ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 7de7aa8..b49caf9 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -67,6 +67,15 @@ class ImpaladMetricKeys {
/// Number of files currently opened by the io mgr
static const char* IO_MGR_NUM_OPEN_FILES;
+ /// Number of IO buffers allocated by the io mgr
+ static const char* IO_MGR_NUM_BUFFERS;
+
+ /// Number of bytes allocated to IO buffers (both used and unused).
+ static const char* IO_MGR_TOTAL_BYTES;
+
+ /// Number of IO buffers that are currently unused (and can be GC'ed)
+ static const char* IO_MGR_NUM_UNUSED_BUFFERS;
+
/// Total number of bytes read by the io mgr
static const char* IO_MGR_BYTES_READ;
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1ab05a0..c5df1cd 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -239,9 +239,6 @@ struct THdfsScanNode {
// The byte offset of the slot for Parquet metadata if Parquet count star optimization
// is enabled.
10: optional i32 parquet_count_star_slot_offset
-
- // The ideal memory reservation in bytes to process an input split.
- 11: optional i64 ideal_scan_range_reservation
}
struct TDataSourceScanNode {
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index aae3863..4f0a0e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -163,25 +163,6 @@ public class SlotDescriptor {
}
/**
- * Checks if this descriptor describes an array "pos" pseudo-column.
- *
- * Note: checking whether the column is null distinguishes between top-level columns
- * and nested types. This check more specifically looks just for a reference to the
- * "pos" field of an array type.
- */
- public boolean isArrayPosRef() {
- if (parent_ == null) return false;
- Type parentType = parent_.getType();
- if (parentType instanceof CollectionStructType) {
- if (((CollectionStructType)parentType).isArrayStruct() &&
- label_.equals(Path.ARRAY_POS_FIELD_NAME)) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Assembles the absolute materialized path to this slot starting from the schema
* root. The materialized path points to the first non-struct schema element along the
* path starting from the parent's tuple path to this slot's path.