You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2017/09/15 04:33:39 UTC
[2/4] incubator-impala git commit: IMPALA-5417: make I/O buffer queue
fixed-size
IMPALA-5417: make I/O buffer queue fixed-size
This removes the dynamically-varying queue size behaviour in the I/O
manager. The motivation is to bound resource consumption of scans
and make it possible to reserve memory for I/O buffers upfront.
Also cleans up and documents the locking policy, and fixes some cases
in ScanRange::GetNext() where members documented as being protected by
ScanRange::lock_ were accessed without holding it. I think these races
were, in practice, either benign or prevented by holding
DiskIoRequestContext::lock_.
Testing:
Ran an exhaustive build.
Perf:
Ran the full set of workloads (TPC-H, TPC-DS, targeted) on a 16-node
cluster. All results were within normal variance.
Change-Id: If7cc3f7199f5320db00b7face97a96cdadb6f83f
Reviewed-on: http://gerrit.cloudera.org:8080/7408
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50d603d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50d603d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50d603d3
Branch: refs/heads/master
Commit: 50d603d306c58ef42605eeb715356e1003dedeb4
Parents: 3e73ce2
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 14 08:43:58 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Sep 15 01:59:29 2017 +0000
----------------------------------------------------------------------
be/src/runtime/disk-io-mgr-internal.h | 23 +-
be/src/runtime/disk-io-mgr-reader-context.cc | 2 -
be/src/runtime/disk-io-mgr-scan-range.cc | 70 ++--
be/src/runtime/disk-io-mgr-test.cc | 445 ++++++++++------------
be/src/runtime/disk-io-mgr.cc | 10 +-
be/src/runtime/disk-io-mgr.h | 61 +--
6 files changed, 279 insertions(+), 332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index 30bcd60..a9acca4 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -180,17 +180,13 @@ class DiskIoRequestContext {
state.ScheduleContext(this, range->disk_id());
}
- /// Cancels the context with status code 'status'.
+ /// Cancels the context with status code 'status'
void Cancel(const Status& status);
/// Adds request range to disk queue for this request context. Currently,
/// schedule_immediately must be false is RequestRange is a write range.
void AddRequestRange(RequestRange* range, bool schedule_immediately);
- /// Returns the default queue capacity for scan ranges. This is updated
- /// as the reader processes ranges.
- int initial_scan_range_queue_capacity() const { return initial_queue_capacity_; }
-
/// Validates invariants of reader. Reader lock must be taken beforehand.
bool Validate() const;
@@ -265,22 +261,9 @@ class DiskIoRequestContext {
/// This is the sum of all queued buffers in all ranges for this reader context.
AtomicInt32 num_ready_buffers_;
- /// The total (sum) of queue capacities for finished scan ranges. This value
- /// divided by num_finished_ranges_ is the average for finished ranges and
- /// used to seed the starting queue capacity for future ranges. The assumption
- /// is that if previous ranges were fast, new ones will be fast too. The scan
- /// range adjusts the queue capacity dynamically so a rough approximation will do.
- AtomicInt32 total_range_queue_capacity_;
-
- /// The initial queue size for new scan ranges. This is always
- /// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate
- /// variable to allow reading this value without taking a lock. Doing the division
- /// at read time (with no lock) could lead to a race where only
- /// total_range_queue_capacity_ or num_finished_ranges_ was updated.
- int initial_queue_capacity_;
-
/// All fields below are accessed by multiple threads and the lock needs to be
- /// taken before accessing them.
+ /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
+ /// are held simultaneously.
boost::mutex lock_;
/// Current state of the reader
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index 6f3fe77..77f332a 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -157,7 +157,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) {
num_used_buffers_.Store(0);
num_buffers_in_reader_.Store(0);
num_ready_buffers_.Store(0);
- total_range_queue_capacity_.Store(0);
num_finished_ranges_.Store(0);
num_remote_ranges_.Store(0);
bytes_read_local_.Store(0);
@@ -166,7 +165,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) {
unexpected_remote_bytes_.Store(0);
cached_file_handles_hit_count_.Store(0);
cached_file_handles_miss_count_.Store(0);
- initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;
DCHECK(ready_to_start_ranges_.empty());
DCHECK(blocked_ranges_.empty());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 8ed5138..c5c9514 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -24,11 +24,6 @@
using namespace impala;
-// A very large max value to prevent things from going out of control. Not
-// expected to ever hit this value (1GB of buffered data per range).
-const int MAX_QUEUE_CAPACITY = 128;
-const int MIN_QUEUE_CAPACITY = 2;
-
DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
"when performing HDFS read operations. This is necessary to use HDFS hedged reads "
"(assuming the HDFS client is configured to do so).");
@@ -46,8 +41,9 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
// any time and only one thread will remove from the queue. This is to guarantee
// that buffers are queued and read in file order.
-// This must be called with the reader lock taken.
-bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
+bool DiskIoMgr::ScanRange::EnqueueBuffer(
+ const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
+ DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
@@ -67,13 +63,8 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
eosr_queued_ = buffer->eosr();
ready_buffers_.emplace_back(move(buffer));
- blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
- if (blocked_on_queue_ && ready_buffers_capacity_ > MIN_QUEUE_CAPACITY) {
- // We have filled the queue, indicating we need back pressure on
- // the producer side (i.e. we are pushing buffers faster than they
- // are pulled off, throttle this range more).
- --ready_buffers_capacity_;
- }
+ DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
+ blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT;
}
buffer_ready_cv_.notify_one();
@@ -83,18 +74,12 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
DCHECK(*buffer == nullptr);
+ bool eosr;
{
unique_lock<mutex> scan_range_lock(lock_);
if (eosr_returned_) return Status::OK();
DCHECK(Validate()) << DebugString();
- if (ready_buffers_.empty()) {
- // The queue is empty indicating this thread could use more
- // IO. Increase the capacity to allow for more queueing.
- ++ready_buffers_capacity_ ;
- ready_buffers_capacity_ = ::min(ready_buffers_capacity_, MAX_QUEUE_CAPACITY);
- }
-
while (ready_buffers_.empty() && !is_cancelled_) {
buffer_ready_cv_.wait(scan_range_lock);
}
@@ -106,9 +91,11 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
// Remove the first ready buffer from the queue and return it
DCHECK(!ready_buffers_.empty());
+ DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
*buffer = move(ready_buffers_.front());
ready_buffers_.pop_front();
eosr_returned_ = (*buffer)->eosr();
+ eosr = (*buffer)->eosr();
}
// Update tracking counters. The buffer has now moved from the IoMgr to the
@@ -117,6 +104,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
reader_->num_buffers_in_reader_.Add(1);
reader_->num_ready_buffers_.Add(-1);
reader_->num_used_buffers_.Add(-1);
+ if (eosr) reader_->num_finished_ranges_.Add(1);
Status status = (*buffer)->status_;
if (!status.ok()) {
@@ -125,12 +113,6 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
}
unique_lock<mutex> reader_lock(reader_->lock_);
- if (eosr_returned_) {
- reader_->total_range_queue_capacity_.Add(ready_buffers_capacity_);
- reader_->num_finished_ranges_.Add(1);
- reader_->initial_queue_capacity_ = reader_->total_range_queue_capacity_.Load() /
- reader_->num_finished_ranges_.Load();
- }
DCHECK(reader_->Validate()) << endl << reader_->DebugString();
if (reader_->state_ == DiskIoRequestContext::Cancelled) {
@@ -140,13 +122,19 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
return status_;
}
- bool was_blocked = blocked_on_queue_;
- blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
- if (was_blocked && !blocked_on_queue_ && !eosr_queued_) {
- // This scan range was blocked and is no longer, add it to the reader
- // queue again.
- reader_->blocked_ranges_.Remove(this);
- reader_->ScheduleScanRange(this);
+ {
+ // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
+ // may have been called after we released 'lock_' above so we need to re-check
+ // whether the queue is full.
+ unique_lock<mutex> scan_range_lock(lock_);
+ if (blocked_on_queue_ && ready_buffers_.size() < SCAN_RANGE_READY_BUFFER_LIMIT
+ && !eosr_queued_) {
+ blocked_on_queue_ = false;
+ // This scan range was blocked and is no longer, add it to the reader
+ // queue again.
+ reader_->blocked_ranges_.Remove(this);
+ reader_->ScheduleScanRange(this);
+ }
}
return Status::OK();
}
@@ -191,7 +179,6 @@ string DiskIoMgr::ScanRange::DebugString() const {
ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
<< " len=" << len_ << " bytes_read=" << bytes_read_
<< " buffer_queue=" << ready_buffers_.size()
- << " capacity=" << ready_buffers_capacity_
<< " hdfs_file=" << exclusive_hdfs_fh_;
return ss.str();
}
@@ -211,11 +198,10 @@ bool DiskIoMgr::ScanRange::Validate() {
return true;
}
-DiskIoMgr::ScanRange::ScanRange(int capacity)
+DiskIoMgr::ScanRange::ScanRange()
: RequestRange(RequestType::READ),
num_remote_bytes_(0),
external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
- ready_buffers_capacity_(capacity),
mtime_(-1) {}
DiskIoMgr::ScanRange::~ScanRange() {
@@ -269,10 +255,6 @@ void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext*
eosr_queued_= false;
eosr_returned_= false;
blocked_on_queue_ = false;
- if (ready_buffers_capacity_ <= 0) {
- ready_buffers_capacity_ = reader->initial_scan_range_queue_capacity();
- DCHECK_GE(ready_buffers_capacity_, MIN_QUEUE_CAPACITY);
- }
DCHECK(Validate()) << DebugString();
}
@@ -525,7 +507,9 @@ Status DiskIoMgr::ScanRange::Read(
return Status::OK();
}
-Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
+Status DiskIoMgr::ScanRange::ReadFromCache(
+ const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
+ DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
DCHECK(try_cache_);
DCHECK_EQ(bytes_read_, 0);
*read_succeeded = false;
@@ -580,7 +564,7 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
desc->scan_range_offset_ = 0;
desc->eosr_ = true;
bytes_read_ = bytes_read;
- EnqueueBuffer(move(desc));
+ EnqueueBuffer(reader_lock, move(desc));
if (reader_->bytes_read_counter_ != nullptr) {
COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 7c60efa..a6a719f 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -48,12 +48,10 @@ namespace impala {
class DiskIoMgrTest : public testing::Test {
public:
- virtual void SetUp() {
- pool_.reset(new ObjectPool);
- }
+ virtual void SetUp() {}
virtual void TearDown() {
- pool_.reset();
+ pool_.Clear();
}
void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
@@ -64,7 +62,7 @@ class DiskIoMgrTest : public testing::Test {
EXPECT_EQ(status.code(), expected_status.code());
}
if (status.ok()) {
- DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
+ DiskIoMgr::ScanRange* scan_range = pool_.Add(new DiskIoMgr::ScanRange());
scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(),
(*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached());
ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
@@ -165,21 +163,20 @@ class DiskIoMgrTest : public testing::Test {
}
}
- DiskIoMgr::ScanRange* AllocateRange(int num_buffers) {
- return pool_->Add(new DiskIoMgr::ScanRange(num_buffers));
+ DiskIoMgr::ScanRange* AllocateRange() {
+ return pool_.Add(new DiskIoMgr::ScanRange);
}
- DiskIoMgr::ScanRange* InitRange(int num_buffers, const char* file_path, int offset,
- int len, int disk_id, int64_t mtime, void* meta_data = NULL,
- bool is_cached = false) {
- DiskIoMgr::ScanRange* range = AllocateRange(num_buffers);
+ DiskIoMgr::ScanRange* InitRange(const char* file_path, int offset, int len,
+ int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) {
+ DiskIoMgr::ScanRange* range = AllocateRange();
range->Reset(NULL, file_path, len, offset, disk_id, true,
DiskIoMgr::BufferOpts(is_cached, mtime), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
- scoped_ptr<ObjectPool> pool_;
+ ObjectPool pool_;
mutex written_mutex_;
condition_variable writes_done_;
@@ -211,19 +208,19 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.reset(new ObjectPool);
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init(&mem_tracker));
DiskIoRequestContext* writer;
io_mgr.RegisterContext(&writer, &mem_tracker);
for (int i = 0; i < num_ranges; ++i) {
- int32_t* data = pool_->Add(new int32_t);
+ int32_t* data = pool_.Add(new int32_t);
*data = rand();
- DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+ DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
new_range, read_io_mgr.get(), reader, data, Status::OK(), _1);
- *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
+ *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
@@ -253,16 +250,16 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
DiskIoRequestContext* writer;
io_mgr.RegisterContext(&writer, NULL);
- int32_t* data = pool_->Add(new int32_t);
+ int32_t* data = pool_.Add(new int32_t);
*data = rand();
// Write to file in non-existent directory.
- DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+ DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
(DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
- *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
+ *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
@@ -275,12 +272,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
EXPECT_TRUE(false);
}
- new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+ new_range = pool_.Add(new DiskIoMgr::WriteRange*);
callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
- *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
+ *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
@@ -317,7 +314,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.reset(new ObjectPool);
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init(&mem_tracker));
DiskIoRequestContext* writer;
@@ -328,14 +325,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
io_mgr.CancelContext(writer);
validate_status = Status::CANCELLED;
}
- int32_t* data = pool_->Add(new int32_t);
+ int32_t* data = pool_.Add(new int32_t);
*data = rand();
- DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+ DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
Status::CANCELLED, _1);
- *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
+ *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
cur_offset += sizeof(int32_t);
@@ -372,41 +369,39 @@ TEST_F(DiskIoMgrTest, SingleReader) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
- for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
- pool_.reset(new ObjectPool);
- LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
- << " num_disk=" << num_disks << " num_buffers=" << num_buffers
- << " num_read_threads=" << num_read_threads;
-
- if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
- DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+ for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
+ ObjectPool pool;
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks
+ << " num_read_threads=" << num_read_threads;
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- DiskIoRequestContext* reader;
- io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- vector<DiskIoMgr::ScanRange*> ranges;
- for (int i = 0; i < len; ++i) {
- int disk_id = i % num_disks;
- ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
- stat_val.st_mtime));
- }
- ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ DiskIoRequestContext* reader;
+ io_mgr.RegisterContext(&reader, &reader_mem_tracker);
- AtomicInt32 num_ranges_processed;
- thread_group threads;
- for (int i = 0; i < num_read_threads; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
- len, Status::OK(), 0, &num_ranges_processed));
- }
- threads.join_all();
+ vector<DiskIoMgr::ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(
+ InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
- io_mgr.UnregisterContext(reader);
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < num_read_threads; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+ len, Status::OK(), 0, &num_ranges_processed));
}
+ threads.join_all();
+
+ EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+ io_mgr.UnregisterContext(reader);
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
}
}
}
@@ -428,56 +423,53 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
- pool_.reset(new ObjectPool);
- LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
- << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
-
- if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
- DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- DiskIoRequestContext* reader;
- io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- vector<DiskIoMgr::ScanRange*> ranges_first_half;
- vector<DiskIoMgr::ScanRange*> ranges_second_half;
- for (int i = 0; i < len; ++i) {
- int disk_id = i % num_disks;
- if (i > len / 2) {
- ranges_second_half.push_back(
- InitRange(num_buffers, tmp_file, i, 1, disk_id,
- stat_val.st_mtime));
- } else {
- ranges_first_half.push_back(InitRange(num_buffers, tmp_file, i, 1, disk_id,
- stat_val.st_mtime));
- }
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ DiskIoRequestContext* reader;
+ io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+
+ vector<DiskIoMgr::ScanRange*> ranges_first_half;
+ vector<DiskIoMgr::ScanRange*> ranges_second_half;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ if (i > len / 2) {
+ ranges_second_half.push_back(
+ InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ } else {
+ ranges_first_half.push_back(
+ InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
}
- AtomicInt32 num_ranges_processed;
-
- // Issue first half the scan ranges.
- ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
+ }
+ AtomicInt32 num_ranges_processed;
- // Read a couple of them
- ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2,
- &num_ranges_processed);
+ // Issue first half the scan ranges.
+ ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
- // Issue second half
- ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half));
+ // Read a couple of them
+ ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2,
+ &num_ranges_processed);
- // Start up some threads and then cancel
- thread_group threads;
- for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
- }
+ // Issue second half
+ ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half));
- threads.join_all();
- EXPECT_EQ(num_ranges_processed.Load(), len);
- io_mgr.UnregisterContext(reader);
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ // Start up some threads and then cancel
+ thread_group threads;
+ for (int i = 0; i < 3; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
}
+
+ threads.join_all();
+ EXPECT_EQ(num_ranges_processed.Load(), len);
+ io_mgr.UnregisterContext(reader);
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
}
}
EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -500,57 +492,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
- pool_.reset(new ObjectPool);
- LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
- << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
- if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
- DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
- MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+ MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- DiskIoRequestContext* reader;
- io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ DiskIoRequestContext* reader;
+ io_mgr.RegisterContext(&reader, &reader_mem_tracker);
- DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
- stat_val.st_mtime);
+ DiskIoMgr::ScanRange* complete_range = InitRange(tmp_file, 0, strlen(data), 0,
+ stat_val.st_mtime);
- // Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader, complete_range, data);
- ValidateSyncRead(&io_mgr, reader, complete_range, data);
+ // Issue some reads before the async ones are issued
+ ValidateSyncRead(&io_mgr, reader, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader, complete_range, data);
- vector<DiskIoMgr::ScanRange*> ranges;
- for (int i = 0; i < len; ++i) {
- int disk_id = i % num_disks;
- ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
- stat_val.st_mtime));
- }
- ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+ vector<DiskIoMgr::ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(InitRange(tmp_file, 0, len, disk_id,
+ stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt32 num_ranges_processed;
- thread_group threads;
- for (int i = 0; i < 5; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
- strlen(data), Status::OK(), 0, &num_ranges_processed));
- }
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < 5; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+ strlen(data), Status::OK(), 0, &num_ranges_processed));
+ }
- // Issue some more sync ranges
- for (int i = 0; i < 5; ++i) {
- sched_yield();
- ValidateSyncRead(&io_mgr, reader, complete_range, data);
- }
+ // Issue some more sync ranges
+ for (int i = 0; i < 5; ++i) {
+ sched_yield();
+ ValidateSyncRead(&io_mgr, reader, complete_range, data);
+ }
- threads.join_all();
+ threads.join_all();
- ValidateSyncRead(&io_mgr, reader, complete_range, data);
- ValidateSyncRead(&io_mgr, reader, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader, complete_range, data);
- EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
- io_mgr.UnregisterContext(reader);
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
- }
+ EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+ io_mgr.UnregisterContext(reader);
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
}
}
EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -571,51 +561,48 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
- pool_.reset(new ObjectPool);
- LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
- << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
-
- if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
- DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- DiskIoRequestContext* reader;
- io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- vector<DiskIoMgr::ScanRange*> ranges;
- for (int i = 0; i < len; ++i) {
- int disk_id = i % num_disks;
- ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
- stat_val.st_mtime));
- }
- ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ DiskIoRequestContext* reader;
+ io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+
+ vector<DiskIoMgr::ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt32 num_ranges_processed;
- int num_succesful_ranges = ranges.size() / 2;
- // Read half the ranges
- for (int i = 0; i < num_succesful_ranges; ++i) {
- ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
- &num_ranges_processed);
- }
- EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
+ AtomicInt32 num_ranges_processed;
+ int num_succesful_ranges = ranges.size() / 2;
+ // Read half the ranges
+ for (int i = 0; i < num_succesful_ranges; ++i) {
+ ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
+ &num_ranges_processed);
+ }
+ EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
- // Start up some threads and then cancel
- thread_group threads;
- for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
- }
+ // Start up some threads and then cancel
+ thread_group threads;
+ for (int i = 0; i < 3; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ }
- io_mgr.CancelContext(reader);
- sched_yield();
+ io_mgr.CancelContext(reader);
+ sched_yield();
- threads.join_all();
- EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
- io_mgr.UnregisterContext(reader);
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
- }
+ threads.join_all();
+ EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
+ io_mgr.UnregisterContext(reader);
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
}
}
EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -632,15 +619,10 @@ TEST_F(DiskIoMgrTest, MemLimits) {
struct stat stat_val;
stat(tmp_file, &stat_val);
- const int num_buffers = 25;
- // Give the reader more buffers than the limit
const int mem_limit_num_buffers = 2;
-
- int64_t iters = 0;
+ // Allocate enough ranges so that the total buffers exceeds the mem limit.
+ const int num_ranges = 25;
{
- pool_.reset(new ObjectPool);
- if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-
MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
@@ -650,9 +632,8 @@ TEST_F(DiskIoMgrTest, MemLimits) {
io_mgr.RegisterContext(&reader, &reader_mem_tracker);
vector<DiskIoMgr::ScanRange*> ranges;
- for (int i = 0; i < num_buffers; ++i) {
- ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, 0,
- stat_val.st_mtime));
+ for (int i = 0; i < num_ranges; ++i) {
+ ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
@@ -711,12 +692,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
stat(tmp_file, &stat_val);
const int num_disks = 2;
- const int num_buffers = 3;
-
- int64_t iters = 0;
{
- pool_.reset(new ObjectPool);
- if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init(&mem_tracker));
@@ -725,7 +701,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
io_mgr.RegisterContext(&reader, &reader_mem_tracker);
DiskIoMgr::ScanRange* complete_range =
- InitRange(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
+ InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
// Issue some reads before the async ones are issued
ValidateSyncRead(&io_mgr, reader, complete_range, data);
@@ -734,8 +710,8 @@ TEST_F(DiskIoMgrTest, CachedReads) {
vector<DiskIoMgr::ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
- stat_val.st_mtime, NULL, true));
+ ranges.push_back(
+ InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, NULL, true));
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
@@ -797,7 +773,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
for (int file_index = 0; file_index < num_contexts; ++file_index) {
io_mgr.RegisterContext(&contexts[file_index], &mem_tracker);
}
- pool_.reset(new ObjectPool);
+ pool_.Clear();
int read_offset = 0;
int write_offset = 0;
while (read_offset < file_size) {
@@ -808,7 +784,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
vector<DiskIoMgr::ScanRange*> ranges;
int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
for (int i = 0; i < num_scan_ranges; ++i) {
- ranges.push_back(InitRange(1, file_name.c_str(), read_offset, 1,
+ ranges.push_back(InitRange(file_name.c_str(), read_offset, 1,
i % num_disks, stat_val.st_mtime));
threads.add_thread(new thread(ScanRangeThread, &io_mgr,
contexts[context_index],
@@ -823,7 +799,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
this, num_write_ranges, _1);
- DiskIoMgr::WriteRange* new_range = pool_->Add(
+ DiskIoMgr::WriteRange* new_range = pool_.Add(
new DiskIoMgr::WriteRange(file_name,
write_offset, i % num_disks, callback));
new_range->SetData(reinterpret_cast<const uint8_t*>
@@ -899,44 +875,41 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
- pool_.reset(new ObjectPool);
- LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
- << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
- if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
+ << " num_disk=" << num_disks;
+ if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
- DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
- MAX_BUFFER_SIZE);
- EXPECT_OK(io_mgr.Init(&mem_tracker));
+ DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+ MAX_BUFFER_SIZE);
+ EXPECT_OK(io_mgr.Init(&mem_tracker));
- for (int i = 0; i < NUM_READERS; ++i) {
- io_mgr.RegisterContext(&readers[i], &mem_tracker);
+ for (int i = 0; i < NUM_READERS; ++i) {
+ io_mgr.RegisterContext(&readers[i], &mem_tracker);
- vector<DiskIoMgr::ScanRange*> ranges;
- for (int j = 0; j < DATA_LEN; ++j) {
- int disk_id = j % num_disks;
- ranges.push_back(
- InitRange(num_buffers,file_names[i].c_str(), j, 1, disk_id,
- mtimes[i]));
- }
- ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
+ vector<DiskIoMgr::ScanRange*> ranges;
+ for (int j = 0; j < DATA_LEN; ++j) {
+ int disk_id = j % num_disks;
+ ranges.push_back(
+ InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
}
+ ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
+ }
- AtomicInt32 num_ranges_processed;
- thread_group threads;
- for (int i = 0; i < NUM_READERS; ++i) {
- for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
- data[i].c_str(), data[i].size(), Status::OK(), 0,
- &num_ranges_processed));
- }
- }
- threads.join_all();
- EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
- for (int i = 0; i < NUM_READERS; ++i) {
- io_mgr.UnregisterContext(readers[i]);
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < NUM_READERS; ++i) {
+ for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
+ data[i].c_str(), data[i].size(), Status::OK(), 0,
+ &num_ranges_processed));
}
}
+ threads.join_all();
+ EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
+ for (int i = 0; i < NUM_READERS; ++i) {
+ io_mgr.UnregisterContext(readers[i]);
+ }
}
}
}
@@ -966,7 +939,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
DiskIoRequestContext* reader;
io_mgr.RegisterContext(&reader, &reader_mem_tracker);
- DiskIoMgr::ScanRange* dummy_range = InitRange(1, "dummy", 0, 0, 0, 0);
+ DiskIoMgr::ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
// buffer length should be rounded up to min buffer size
int64_t buffer_len = 1;
@@ -1043,7 +1016,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
io_mgr->RegisterContext(&reader, &reader_mem_tracker);
// We should not read past the end of file.
- DiskIoMgr::ScanRange* range = InitRange(1, tmp_file, 0, read_len, 0, stat_val.st_mtime);
+ DiskIoMgr::ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
ASSERT_OK(io_mgr->Read(reader, range, &buffer));
ASSERT_TRUE(buffer->eosr());
@@ -1052,7 +1025,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
io_mgr->ReturnBuffer(move(buffer));
io_mgr->UnregisterContext(reader);
- pool_.reset();
+ pool_.Clear();
io_mgr.reset();
EXPECT_EQ(reader_mem_tracker.consumption(), 0);
EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -1078,7 +1051,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
- DiskIoMgr::ScanRange* range = AllocateRange(1);
+ DiskIoMgr::ScanRange* range = AllocateRange();
range->Reset(NULL, tmp_file, scan_len, 0, 0, true,
DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), buffer_len));
ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
@@ -1096,7 +1069,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
}
io_mgr->UnregisterContext(reader);
- pool_.reset();
+ pool_.Clear();
io_mgr.reset();
EXPECT_EQ(mem_tracker.consumption(), 0);
}
@@ -1116,7 +1089,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
vector<uint8_t> client_buffer(SCAN_LEN);
for (int i = 0; i < 1000; ++i) {
io_mgr->RegisterContext(&reader, reader_mem_tracker);
- DiskIoMgr::ScanRange* range = AllocateRange(1);
+ DiskIoMgr::ScanRange* range = AllocateRange();
range->Reset(NULL, tmp_file, SCAN_LEN, 0, 0, true,
DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
@@ -1134,7 +1107,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
io_mgr->UnregisterContext(reader);
}
- pool_.reset();
+ pool_.Clear();
io_mgr.reset();
EXPECT_EQ(mem_tracker.consumption(), 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index e77d9ca..7cc2af7 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -118,7 +118,7 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds,
// current queue size.
static const int LOW_MEMORY = 64 * 1024 * 1024;
-const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2;
+const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
AtomicInt32 DiskIoMgr::next_disk_id_;
@@ -583,7 +583,7 @@ Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
if (range->try_cache_) {
if (schedule_immediately) {
bool cached_read_succeeded;
- RETURN_IF_ERROR(range->ReadFromCache(&cached_read_succeeded));
+ RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
if (cached_read_succeeded) continue;
// Cached read failed, fall back to AddRequestRange() below.
} else {
@@ -633,7 +633,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
*range = reader->cached_ranges_.Dequeue();
DCHECK((*range)->try_cache_);
bool cached_read_succeeded;
- RETURN_IF_ERROR((*range)->ReadFromCache(&cached_read_succeeded));
+ RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded));
if (cached_read_succeeded) return Status::OK();
// This range ended up not being cached. Loop again and pick up a new range.
@@ -994,7 +994,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
ScanRange* scan_range = buffer->scan_range_;
scan_range->Cancel(reader->status_);
// Enqueue the buffer to use the scan range's buffer cleanup path.
- scan_range->EnqueueBuffer(move(buffer));
+ scan_range->EnqueueBuffer(reader_lock, move(buffer));
return;
}
@@ -1021,7 +1021,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
bool eosr = buffer->eosr_;
ScanRange* scan_range = buffer->scan_range_;
bool is_cached = buffer->is_cached();
- bool queue_full = scan_range->EnqueueBuffer(move(buffer));
+ bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
if (eosr) {
// For cached buffers, we can't close the range until the cached buffer is returned.
// Close() is called from DiskIoMgr::ReturnBuffer().
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 138f973..ed33942 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -129,16 +129,12 @@ class MemTracker;
/// To have multiple reading threads, the caller would simply spin up the threads
/// and each would process the loops above.
//
-/// To control the number of IO buffers, each scan range has a soft max capacity for
-/// the number of queued buffers. If the number of buffers is at capacity, the IoMgr
-/// will no longer read for that scan range until the caller has processed a buffer.
-/// This capacity does not need to be fixed, and the caller can dynamically adjust
-/// it if necessary.
-//
-/// As an example: If we allowed 5 buffers per range on a 24 core, 72 thread
-/// (we default to allowing 3x threads) machine, we should see at most
-/// 72 * 5 * 8MB = 2.8GB in io buffers memory usage. This should remain roughly constant
-/// regardless of how many concurrent readers are running.
+/// To control the number of IO buffers, each scan range has a limit of two queued
+/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
+/// the IoMgr will no longer read for that scan range until the caller has processed
+/// a buffer. Assuming the client returns each buffer before requesting the next one
+/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
+/// scan range.
//
/// Buffer Management:
/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
@@ -382,8 +378,7 @@ class DiskIoMgr : public CacheLineAligned {
/// the IoMgr.
class ScanRange : public RequestRange {
public:
- /// The initial queue capacity for this. Specify -1 to use IoMgr default.
- ScanRange(int initial_capacity = -1);
+ ScanRange();
virtual ~ScanRange();
@@ -401,7 +396,6 @@ class DiskIoMgr : public CacheLineAligned {
void* meta_data() const { return meta_data_; }
bool try_cache() const { return try_cache_; }
bool expected_local() const { return expected_local_; }
- int ready_buffers_capacity() const { return ready_buffers_capacity_; }
/// Returns the next buffer for this scan range. buffer is an output parameter.
/// This function blocks until a buffer is ready or an error occurred. If this is
@@ -431,8 +425,10 @@ class DiskIoMgr : public CacheLineAligned {
/// Enqueues a buffer for this range. This does not block.
/// Returns true if this scan range has hit the queue capacity, false otherwise.
/// The caller passes ownership of buffer to the scan range and it is not
- /// valid to access buffer after this call.
- bool EnqueueBuffer(std::unique_ptr<BufferDescriptor> buffer);
+ /// valid to access buffer after this call. The reader lock must be held by the
+ /// caller.
+ bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+ std::unique_ptr<BufferDescriptor> buffer);
/// Cleanup any queued buffers (i.e. due to cancellation). This cannot
/// be called with any locks taken.
@@ -475,7 +471,9 @@ class DiskIoMgr : public CacheLineAligned {
/// and *read_succeeded to true.
/// If the data is not cached, returns ok() and *read_succeeded is set to false.
/// Returns a non-ok status if it ran into a non-continuable error.
- Status ReadFromCache(bool* read_succeeded) WARN_UNUSED_RESULT;
+ /// The reader lock must be held by the caller.
+ Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
+ bool* read_succeeded) WARN_UNUSED_RESULT;
/// Pointer to caller specified metadata. This is untouched by the io manager
/// and the caller can put whatever auxiliary data in here.
@@ -540,7 +538,9 @@ class DiskIoMgr : public CacheLineAligned {
};
/// Lock protecting fields below.
- /// This lock should not be taken during Open/Read/Close.
+ /// This lock should not be taken during Open()/Read()/Close().
+ /// If DiskIoRequestContext::lock_ and this lock need to be held simultaneously,
+ /// DiskIoRequestContext::lock_ must be taken first.
boost::mutex lock_;
/// Number of bytes read so far for this scan range
@@ -566,12 +566,6 @@ class DiskIoMgr : public CacheLineAligned {
boost::condition_variable buffer_ready_cv_;
std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
- /// The soft capacity limit for ready_buffers_. ready_buffers_ can exceed
- /// the limit temporarily as the capacity is adjusted dynamically.
- /// In that case, the capcity is only realized when the caller removes buffers
- /// from ready_buffers_.
- int ready_buffers_capacity_;
-
/// Lock that should be taken during hdfs calls. Only one thread (the disk reading
/// thread) calls into hdfs at a time so this lock does not have performance impact.
/// This lock only serves to coordinate cleanup. Specifically it serves to ensure
@@ -799,9 +793,24 @@ class DiskIoMgr : public CacheLineAligned {
/// 'bytes_to_free' is -1.
void GcIoBuffers(int64_t bytes_to_free = -1);
- /// Default ready buffer queue capacity. This constant doesn't matter too much
- /// since the system dynamically adjusts.
- static const int DEFAULT_QUEUE_CAPACITY;
+ /// The maximum number of ready buffers that can be queued in a scan range. Having two
+ /// queued buffers (plus the buffer that is returned to the client) gives good
+ /// performance in most scenarios:
+ /// 1. If the consumer is consuming data faster than we can read from disk, then the
+ /// queue will be empty most of the time because the buffer will be immediately
+ /// pulled off the queue as soon as it is added. There will always be an I/O request
+ /// in the disk queue to maximize I/O throughput, which is the bottleneck in this
+ /// case.
+ /// 2. If we can read from disk faster than the consumer is consuming data, the queue
+ /// will fill up and there will always be a buffer available for the consumer to
+ /// read, so the consumer will not block and we maximize consumer throughput, which
+ /// is the bottleneck in this case.
+ /// 3. If the consumer is consuming data at approximately the same rate as we are
+ /// reading from disk, then the steady state is that the consumer is processing one
+ /// buffer and one buffer is in the disk queue. The additional buffer can absorb
+ /// bursts where the producer runs faster than the consumer or the consumer runs
+ /// faster than the producer without blocking either the producer or consumer.
+ static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
/// "Disk" queue offsets for remote accesses. Offset 0 corresponds to
/// disk ID (i.e. disk_queue_ index) of num_local_disks().