Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 04:18:15 UTC
[10/11] impala git commit: IMPALA-4835: Part 2: Allocate scan range buffers upfront
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 031b976..b124702 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -36,30 +36,6 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
DCHECK_GE(buffer_len, 0);
}
-Status RequestContext::AllocBuffer(ScanRange* range, int64_t buffer_size,
- unique_ptr<BufferDescriptor>* buffer_desc) {
- DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
- << static_cast<int>(range->external_buffer_tag_);
- DCHECK_LE(buffer_size, parent_->max_buffer_size_);
- DCHECK_GT(buffer_size, 0);
- buffer_size = BitUtil::RoundUpToPowerOfTwo(
- max(parent_->min_buffer_size_, min(parent_->max_buffer_size_, buffer_size)));
-
- DCHECK(mem_tracker_ != nullptr);
- if (!mem_tracker_->TryConsume(buffer_size)) {
- return mem_tracker_->MemLimitExceeded(nullptr, "disk I/O buffer", buffer_size);
- }
-
- uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
- if (buffer == nullptr) {
- mem_tracker_->Release(buffer_size);
- return Status(TErrorCode::INTERNAL_ERROR,
- Substitute("Could not malloc buffer of $0 bytes"));
- }
- buffer_desc->reset(new BufferDescriptor(parent_, this, range, buffer, buffer_size));
- return Status::OK();
-}
-
void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
DCHECK(buffer->buffer_ != nullptr);
if (!buffer->is_cached() && !buffer->is_client_buffer()) {
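The removed AllocBuffer() clamped the requested size into [min_buffer_size_, max_buffer_size_] and rounded the result up to a power of two before consuming from the MemTracker. A minimal standalone sketch of that sizing computation (the 8KB/8MB bounds below are illustrative, not Impala's configured defaults):

    #include <algorithm>
    #include <cstdint>

    // Naive stand-in for BitUtil::RoundUpToPowerOfTwo().
    int64_t RoundUpToPowerOfTwo(int64_t v) {
      int64_t r = 1;
      while (r < v) r <<= 1;
      return r;
    }

    // Sizing logic from the removed AllocBuffer(): clamp, then round up.
    int64_t ChooseBufferSize(int64_t requested, int64_t min_size, int64_t max_size) {
      return RoundUpToPowerOfTwo(std::max(min_size, std::min(max_size, requested)));
    }

    // ChooseBufferSize(100 * 1024, 8 * 1024, 8 * 1024 * 1024) == 128 * 1024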
@@ -108,40 +84,26 @@ void RequestContext::Cancel() {
// The reader will be put into a cancelled state until all cleanup is complete.
state_ = RequestContext::Cancelled;
- // Cancel all scan ranges for this reader. Each range could be one one of
- // four queues.
- for (int i = 0; i < disk_states_.size(); ++i) {
- PerDiskState& state = disk_states_[i];
- RequestRange* range = nullptr;
- while ((range = state.in_flight_ranges()->Dequeue()) != nullptr) {
- if (range->request_type() == RequestType::READ) {
- static_cast<ScanRange*>(range)->Cancel(Status::CANCELLED);
- } else {
- DCHECK(range->request_type() == RequestType::WRITE);
+ // Clear out all request ranges from queues for this reader. Cancel the scan
+ // ranges and invoke the write range callbacks to propagate the cancellation.
+ for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
+ active_scan_ranges_.clear();
+ for (PerDiskState& disk_state : disk_states_) {
+ RequestRange* range;
+ while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
+ if (range->request_type() == RequestType::WRITE) {
write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
}
}
-
- ScanRange* scan_range;
- while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != nullptr) {
- scan_range->Cancel(Status::CANCELLED);
- }
+ while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
WriteRange* write_range;
- while ((write_range = state.unstarted_write_ranges()->Dequeue()) != nullptr) {
+ while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) {
write_callbacks.push_back(write_range->callback_);
}
}
-
- ScanRange* range = nullptr;
- while ((range = ready_to_start_ranges_.Dequeue()) != nullptr) {
- range->Cancel(Status::CANCELLED);
- }
- while ((range = blocked_ranges_.Dequeue()) != nullptr) {
- range->Cancel(Status::CANCELLED);
- }
- while ((range = cached_ranges_.Dequeue()) != nullptr) {
- range->Cancel(Status::CANCELLED);
- }
+ // Clear out the lists of scan ranges.
+ while (ready_to_start_ranges_.Dequeue() != nullptr);
+ while (cached_ranges_.Dequeue() != nullptr);
// Ensure that the reader is scheduled on all disks (it may already be scheduled on
// some). The disk threads will notice that the context is cancelled and do any
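Note the pattern with 'write_callbacks' here: the callbacks are collected while 'lock_' is held and (in code outside this hunk) presumably invoked only after the lock is dropped, so a callback that re-enters the I/O manager cannot self-deadlock. A standalone sketch of the idiom using standard-library types:

    #include <functional>
    #include <mutex>
    #include <vector>

    // Drain pending callbacks under the lock, then invoke them lock-free.
    void CancelWrites(std::mutex& lock,
        std::vector<std::function<void()>>& pending_callbacks) {
      std::vector<std::function<void()>> callbacks;
      {
        std::lock_guard<std::mutex> l(lock);
        callbacks.swap(pending_callbacks);  // O(1) move of the whole list.
      }
      for (auto& cb : callbacks) cb();  // Safe to re-enter locked code here.
    }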
@@ -170,9 +132,8 @@ void RequestContext::CancelAndMarkInactive() {
// Wait until the ranges finish up.
while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
- // Validate that no buffers were leaked from this context.
- DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
- DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+ // Validate that no ranges are active.
+ DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString();
// Validate that no threads are active and the context is not queued.
for (const PerDiskState& disk_state : disk_states_) {
@@ -185,42 +146,58 @@ void RequestContext::CancelAndMarkInactive() {
state_ = Inactive;
}
-void RequestContext::AddRequestRange(const unique_lock<mutex>& lock,
- RequestRange* range, bool schedule_immediately) {
+void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
+ RequestRange* range, ScheduleMode schedule_mode) {
DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- PerDiskState& state = disk_states_[range->disk_id()];
- if (state.done()) {
- DCHECK_EQ(state.num_remaining_ranges(), 0);
- state.set_done(false);
+ DCHECK_EQ(state_, Active) << DebugString();
+ PerDiskState* disk_state = &disk_states_[range->disk_id()];
+ if (disk_state->done()) {
+ DCHECK_EQ(disk_state->num_remaining_ranges(), 0);
+ disk_state->set_done(false);
++num_disks_with_ranges_;
}
-
- bool schedule_context;
if (range->request_type() == RequestType::READ) {
ScanRange* scan_range = static_cast<ScanRange*>(range);
- if (schedule_immediately) {
+ if (schedule_mode == ScheduleMode::IMMEDIATELY) {
ScheduleScanRange(lock, scan_range);
- } else {
- state.unstarted_scan_ranges()->Enqueue(scan_range);
+ } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) {
+ disk_state->unstarted_scan_ranges()->Enqueue(scan_range);
num_unstarted_scan_ranges_.Add(1);
+ // If there's no 'next_scan_range_to_start', schedule this RequestContext so that
+ // one of the 'unstarted_scan_ranges' will become the 'next_scan_range_to_start'.
+ if (disk_state->next_scan_range_to_start() == nullptr) {
+ disk_state->ScheduleContext(lock, this, range->disk_id());
+ }
}
- // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
- // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
- // invoked.
- schedule_context = state.next_scan_range_to_start() == NULL;
} else {
DCHECK(range->request_type() == RequestType::WRITE);
- DCHECK(!schedule_immediately);
+ DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
WriteRange* write_range = static_cast<WriteRange*>(range);
- state.unstarted_write_ranges()->Enqueue(write_range);
+ disk_state->unstarted_write_ranges()->Enqueue(write_range);
- // ScheduleContext() has no effect if the context is already scheduled,
- // so this is safe.
- schedule_context = true;
+ // Ensure that the context is scheduled so that the write range gets picked up.
+ // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
+ disk_state->ScheduleContext(lock, this, range->disk_id());
}
+ ++disk_state->num_remaining_ranges();
+}
+
+void RequestContext::AddActiveScanRangeLocked(
+ const unique_lock<mutex>& lock, ScanRange* range) {
+ DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ DCHECK(state_ == Active);
+ active_scan_ranges_.insert(range);
+}
+
+void RequestContext::RemoveActiveScanRange(ScanRange* range) {
+ unique_lock<mutex> lock(lock_);
+ RemoveActiveScanRangeLocked(lock, range);
+}
- if (schedule_context) state.ScheduleContext(lock, this, range->disk_id());
- ++state.num_remaining_ranges();
+void RequestContext::RemoveActiveScanRangeLocked(
+ const unique_lock<mutex>& lock, ScanRange* range) {
+ DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ active_scan_ranges_.erase(range);
}
RequestContext::RequestContext(
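A recurring idiom in this file: the *Locked() methods take a const boost::unique_lock<boost::mutex>& purely as compile-time evidence that the caller holds 'lock_', then DCHECK that it really is the right lock. A minimal sketch of the lock-witness pattern (standard-library types used for illustration):

    #include <cassert>
    #include <mutex>

    class Context {
     public:
      void Remove() {
        std::unique_lock<std::mutex> lock(lock_);
        RemoveLocked(lock);  // Pass the witness down instead of re-locking.
      }

      void RemoveLocked(const std::unique_lock<std::mutex>& lock) {
        // The parameter proves *a* lock is held; the assert checks it is ours.
        assert(lock.mutex() == &lock_ && lock.owns_lock());
        --count_;  // Guarded by lock_.
      }

     private:
      std::mutex lock_;
      int count_ = 0;
    };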
@@ -235,12 +212,9 @@ string RequestContext::DebugString() const {
if (state_ == RequestContext::Cancelled) ss << "Cancelled";
if (state_ == RequestContext::Active) ss << "Active";
if (state_ != RequestContext::Inactive) {
- ss << " #ready_buffers=" << num_ready_buffers_.Load()
- << " #used_buffers=" << num_used_buffers_.Load()
- << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
- << " #finished_scan_ranges=" << num_finished_ranges_.Load()
- << " #disk_with_ranges=" << num_disks_with_ranges_
- << " #disks=" << num_disks_with_ranges_;
+ ss << " #disk_with_ranges=" << num_disks_with_ranges_
+ << " #disks=" << num_disks_with_ranges_
+ << " #active scan ranges=" << active_scan_ranges_.size();
for (int i = 0; i < disk_states_.size(); ++i) {
ss << endl << " " << i << ": "
<< "is_on_queue=" << disk_states_[i].is_on_queue()
@@ -263,16 +237,6 @@ bool RequestContext::Validate() const {
return false;
}
- if (num_used_buffers_.Load() < 0) {
- LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
- return false;
- }
-
- if (num_ready_buffers_.Load() < 0) {
- LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
- return false;
- }
-
int total_unstarted_ranges = 0;
for (int i = 0; i < disk_states_.size(); ++i) {
const PerDiskState& state = disk_states_[i];
@@ -350,8 +314,8 @@ bool RequestContext::Validate() const {
LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
return false;
}
- if (!blocked_ranges_.empty()) {
- LOG(WARNING) << "Reader cancelled but has blocked ranges.";
+ if (!active_scan_ranges_.empty()) {
+ LOG(WARNING) << "Reader cancelled but has active ranges.";
return false;
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 737a16c..3aea2bc 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -23,6 +23,11 @@
namespace impala {
namespace io {
+
+// Mode argument for AddRangeToDisk().
+enum class ScheduleMode {
+ IMMEDIATELY, UPON_GETNEXT, BY_CALLER
+};
/// A request context is used to group together I/O requests belonging to a client of the
/// I/O manager for management and scheduling.
///
@@ -32,53 +37,59 @@ namespace io {
/// maintains state across all disks as well as per disk state.
/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
/// WriteRange.
-/// A scan range for the reader is on one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
+/// A scan range for the reader is on one of six states:
+/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued
/// and nothing has been read from it.
-/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
-/// As soon as the reader picks it up, it will move to the in_flight_ranges
+/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be started.
+/// As soon as the reader picks it up, it will move to the 'in_flight_ranges_'
/// queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
+/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and will
/// be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
-/// anymore. We need the caller to pull a buffer off which will put this in
-/// the in_flight_ranges queue. These ranges are in the RequestContext's
-/// blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-//
-/// If the scan range is read and does not get blocked on the outgoing queue, the
+/// 4) The ScanRange is blocked waiting for buffers because it does not have any unused
+/// buffers to read data into. It is unblocked when a client adds new buffers via
+/// AllocateBuffersForRange() or returns existing buffers via ReturnBuffer().
+/// ScanRanges in this state are identified by 'blocked_on_buffer_' == true.
+/// 5) ScanRange is cached and in the 'cached_ranges_' queue.
+/// 6) Inactive - either all the data for the range was returned or the range was
+/// cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ != OK.
+///
+/// If the scan range is read and does not get blocked waiting for buffers, the
/// transitions are: 1 -> 2 -> 3.
/// If the scan range does get blocked, the transitions are
/// 1 -> 2 -> 3 -> (4 -> 3)*
-//
-/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
+///
+/// In the case of a cached scan range, the range is immediately put in 'cached_ranges_'.
/// When the caller asks for the next range to process, we first pull ranges from
-/// the cache_ranges_ queue. If the range was cached, the range is removed and
+/// the 'cached_ranges_' queue. If the range was cached, the range is removed and
/// done (ranges are either entirely cached or not at all). If the cached read attempt
/// fails, we put the range in state 1.
-//
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
-/// that picks it up in GetNextRequestRange().
-//
+///
+/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that they can be
+/// cancelled when the RequestContext is cancelled. Scan ranges are removed from
+/// 'active_scan_ranges_' during their transition to state 6.
+///
+/// A write range for a context may be in one of two queues:
+/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not processed.
+/// 2) 'in_flight_ranges_': The write range is ready to be processed by the next disk
+/// thread that picks it up in GetNextRequestRange().
+///
/// AddWriteRange() adds WriteRanges for a disk.
/// It is the responsibility of the client to pin the data to be written via a WriteRange
/// in memory. After a WriteRange has been written, a callback is invoked to inform the
/// client that the write has completed.
-//
+///
/// An important assumption is that write does not exceed the maximum read size and that
/// the entire range is written when the write request is handled. (In other words, writes
/// are not broken up.)
-//
+///
/// When a RequestContext is processed by a disk thread in GetNextRequestRange(),
/// a write range is always removed from the list of unstarted write ranges and appended
/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
-/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any
-/// time (once a write range is returned from GetNetxRequestRange() it is completed an
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
-/// behind at most one write range.
+/// that is scheduled (by calling GetNextUnstartedRange()) is always followed by a write
+/// (if one exists). And since at most one WriteRange can be present in in_flight_ranges_
+/// at any time (once a write range is returned from GetNextRequestRange() it is completed
+/// and not re-enqueued), a scan range scheduled via a call to GetNextUnstartedRange() can
+/// be queued up behind at most one write range.
class RequestContext {
public:
~RequestContext() {
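Pulling the new class comment together, a client of the patched API roughly walks states 1 through 6 as follows. This is a hedged sketch against the in-tree API: signatures are abbreviated, error handling is elided, and 'max_bytes' and ProcessData() are placeholders, so it is illustrative rather than compilable standalone.

    ScanRange* range;
    bool needs_buffers;
    // State 1 -> 2: pick up the next unstarted range.
    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(reader, &range, &needs_buffers));
    if (needs_buffers) {
      // State 4 -> 3: attach buffers so disk threads can read into them.
      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader, range, max_bytes));
    }
    while (true) {
      std::unique_ptr<BufferDescriptor> buffer;
      RETURN_IF_ERROR(range->GetNext(&buffer));  // Blocks until data is ready.
      bool eosr = buffer->eosr();
      ProcessData(buffer->buffer(), buffer->len());
      range->ReturnBuffer(std::move(buffer));  // Recycle: may unblock the range.
      if (eosr) break;  // State 6: all data returned.
    }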
@@ -97,7 +108,6 @@ class RequestContext {
return state_ == Cancelled;
}
- int64_t queue_size() const { return num_ready_buffers_.Load(); }
int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
@@ -150,13 +160,6 @@ class RequestContext {
RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
- /// Allocates a buffer to read into with size between
- /// max('buffer_size', 'min_buffer_size_') and 'max_buffer_size_'.
- /// Does not acquire 'lock_'.
- /// TODO: allocate using the buffer pool client associated with this reader.
- Status AllocBuffer(ScanRange* range, int64_t buffer_size,
- std::unique_ptr<BufferDescriptor>* buffer);
-
/// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer
/// memory and release the consumption to the client MemTracker. Otherwise (e.g. a
/// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
@@ -184,11 +187,11 @@ class RequestContext {
/// Adds range to in_flight_ranges, scheduling this reader on the disk threads
/// if necessary.
- /// 'lock_' must be held via 'lock'
+ /// 'lock_' must be held via 'lock'. Only valid to call if this context is active.
void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
DCHECK_EQ(state_, Active);
- DCHECK(range != NULL);
+ DCHECK(range != nullptr);
RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
state.in_flight_ranges()->Enqueue(range);
state.ScheduleContext(lock, this, range->disk_id());
@@ -198,11 +201,34 @@ class RequestContext {
/// and mark the context as inactive, after which it cannot be used.
void CancelAndMarkInactive();
- /// Adds request range to disk queue for this request context. Currently,
- /// schedule_immediately must be false is RequestRange is a write range.
- /// Caller must hold 'lock_' via 'lock'.
- void AddRequestRange(const boost::unique_lock<boost::mutex>& lock,
- RequestRange* range, bool schedule_immediately);
+ /// Adds a request range to the appropriate disk state. 'schedule_mode' controls which
+ /// queue the range is placed in. This RequestContext is scheduled on the disk state
+ /// if required by 'schedule_mode'.
+ ///
+ /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added to the
+ /// 'unstarted_write_ranges_' queue, from which they will be asynchronously moved to the
+ /// 'in_flight_ranges_' queue.
+ ///
+ /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, the range is
+ /// immediately added to the 'in_flight_ranges_' queue where it will be processed
+ /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to the
+ /// 'unstarted_scan_ranges_' queue, from which it can be returned to a client by
+ /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not added to
+ /// any queues. The range will be scheduled later as a separate step, e.g. when it is
+ /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'.
+ void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, RequestRange* range,
+ ScheduleMode schedule_mode);
+
+ /// Adds an active range to 'active_scan_ranges_'
+ void AddActiveScanRangeLocked(
+ const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+
+ /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after eos or
+ /// cancellation. If calling the Locked version, the caller must hold
+ /// 'lock_'. Otherwise the function will acquire 'lock_'.
+ void RemoveActiveScanRange(ScanRange* range);
+ void RemoveActiveScanRangeLocked(
+ const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
/// Validates invariants of reader. Reader lock must be taken beforehand.
bool Validate() const;
@@ -243,13 +269,6 @@ class RequestContext {
/// Total number of bytes from remote reads that were expected to be local.
AtomicInt64 unexpected_remote_bytes_{0};
- /// The number of buffers that have been returned to the reader (via GetNext()) that the
- /// reader has not returned. Only included for debugging and diagnostics.
- AtomicInt32 num_buffers_in_reader_{0};
-
- /// The number of scan ranges that have been completed for this reader.
- AtomicInt32 num_finished_ranges_{0};
-
/// The number of scan ranges that required a remote read, updated at the end of each
/// range scan. Only used for diagnostics.
AtomicInt32 num_remote_ranges_{0};
@@ -264,17 +283,6 @@ class RequestContext {
/// Total number of file handle opens where the file handle was not in the cache
AtomicInt32 cached_file_handles_miss_count_{0};
- /// The number of buffers that are being used for this reader. This is the sum
- /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
- /// to be queued). This includes both IOMgr-allocated buffers and client-provided
- /// buffers.
- AtomicInt32 num_used_buffers_{0};
-
- /// The total number of ready buffers across all ranges. Ready buffers are buffers
- /// that have been read from disk but not retrieved by the caller.
- /// This is the sum of all queued buffers in all ranges for this reader context.
- AtomicInt32 num_ready_buffers_{0};
-
/// All fields below are accessed by multiple threads and the lock needs to be
/// taken before accessing them. Must be acquired before ScanRange::lock_ if both
/// are held simultaneously.
@@ -283,6 +291,17 @@ class RequestContext {
/// Current state of the reader
State state_ = Active;
+ /// Scan ranges that have been added to the IO mgr for this context. Ranges can only
+ /// be added when 'state_' is Active. When this context is cancelled, Cancel() is
+ /// called for all the active ranges. If a client attempts to add a range while
+ /// 'state_' is Cancelled, the range is not added to this list and Status::CANCELLED
+ /// is returned to the client. This ensures that all active ranges are cancelled as a
+ /// result of RequestContext cancellation.
+ /// Ranges can be cancelled or hit eos non-atomically with their removal from this set,
+ /// so eos or cancelled ranges may be temporarily present here. Cancelling these ranges
+ /// a second time or cancelling after eos is safe and has no effect.
+ boost::unordered_set<ScanRange*> active_scan_ranges_;
+
/// The number of disks with scan ranges remaining (always equal to the sum of
/// disks with ranges).
int num_disks_with_ranges_ = 0;
@@ -293,18 +312,15 @@ class RequestContext {
InternalList<ScanRange> cached_ranges_;
/// A list of ranges that should be returned in subsequent calls to
- /// GetNextRange.
+ /// GetNextUnstartedRange().
/// There is a trade-off with when to populate this list. Populating it on
- /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
+ /// demand means consumers need to wait (happens in DiskIoMgr::GetNextUnstartedRange()).
/// Populating it preemptively means we make worse scheduling decisions.
/// We currently populate one range per disk.
/// TODO: think about this some more.
InternalList<ScanRange> ready_to_start_ranges_;
ConditionVariable ready_to_start_ranges_cv_; // used with lock_
- /// Ranges that are blocked due to back pressure on outgoing buffers.
- InternalList<ScanRange> blocked_ranges_;
-
/// Condition variable for UnregisterContext() to wait for all disks to complete
ConditionVariable disks_complete_cond_var_;
@@ -429,7 +445,7 @@ class RequestContext {
/// Queue of pending IO requests for this disk in the order that they will be
/// processed. A ScanRange is added to this queue when it is returned in
- /// GetNextRange(), or when it is added with schedule_immediately = true.
+ /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
/// A WriteRange is added to this queue from unstarted_write_ranges_ for each
/// invocation of GetNextRequestRange() in WorkLoop().
/// The size of this queue is always less than or equal to num_remaining_ranges.
@@ -458,7 +474,8 @@ class RequestContext {
/// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
/// and writes. (Otherwise, since next_scan_range_to_start is set
/// in GetNextRequestRange() whenever it is null, repeated calls to
- /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
+ /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
+ /// processed)
InternalQueue<WriteRange> unstarted_write_ranges_;
};
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ab0810a..041cb9d 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -26,8 +26,10 @@
#include "common/atomic.h"
#include "common/hdfs.h"
#include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "util/condition-variable.h"
#include "util/internal-queue.h"
+#include "util/mem-range.h"
namespace impala {
class MemTracker;
@@ -202,11 +204,11 @@ class ScanRange : public RequestRange {
/// Resets this scan range object with the scan range description. The scan range
/// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
- /// local filesystem). The scan range must fall within the file bounds (offset >= 0
- /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
- /// to. If 'expected_local' is true, a warning is generated if the read did not
- /// come from a local disk. 'buffer_opts' specifies buffer management options -
- /// see the DiskIoMgr class comment and the BufferOpts comments for details.
+ /// local filesystem). The scan range must be non-empty and fall within the file bounds
+ /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
+ /// queue to add the range to. If 'expected_local' is true, a warning is generated if
+ /// the read did not come from a local disk. 'buffer_opts' specifies buffer management
+ /// options - see the DiskIoMgr class comment and the BufferOpts comments for details.
/// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
@@ -248,12 +250,11 @@ class ScanRange : public RequestRange {
/// Initialize internal fields
void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
- /// Enqueues a buffer for this range. This does not block.
- /// Returns true if this scan range has hit the queue capacity, false otherwise.
+ /// Enqueues a ready buffer with valid data for this range. This does not block.
/// The caller passes ownership of buffer to the scan range and it is not
/// valid to access buffer after this call. The reader lock must be held by the
- /// caller.
- bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+ /// caller. Returns false if the scan range was cancelled.
+ bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
std::unique_ptr<BufferDescriptor> buffer);
/// Validates the internal state of this range. lock_ must be taken
@@ -304,16 +305,51 @@ class ScanRange : public RequestRange {
Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
bool* read_succeeded) WARN_UNUSED_RESULT;
+ /// Add buffers for the range to read data into and schedule the range if blocked.
+ /// If 'returned' is true, the buffers are ones returned from GetNext() that are being recycled
+ /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added.
+ void AddUnusedBuffers(
+ std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
+
+ /// Remove a buffer from 'unused_iomgr_buffers_' and update
+ /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL.
+ /// 'lock_' must be held by the caller via 'scan_range_lock'.
+ std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
+ const boost::unique_lock<boost::mutex>& scan_range_lock);
+
+ /// Get the next buffer for this scan range for a disk thread to read into. Returns
+ /// the new buffer if successful. If no buffers are available, marks the range
+ /// as blocked and returns nullptr. Caller must not hold 'lock_'.
+ std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
+
/// Cleans up a buffer that was not returned to the client.
/// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
- /// This function will acquire 'lock_' and may acquire 'hdfs_lock_'.
- void CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
- /// Same as CleanUpBuffer() except the caller must already hold 'lock_' via
- /// 'scan_range_lock'.
- void CleanUpBufferLocked(const boost::unique_lock<boost::mutex>& scan_range_lock,
+ /// The caller must hold 'lock_' via 'scan_range_lock'.
+ /// This function may acquire 'hdfs_lock_'.
+ void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
std::unique_ptr<BufferDescriptor> buffer);
+ /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
+ /// hold 'lock_'.
+ void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
+
+ /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan
+ /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
+ void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
+
+ /// Same as Cancel() except reader_->lock must be held by the caller.
+ void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
+ const Status& status);
+
+ /// Same as Cancel() except doesn't remove the scan range from
+ /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
+ /// which removes the range itself to avoid invalidating its active_scan_ranges_
+ /// iterator.
+ void CancelInternal(const Status& status);
+
+ /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
+ void SetBlockedOnBuffer();
+
/// Returns true if no more buffers will be returned to clients in the future,
/// either because of hitting eosr or cancellation.
bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
@@ -386,6 +422,10 @@ class ScanRange : public RequestRange {
struct hadoopRzBuffer* cached_buffer_ = nullptr;
};
+ /// The number of buffers that have been returned to a client via GetNext() that have
+ /// not yet been returned with ReturnBuffer().
+ AtomicInt32 num_buffers_in_reader_{0};
+
/// Lock protecting fields below.
/// This lock should not be taken during Open()/Read()/Close().
/// If RequestContext::lock_ and this lock need to be held simultaneously,
@@ -395,18 +435,30 @@ class ScanRange : public RequestRange {
/// Number of bytes read so far for this scan range
int bytes_read_;
- /// The number of buffers that have been returned to a client via GetNext() that have
- /// not yet been returned with ReturnBuffer().
- int num_buffers_in_reader_ = 0;
+ /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are
+ /// initially populated when the client calls AllocateBuffersForRange()
+ /// and are used to read scanned data into. Buffers are taken from this vector for
+ /// every read and added back, if needed, when the client calls ReturnBuffer().
+ std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_;
+
+ /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
+ int64_t unused_iomgr_buffer_bytes_ = 0;
+
+ /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
+ /// infer how many bytes of buffers need to be held onto to read the rest of the scan
+ /// range.
+ int64_t iomgr_buffer_bytes_returned_ = 0;
/// If true, the last buffer for this scan range has been queued.
/// If this is true and 'ready_buffers_' is empty, then no more buffers will be
/// returned to the caller by this scan range.
bool eosr_queued_ = false;
- /// If true, this scan range has been removed from the reader's in_flight_ranges
- /// queue because the ready_buffers_ queue is full.
- bool blocked_on_queue_ = false;
+ /// If true, this scan range is not scheduled because a buffer is not available for
+ /// the next I/O in the range. This can happen when the scan range is initially created
+ /// or if the buffers added to the range have all been filled with data and not yet
+ /// returned.
+ bool blocked_on_buffer_ = false;
/// IO buffers that are queued for this scan range. When Cancel() is called
/// this is drained by the cancelling thread. I.e. this is always empty if
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1ffba00..0663a2b 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -42,7 +42,7 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
// any time and only one thread will remove from the queue. This is to guarantee
// that buffers are queued and read in file order.
-bool ScanRange::EnqueueBuffer(
+bool ScanRange::EnqueueReadyBuffer(
const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
@@ -52,19 +52,17 @@ bool ScanRange::EnqueueBuffer(
DCHECK(!eosr_queued_);
if (!cancel_status_.ok()) {
// This range has been cancelled, no need to enqueue the buffer.
- reader_->num_used_buffers_.Add(-1);
- CleanUpBufferLocked(scan_range_lock, move(buffer));
+ CleanUpBuffer(scan_range_lock, move(buffer));
return false;
}
- reader_->num_ready_buffers_.Add(1);
+ // Clean up any surplus buffers. E.g. we may have allocated too many if the file was
+ // shorter than expected.
+ if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock);
eosr_queued_ = buffer->eosr();
ready_buffers_.emplace_back(move(buffer));
-
- DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
- blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
}
buffer_ready_cv_.NotifyOne();
- return blocked_on_queue_;
+ return true;
}
Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
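EnqueueReadyBuffer() and GetNext() form a classic one-lock producer/consumer pair: disk threads push filled buffers and signal 'buffer_ready_cv_'; the client blocks until a buffer arrives. A self-contained sketch of that handoff with standard-library types (an int stands in for BufferDescriptor):

    #include <condition_variable>
    #include <deque>
    #include <memory>
    #include <mutex>

    struct ReadyQueue {
      std::mutex lock;
      std::condition_variable buffer_ready;
      std::deque<std::unique_ptr<int>> ready;

      void Enqueue(std::unique_ptr<int> buffer) {  // Producer: disk thread.
        {
          std::lock_guard<std::mutex> l(lock);
          ready.emplace_back(std::move(buffer));
        }
        buffer_ready.notify_one();  // Wake one waiting consumer, as NotifyOne().
      }

      std::unique_ptr<int> GetNext() {  // Consumer: client thread.
        std::unique_lock<std::mutex> l(lock);
        buffer_ready.wait(l, [this] { return !ready.empty(); });
        std::unique_ptr<int> buffer = std::move(ready.front());
        ready.pop_front();
        return buffer;
      }
    };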
@@ -84,64 +82,97 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
// Remove the first ready buffer from the queue and return it
DCHECK(!ready_buffers_.empty());
- DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
*buffer = move(ready_buffers_.front());
ready_buffers_.pop_front();
eosr = (*buffer)->eosr();
+ DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString();
}
- // Update tracking counters. The buffer has now moved from the IoMgr to the
- // caller.
- reader_->num_ready_buffers_.Add(-1);
- reader_->num_used_buffers_.Add(-1);
- if (eosr) reader_->num_finished_ranges_.Add(1);
-
- unique_lock<mutex> reader_lock(reader_->lock_);
+ // Update tracking counters. The buffer has now moved from the IoMgr to the caller.
+ if (eosr) reader_->RemoveActiveScanRange(this);
+ num_buffers_in_reader_.Add(1);
+ return Status::OK();
+}
- DCHECK(reader_->Validate()) << endl << reader_->DebugString();
- if (reader_->state_ == RequestContext::Cancelled) {
- reader_->blocked_ranges_.Remove(this);
- Cancel(Status::CANCELLED);
- CleanUpBuffer(move(*buffer));
- return Status::CANCELLED;
- }
+void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+ vector<unique_ptr<BufferDescriptor>> buffers;
+ buffers.emplace_back(move(buffer_desc));
+ AddUnusedBuffers(move(buffers), true);
+}
- // At this point success is guaranteed so increment counters for returned buffers.
- reader_->num_buffers_in_reader_.Add(1);
+void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
+ bool returned) {
+ DCHECK_GT(buffers.size(), 0);
+ /// Keep track of whether the range was unblocked in this function. If so, we need
+ /// to schedule it so it resumes progress.
+ bool unblocked = false;
{
- // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
- // may have been called after we released 'lock_' above so we need to re-check
- // whether the queue is full.
unique_lock<mutex> scan_range_lock(lock_);
- ++num_buffers_in_reader_;
- if (blocked_on_queue_
- && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
- && !eosr_queued_) {
- blocked_on_queue_ = false;
- // This scan range was blocked and is no longer, add it to the reader
- // queue again.
- reader_->blocked_ranges_.Remove(this);
+ if (returned) {
+ // Buffers were in reader but now aren't.
+ num_buffers_in_reader_.Add(-buffers.size());
+ }
+
+ for (unique_ptr<BufferDescriptor>& buffer : buffers) {
+ // We should not hold onto the buffers in the following cases:
+ // 1. the scan range is using external buffers, e.g. cached buffers.
+ // 2. the scan range is cancelled
+ // 3. the scan range already hit eosr
+ // 4. we already have enough buffers to read the remainder of the scan range.
+ if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
+ || !cancel_status_.ok()
+ || eosr_queued_
+ || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
+ CleanUpBuffer(scan_range_lock, move(buffer));
+ } else {
+ unused_iomgr_buffer_bytes_ += buffer->buffer_len();
+ unused_iomgr_buffers_.emplace_back(move(buffer));
+ if (blocked_on_buffer_) {
+ blocked_on_buffer_ = false;
+ unblocked = true;
+ }
+ }
+ }
+ }
+ // Must drop the ScanRange lock before acquiring the RequestContext lock.
+ if (unblocked) {
+ unique_lock<mutex> reader_lock(reader_->lock_);
+ // Reader may have been cancelled after we dropped 'scan_range_lock' above.
+ if (reader_->state_ == RequestContext::Cancelled) {
+ DCHECK(!cancel_status_.ok());
+ } else {
reader_->ScheduleScanRange(reader_lock, this);
}
}
- return Status::OK();
}
-void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
- reader_->num_buffers_in_reader_.Add(-1);
- {
- unique_lock<mutex> scan_range_lock(lock_);
- --num_buffers_in_reader_;
- CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
+ const unique_lock<mutex>& scan_range_lock) {
+ DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
+ if (unused_iomgr_buffers_.empty()) return nullptr;
+ unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
+ unused_iomgr_buffers_.pop_back();
+ unused_iomgr_buffer_bytes_ -= result->buffer_len();
+ return result;
+}
+
+unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
+ unique_lock<mutex> lock(lock_);
+ unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
+ if (buffer_desc == nullptr) {
+ blocked_on_buffer_ = true;
+ } else {
+ iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
}
+ return buffer_desc;
}
-void ScanRange::CleanUpBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
- unique_lock<mutex> scan_range_lock(lock_);
- CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+void ScanRange::SetBlockedOnBuffer() {
+ unique_lock<mutex> lock(lock_);
+ blocked_on_buffer_ = true;
}
-void ScanRange::CleanUpBufferLocked(
+void ScanRange::CleanUpBuffer(
const boost::unique_lock<boost::mutex>& scan_range_lock,
unique_ptr<BufferDescriptor> buffer_desc) {
DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
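The retention test in AddUnusedBuffers() is the core of the new upfront-allocation accounting: a recycled buffer is kept only while the unused pool is still smaller than the bytes of the range not yet covered by buffers handed to disk threads. For example, a 24MB range that has already handed out 16MB of buffers retains at most 8MB of recycled buffers and frees the rest. A sketch of just that predicate:

    #include <cstdint>

    // Keep a recycled buffer only while more buffer bytes are still needed to
    // cover the remainder of the range (len minus bytes already handed out).
    bool ShouldRetainBuffer(int64_t unused_buffer_bytes, int64_t range_len,
        int64_t buffer_bytes_handed_out) {
      return unused_buffer_bytes < range_len - buffer_bytes_handed_out;
    }

    // 24MB range, 16MB handed out, 8MB already pooled -> free further buffers:
    // ShouldRetainBuffer(8 << 20, 24 << 20, 16 << 20) == false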
@@ -149,7 +180,7 @@ void ScanRange::CleanUpBufferLocked(
DCHECK_EQ(this, buffer_desc->scan_range_);
buffer_desc->reader_->FreeBuffer(buffer_desc.get());
- if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_ == 0) {
+ if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
// Close the scan range if there are no more buffers in the reader and no more buffers
// will be returned to readers in future. Close() is idempotent so it is ok to call
// multiple times during cleanup so long as the range is actually finished.
@@ -157,10 +188,33 @@ void ScanRange::CleanUpBufferLocked(
}
}
+void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) {
+ unique_lock<mutex> lock(lock_);
+ for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer));
+}
+
+void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) {
+ while (!unused_iomgr_buffers_.empty()) {
+ CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
+ }
+}
+
void ScanRange::Cancel(const Status& status) {
// Cancelling a range that was never started, ignore.
if (io_mgr_ == nullptr) return;
+ CancelInternal(status);
+ reader_->RemoveActiveScanRange(this);
+}
+
+void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
+ const Status& status) {
+ DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+ CancelInternal(status);
+ reader_->RemoveActiveScanRangeLocked(reader_lock, this);
+}
+void ScanRange::CancelInternal(const Status& status) {
+ DCHECK(io_mgr_ != nullptr);
DCHECK(!status.ok());
{
// Grab both locks to make sure that we don't change 'cancel_status_' while other
@@ -177,12 +231,13 @@ void ScanRange::Cancel(const Status& status) {
/// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
/// from seeing inconsistent state.
- reader_->num_used_buffers_.Add(-ready_buffers_.size());
- reader_->num_ready_buffers_.Add(-ready_buffers_.size());
while (!ready_buffers_.empty()) {
- CleanUpBufferLocked(scan_range_lock, move(ready_buffers_.front()));
+ CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
ready_buffers_.pop_front();
}
+
+ /// Clean up buffers that we don't need any more because we won't read any more data.
+ CleanUpUnusedBuffers(scan_range_lock);
}
buffer_ready_cv_.NotifyAll();
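The Cancel()/CancelFromReader()/CancelInternal() split exists to keep 'active_scan_ranges_' iteration safe: RequestContext::Cancel() walks the set and calls CancelInternal(), which deliberately does not erase the range, then clears the whole set in one step. A standalone sketch of the pattern:

    #include <unordered_set>

    struct Range {
      void CancelInternal() { /* drain buffers, latch cancel status */ }
    };

    void CancelAll(std::unordered_set<Range*>* active_ranges) {
      // No erase() inside the loop: erasing the current element would
      // invalidate the iterator the range-for is using.
      for (Range* range : *active_ranges) range->CancelInternal();
      active_ranges->clear();  // Single bulk removal after iteration.
    }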
@@ -197,6 +252,10 @@ string ScanRange::DebugString() const {
<< " len=" << len_ << " bytes_read=" << bytes_read_
<< " cancel_status=" << cancel_status_.GetDetail()
<< " buffer_queue=" << ready_buffers_.size()
+ << " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
+ << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
+ << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
+ << " blocked_on_buffer=" << blocked_on_buffer_
<< " hdfs_file=" << exclusive_hdfs_fh_;
return ss.str();
}
@@ -211,6 +270,27 @@ bool ScanRange::Validate() {
LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
return false;
}
+ int64_t unused_iomgr_buffer_bytes = 0;
+ for (auto& buffer : unused_iomgr_buffers_)
+ unused_iomgr_buffer_bytes += buffer->buffer_len();
+ if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
+ LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
+ << unused_iomgr_buffer_bytes_
+ << " vs. expected: " << unused_iomgr_buffer_bytes;
+ return false;
+ }
+ bool is_finished = !cancel_status_.ok() || eosr_queued_;
+ if (is_finished && !unused_iomgr_buffers_.empty()) {
+ LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size()
+ << " bytes: " << unused_iomgr_buffer_bytes_
+ << " cancel_status: " << cancel_status_.GetDetail()
+ << " eosr_queued: " << eosr_queued_;
+ return false;
+ }
+ if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) {
+ LOG(ERROR) << "Blocked despite having buffers: " << DebugString();
+ return false;
+ }
return true;
}
@@ -224,7 +304,7 @@ ScanRange::~ScanRange() {
DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
<< "Cached buffer was not released.";
DCHECK_EQ(0, ready_buffers_.size());
- DCHECK_EQ(0, num_buffers_in_reader_);
+ DCHECK_EQ(0, num_buffers_in_reader_.Load());
}
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
@@ -268,9 +348,11 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
local_file_ = nullptr;
exclusive_hdfs_fh_ = nullptr;
bytes_read_ = 0;
+ unused_iomgr_buffer_bytes_ = 0;
+ iomgr_buffer_bytes_returned_ = 0;
cancel_status_ = Status::OK();
- eosr_queued_= false;
- blocked_on_queue_ = false;
+ eosr_queued_ = false;
+ blocked_on_buffer_ = false;
DCHECK(Validate()) << DebugString();
}
@@ -316,9 +398,7 @@ Status ScanRange::Open(bool use_file_handle_cache) {
"for file: $1: $2", offset_, file_, GetStrErrMsg()));
}
}
- if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
- }
+ ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
return Status::OK();
}
@@ -370,9 +450,7 @@ void ScanRange::Close() {
local_file_ = nullptr;
closed_file = true;
}
- if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
- ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
- }
+ if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
}
int64_t ScanRange::MaxReadChunkSize() const {
@@ -580,12 +658,9 @@ Status ScanRange::ReadFromCache(
desc->scan_range_offset_ = 0;
desc->eosr_ = true;
bytes_read_ = bytes_read;
- EnqueueBuffer(reader_lock, move(desc));
- if (reader_->bytes_read_counter_ != nullptr) {
- COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
- }
+ EnqueueReadyBuffer(reader_lock, move(desc));
+ COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
*read_succeeded = true;
- reader_->num_used_buffers_.Add(1);
return Status::OK();
}
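COUNTER_ADD_IF_NOT_NULL folds the old null check into one line. A plausible definition, assumed here rather than quoted from Impala's profile headers:

    // Assumed shape of the macro: add 'v' only when the counter pointer is set.
    #define COUNTER_ADD_IF_NOT_NULL(c, v) \
      do { \
        auto* counter = (c); \
        if (counter != nullptr) counter->Add(v); \
      } while (false)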
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 7b00179..3a69c33 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -396,7 +396,10 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
BufferOpts::ReadInto(buffer.data(), buffer.len()));
read_counter_->Add(1);
bytes_read_counter_->Add(buffer.len());
- RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
+ bool needs_buffers;
+ RETURN_IF_ERROR(io_mgr_->StartScanRange(
+ io_ctx_.get(), handle->read_range_, &needs_buffers));
+ DCHECK(!needs_buffers) << "Already provided a buffer";
return Status::OK();
}
@@ -522,11 +525,20 @@ TmpFileMgr::WriteHandle::WriteHandle(
is_cancelled_(false),
write_in_flight_(false) {}
+TmpFileMgr::WriteHandle::~WriteHandle() {
+ DCHECK(!write_in_flight_);
+ DCHECK(read_range_ == nullptr);
+}
+
string TmpFileMgr::WriteHandle::TmpFilePath() const {
if (file_ == nullptr) return "";
return file_->path();
}
+int64_t TmpFileMgr::WriteHandle::len() const {
+ return write_range_->len();
+}
+
Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx,
File* file, int64_t offset, MemRange buffer,
WriteRange::WriteDoneCallback callback) {
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 95072ae..55901f4 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,7 +28,6 @@
#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
-#include "runtime/io/request-ranges.h"
#include "util/collection-metrics.h"
#include "util/condition-variable.h"
#include "util/mem-range.h"
@@ -37,6 +36,12 @@
#include "util/spinlock.h"
namespace impala {
+namespace io {
+ class DiskIoMgr;
+ class RequestContext;
+ class ScanRange;
+ class WriteRange;
+}
/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
@@ -84,6 +89,7 @@ class TmpFileMgr {
/// Needs to be public for TmpFileMgrTest.
typedef int DeviceId;
+ /// Same typedef as io::WriteRange::WriteDoneCallback.
typedef std::function<void(const Status&)> WriteDoneCallback;
/// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -277,10 +283,7 @@ class TmpFileMgr {
public:
/// The write must be destroyed by passing it to FileGroup - destroying it before
/// the write completes is an error.
- ~WriteHandle() {
- DCHECK(!write_in_flight_);
- DCHECK(read_range_ == nullptr);
- }
+ ~WriteHandle();
/// Cancel any in-flight read synchronously.
void CancelRead();
@@ -290,7 +293,7 @@ class TmpFileMgr {
std::string TmpFilePath() const;
/// The length of the write range in bytes.
- int64_t len() const { return write_range_->len(); }
+ int64_t len() const;
std::string DebugString();
@@ -305,7 +308,7 @@ class TmpFileMgr {
/// failure and 'is_cancelled_' is set to true on failure.
Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
int64_t offset, MemRange buffer,
- io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+ WriteDoneCallback callback) WARN_UNUSED_RESULT;
/// Retry the write after the initial write failed with an error, instead writing to
/// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
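The tmp-file-mgr.h changes are a standard include-dependency cut: the header now forward-declares the io:: classes instead of pulling in request-ranges.h, so every member that dereferences one of them (the destructor's DCHECKs on 'read_range_', len()'s call through 'write_range_') must move out of line to the .cc file, where the full class definitions are visible. A minimal sketch of the pattern with hypothetical names:

    #include <cstdint>

    // widget.h: forward declaration only; Payload stays incomplete here.
    class Payload;
    class Widget {
     public:
      ~Widget();            // Must be defined where Payload is complete.
      int64_t len() const;  // Calls through payload_, so cannot be inline here.
     private:
      Payload* payload_ = nullptr;
    };

    // widget.cc: include the full definition, then define the members.
    // #include "payload.h"
    // Widget::~Widget() { delete payload_; }
    // int64_t Widget::len() const { return payload_->len(); }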
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 5f8d443..6f70727 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -258,6 +258,17 @@ TEST(BitUtil, Log2) {
EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64);
}
+TEST(BitUtil, RoundToPowerOfTwo) {
+ EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9));
+ EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15));
+ EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16));
+ EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17));
+ EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9));
+ EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15));
+ EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16));
+ EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17));
+}
+
TEST(BitUtil, RoundUpToPowerOf2) {
EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8);
EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8);
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 5c9a29b..8a65509 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -98,6 +98,12 @@ class BitUtil {
return v;
}
+ /// Returns the largest power of two <= v.
+ static inline int64_t RoundDownToPowerOfTwo(int64_t v) {
+ int64_t v_rounded_up = RoundUpToPowerOfTwo(v);
+ return v_rounded_up == v ? v : v_rounded_up / 2;
+ }
+
/// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is
/// a power of two
static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
@@ -105,7 +111,7 @@ class BitUtil {
return (value + (factor - 1)) & ~(factor - 1);
}
- static inline int RoundDownToPowerOf2(int value, int factor) {
+ static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
return value & ~(factor - 1);
}
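For reference, the two round-down helpers differ in what they round to: RoundDownToPowerOfTwo(v) returns the largest power of two <= v, while RoundDownToPowerOf2(value, factor) rounds down to a multiple of a power-of-two factor (now widened to int64_t so multi-gigabyte buffer arithmetic cannot truncate). A small usage sketch, which compiles only against Impala's in-tree bit-util.h:

    #include "util/bit-util.h"
    #include <cassert>

    void RoundingExamples() {
      assert(impala::BitUtil::RoundDownToPowerOfTwo(9) == 8);
      assert(impala::BitUtil::RoundDownToPowerOfTwo(16) == 16);
      // Round 100 down to a multiple of 32.
      assert(impala::BitUtil::RoundDownToPowerOf2(100, 32) == 96);
      // int64_t-safe: values beyond 32 bits no longer truncate.
      assert(impala::BitUtil::RoundDownToPowerOf2(3LL << 32, 1LL << 32) == (3LL << 32));
    }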