You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:42:04 UTC
[13/15] impala git commit: IMPALA-4835: switch I/O buffers to buffer
pool
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index a16d06e..95b2d03 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,13 +22,17 @@
#include "codegen/llvm-codegen.h"
#include "common/init.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/io/local-file-system-with-fault-injection.h"
-#include "runtime/io/request-context.h"
#include "runtime/io/disk-io-mgr-stress.h"
#include "runtime/io/disk-io-mgr.h"
-#include "runtime/mem-tracker.h"
+#include "runtime/io/request-context.h"
+#include "runtime/test-env.h"
#include "runtime/thread-resource-mgr.h"
+#include "service/fe-support.h"
#include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/disk-info.h"
@@ -36,13 +40,20 @@
#include "common/names.h"
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+
+DECLARE_int64(min_buffer_size);
DECLARE_int32(num_remote_hdfs_io_threads);
DECLARE_int32(num_s3_io_threads);
DECLARE_int32(num_adls_io_threads);
-const int MIN_BUFFER_SIZE = 512;
+const int MIN_BUFFER_SIZE = 128;
const int MAX_BUFFER_SIZE = 1024;
-const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
+const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L;
+const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L;
+const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT;
namespace impala {
namespace io {
@@ -50,14 +61,41 @@ namespace io {
class DiskIoMgrTest : public testing::Test {
public:
- virtual void SetUp() {}
+ virtual void SetUp() {
+ test_env_.reset(new TestEnv);
+ // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+ test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY);
+ ASSERT_OK(test_env_->Init());
+ RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_);
+ }
virtual void TearDown() {
+ root_reservation_.Close();
pool_.Clear();
+ test_env_.reset();
+ }
+
+ /// Initialises 'root_reservation_'. The reservation is automatically closed in
+ /// TearDown().
+ void InitRootReservation(int64_t reservation_limit) {
+ root_reservation_.InitRootTracker(
+ RuntimeProfile::Create(&pool_, "root"), reservation_limit);
}
+
+ /// Initialise 'client' with the given reservation limit. The client reservation is a
+ /// child of 'root_reservation_'.
+ void RegisterBufferPoolClient(int64_t reservation_limit, int64_t initial_reservation,
+ BufferPool::ClientHandle* client) {
+ ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, &root_reservation_, nullptr,
+ reservation_limit, RuntimeProfile::Create(&pool_, ""), client));
+ if (initial_reservation > 0) {
+ ASSERT_TRUE(client->IncreaseReservation(initial_reservation));
+ }
+ }
+
void WriteValidateCallback(int num_writes, WriteRange** written_range,
- DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data,
- Status expected_status, const Status& status) {
+ DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* client,
+ int32_t* data, Status expected_status, const Status& status) {
if (expected_status.code() == TErrorCode::CANCELLED) {
EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
} else {
@@ -67,8 +105,8 @@ class DiskIoMgrTest : public testing::Test {
ScanRange* scan_range = pool_.Add(new ScanRange());
scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
(*written_range)->offset(), 0, false, BufferOpts::Uncached());
- ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
- sizeof(int32_t));
+ ValidateSyncRead(io_mgr, reader, client, scan_range,
+ reinterpret_cast<const char*>(data), sizeof(int32_t));
}
if (!status.ok()) {
@@ -93,9 +131,13 @@ class DiskIoMgrTest : public testing::Test {
protected:
void CreateTempFile(const char* filename, const char* data) {
+ CreateTempFile(filename, data, strlen(data));
+ }
+
+ void CreateTempFile(const char* filename, const char* data, int64_t data_bytes) {
FILE* file = fopen(filename, "w");
EXPECT_TRUE(file != nullptr);
- fwrite(data, 1, strlen(data), file);
+ fwrite(data, 1, data_bytes, file);
fclose(file);
}
@@ -120,15 +162,22 @@ class DiskIoMgrTest : public testing::Test {
}
static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
- ScanRange* range, const char* expected, int expected_len = -1) {
+ BufferPool::ClientHandle* client, ScanRange* range, const char* expected,
+ int expected_len = -1) {
unique_ptr<BufferDescriptor> buffer;
- ASSERT_OK(io_mgr->Read(reader, range, &buffer));
+ bool needs_buffers;
+ ASSERT_OK(io_mgr->StartScanRange(reader, range, &needs_buffers));
+ if (needs_buffers) {
+ ASSERT_OK(io_mgr->AllocateBuffersForRange(
+ reader, client, range, io_mgr->max_buffer_size()));
+ }
+ ASSERT_OK(range->GetNext(&buffer));
ASSERT_TRUE(buffer != nullptr);
EXPECT_EQ(buffer->len(), range->len());
if (expected_len < 0) expected_len = strlen(expected);
int cmp = memcmp(buffer->buffer(), expected, expected_len);
EXPECT_TRUE(cmp == 0);
- io_mgr->ReturnBuffer(move(buffer));
+ range->ReturnBuffer(move(buffer));
}
static void ValidateScanRange(DiskIoMgr* io_mgr, ScanRange* range,
@@ -141,13 +190,13 @@ class DiskIoMgrTest : public testing::Test {
Status status = range->GetNext(&buffer);
ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
if (buffer == nullptr || !status.ok()) {
- if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer));
+ if (buffer != nullptr) range->ReturnBuffer(move(buffer));
break;
}
ASSERT_LE(buffer->len(), expected_len);
memcpy(result + range->offset() + buffer->scan_range_offset(),
buffer->buffer(), buffer->len());
- io_mgr->ReturnBuffer(move(buffer));
+ range->ReturnBuffer(move(buffer));
}
ValidateEmptyOrCorrect(expected, result, expected_len);
}
@@ -155,27 +204,28 @@ class DiskIoMgrTest : public testing::Test {
// Continues pulling scan ranges from the io mgr until they are all done.
// Updates num_ranges_processed with the number of ranges seen by this thread.
static void ScanRangeThread(DiskIoMgr* io_mgr, RequestContext* reader,
- const char* expected_result, int expected_len, const Status& expected_status,
- int max_ranges, AtomicInt32* num_ranges_processed) {
+ BufferPool::ClientHandle* client, const char* expected_result, int expected_len,
+ const Status& expected_status, int max_ranges, AtomicInt32* num_ranges_processed) {
int num_ranges = 0;
while (max_ranges == 0 || num_ranges < max_ranges) {
ScanRange* range;
- Status status = io_mgr->GetNextRange(reader, &range);
+ bool needs_buffers;
+ Status status = io_mgr->GetNextUnstartedRange(reader, &range, &needs_buffers);
ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
if (range == nullptr) break;
+ if (needs_buffers) {
+ ASSERT_OK(io_mgr->AllocateBuffersForRange(
+ reader, client, range, io_mgr->max_buffer_size() * 3));
+ }
ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
num_ranges_processed->Add(1);
++num_ranges;
}
}
- ScanRange* AllocateRange() {
- return pool_.Add(new ScanRange);
- }
-
- ScanRange* InitRange(const char* file_path, int offset, int len,
+ ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
- ScanRange* range = AllocateRange();
+ ScanRange* range = pool->Add(new ScanRange);
range->Reset(nullptr, file_path, len, offset, disk_id, true,
BufferOpts(is_cached, mtime), meta_data);
EXPECT_EQ(mtime, range->mtime());
@@ -196,8 +246,18 @@ class DiskIoMgrTest : public testing::Test {
const string& tmp_file, int offset, RequestContext* writer,
const string& expected_output);
+ /// Convenience function to get a reference to the buffer pool.
+ BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
+
+ boost::scoped_ptr<TestEnv> test_env_;
+
+ /// Per-test random number generator. Seeded before every test.
+ mt19937 rng_;
+
ObjectPool pool_;
+ ReservationTracker root_reservation_;
+
mutex written_mutex_;
ConditionVariable writes_done_;
int num_ranges_written_;
@@ -208,7 +268,7 @@ class DiskIoMgrTest : public testing::Test {
// by reading the data back via a separate IoMgr instance. All writes are expected to
// complete successfully.
TEST_F(DiskIoMgrTest, SingleWriter) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
num_ranges_written_ = 0;
string tmp_file = "/tmp/disk_io_mgr_test.txt";
int num_ranges = 100;
@@ -222,25 +282,27 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
}
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
- MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
- ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
- unique_ptr<RequestContext> reader =
- read_io_mgr->RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ ASSERT_OK(read_io_mgr->Init());
+ unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
for (int i = 0; i < num_ranges; ++i) {
- int32_t* data = pool_.Add(new int32_t);
+ int32_t* data = tmp_pool.Add(new int32_t);
*data = rand();
- WriteRange** new_range = pool_.Add(new WriteRange*);
- WriteRange::WriteDoneCallback callback =
- bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
- new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1);
- *new_range = pool_.Add(new WriteRange(
- tmp_file, cur_offset, num_ranges % num_disks, callback));
+ WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+ WriteRange::WriteDoneCallback callback = bind(
+ mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, new_range,
+ read_io_mgr.get(), reader.get(), &read_client, data, Status::OK(), _1);
+ *new_range = tmp_pool.Add(
+ new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
cur_offset += sizeof(int32_t);
@@ -256,19 +318,19 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
}
read_io_mgr->UnregisterContext(reader.get());
- read_io_mgr.reset();
+ buffer_pool()->DeregisterClient(&read_client);
}
// Perform invalid writes (e.g. file in non-existent directory, negative offset) and
// validate that an error status is returned via the write callback.
TEST_F(DiskIoMgrTest, InvalidWrite) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
int num_of_writes = 2;
num_ranges_written_ = 0;
string tmp_file = "/non-existent/file.txt";
DiskIoMgr io_mgr(1, 1, 1, 1, 10);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
int32_t* data = pool_.Add(new int32_t);
*data = rand();
@@ -276,7 +338,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
WriteRange** new_range = pool_.Add(new WriteRange*);
WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes, new_range,
- nullptr, nullptr, data,
+ nullptr, nullptr, nullptr, data,
Status(TErrorCode::DISK_IO_ERROR, "open() failed for /non-existent/file.txt. "
"The given path doesn't exist. errno=2"), _1);
*new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback));
@@ -294,7 +356,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
new_range = pool_.Add(new WriteRange*);
callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes,
- new_range, nullptr, nullptr, data,
+ new_range, nullptr, nullptr, nullptr, data,
Status(TErrorCode::DISK_IO_ERROR, "fseek() failed for /tmp/disk_io_mgr_test.txt. "
"Invalid inputs. errno=22, offset=-1"), _1);
@@ -313,9 +375,9 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
// Tests the error messages when some of the disk I/O related low level function fails in
// DiskIoMgr::Write()
TEST_F(DiskIoMgrTest, WriteErrors) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
DiskIoMgr io_mgr(1, 1, 1, 1, 10);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
+ ASSERT_OK(io_mgr.Init());
string file_name = "/tmp/disk_io_mgr_test.txt";
// Fail open()
@@ -354,7 +416,7 @@ void DiskIoMgrTest::ExecuteWriteFailureTest(DiskIoMgr* io_mgr, const string& fil
const string& function_name, int err_no, const string& expected_error) {
int num_of_writes = 1;
num_ranges_written_ = 0;
- unique_ptr<RequestContext> writer = io_mgr->RegisterContext(nullptr);
+ unique_ptr<RequestContext> writer = io_mgr->RegisterContext();
int32_t data = rand();
int success = CreateTempFile(file_name.c_str(), 100);
if (success != 0) {
@@ -383,7 +445,7 @@ void DiskIoMgrTest::AddWriteRange(DiskIoMgr* io_mgr, int num_of_writes, int32_t*
const string& expected_output) {
WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes,
- nullptr, nullptr, nullptr, data,
+ nullptr, nullptr, nullptr, nullptr, data,
Status(TErrorCode::DISK_IO_ERROR, expected_output), _1);
WriteRange* write_range = pool_.Add(new WriteRange(file_name, offset, 0, callback));
write_range->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -394,7 +456,7 @@ void DiskIoMgrTest::AddWriteRange(DiskIoMgr* io_mgr, int num_of_writes, int32_t*
// AddWriteRange() is expected to succeed before the cancel and fail after it.
// The writes themselves may finish with status cancelled or ok.
TEST_F(DiskIoMgrTest, SingleWriterCancel) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
num_ranges_written_ = 0;
string tmp_file = "/tmp/disk_io_mgr_test.txt";
int num_ranges = 100;
@@ -409,30 +471,33 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
}
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
- MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
- ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
- unique_ptr<RequestContext> reader =
- read_io_mgr->RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ ASSERT_OK(read_io_mgr->Init());
+ unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
Status validate_status = Status::OK();
for (int i = 0; i < num_ranges; ++i) {
if (i == num_ranges_before_cancel) {
- io_mgr.CancelContext(writer.get());
+ writer->Cancel();
validate_status = Status::CANCELLED;
}
- int32_t* data = pool_.Add(new int32_t);
+ int32_t* data = tmp_pool.Add(new int32_t);
*data = rand();
- WriteRange** new_range = pool_.Add(new WriteRange*);
- WriteRange::WriteDoneCallback callback = bind(
- mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel,
- new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1);
- *new_range = pool_.Add(new WriteRange(
- tmp_file, cur_offset, num_ranges % num_disks, callback));
+ WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+ WriteRange::WriteDoneCallback callback =
+ bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
+ num_ranges_before_cancel, new_range, read_io_mgr.get(), reader.get(),
+ &read_client, data, Status::CANCELLED, _1);
+ *new_range = tmp_pool.Add(
+ new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
cur_offset += sizeof(int32_t);
Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range);
@@ -449,13 +514,13 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
}
read_io_mgr->UnregisterContext(reader.get());
- read_io_mgr.reset();
+ buffer_pool()->DeregisterClient(&read_client);
}
// Basic test with a single reader, testing multiple threads, disks and a different
// number of buffers.
TEST_F(DiskIoMgrTest, SingleReader) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -469,7 +534,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
- ObjectPool pool;
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks
<< " num_read_threads=" << num_read_threads;
@@ -477,38 +542,40 @@ TEST_F(DiskIoMgrTest, SingleReader) {
if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader =
- io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < num_read_threads; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len,
- Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// This test issues adding additional scan ranges while there are some still in flight.
TEST_F(DiskIoMgrTest, AddScanRangeTest) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -521,17 +588,19 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader =
- io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges_first_half;
vector<ScanRange*> ranges_second_half;
@@ -539,10 +608,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
int disk_id = i % num_disks;
if (i > len / 2) {
ranges_second_half.push_back(
- InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
} else {
ranges_first_half.push_back(
- InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
}
}
AtomicInt32 num_ranges_processed;
@@ -551,8 +620,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half));
// Read a couple of them
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2,
- &num_ranges_processed);
+ ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+ Status::OK(), 2, &num_ranges_processed);
// Issue second half
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half));
@@ -560,24 +629,26 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
// Start up some threads and then cancel
thread_group threads;
for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ threads.add_thread(
+ new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), len);
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test to make sure that sync reads and async reads work together
// Note: this test is constructed so the number of buffers is greater than the
// number of scan ranges.
TEST_F(DiskIoMgrTest, SyncReadTest) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -590,7 +661,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
@@ -598,54 +670,56 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader =
- io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
ScanRange* complete_range =
- InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
+ InitRange(&tmp_pool, tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
// Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(
+ InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, strlen(data), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
}
threads.join_all();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Tests a single reader cancelling half way through scan ranges.
TEST_F(DiskIoMgrTest, SingleReaderCancel) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -658,22 +732,25 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader =
- io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(
+ InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
@@ -681,94 +758,112 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
int num_succesful_ranges = ranges.size() / 2;
// Read half the ranges
for (int i = 0; i < num_succesful_ranges; ++i) {
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1,
- &num_ranges_processed);
+ ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+ Status::OK(), 1, &num_ranges_processed);
}
EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
// Start up some threads and then cancel
thread_group threads;
for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, strlen(data), Status::CANCELLED, 0,
+ &num_ranges_processed));
}
- io_mgr.CancelContext(reader.get());
+ reader->Cancel();
sched_yield();
threads.join_all();
- EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled());
+ EXPECT_TRUE(reader->IsCancelled());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
-// Test when the reader goes over the mem limit
-TEST_F(DiskIoMgrTest, MemLimits) {
+// Test readers running with different amounts of memory and getting blocked on scan
+// ranges that have run out of buffers.
+TEST_F(DiskIoMgrTest, MemScarcity) {
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
- const char* data = "abcdefghijklm";
- int len = strlen(data);
- CreateTempFile(tmp_file, data);
+ // File is 2.5 max buffers so that we can scan file without returning buffers
+ // when we get the max reservation below.
+ const int64_t DATA_BYTES = MAX_BUFFER_SIZE * 5 / 2;
+ char data[DATA_BYTES];
+ for (int i = 0; i < DATA_BYTES; ++i) {
+ data[i] = uniform_int_distribution<uint8_t>(0, 255)(rng_);
+ }
+ CreateTempFile(tmp_file, data, DATA_BYTES);
// Get mtime for file
struct stat stat_val;
stat(tmp_file, &stat_val);
- const int mem_limit_num_buffers = 2;
- // Allocate enough ranges so that the total buffers exceeds the mem limit.
+ const int RESERVATION_LIMIT_NUM_BUFFERS = 20;
+ const int64_t RESERVATION_LIMIT = RESERVATION_LIMIT_NUM_BUFFERS * MAX_BUFFER_SIZE;
+ InitRootReservation(RESERVATION_LIMIT);
+
+ thread_group threads;
+ // Allocate enough ranges so that the total buffers exceeds the limit.
const int num_ranges = 25;
{
- MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
- ASSERT_OK(io_mgr.Init(&root_mem_tracker));
- MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(RESERVATION_LIMIT, RESERVATION_LIMIT, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < num_ranges; ++i) {
- ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+ ranges.push_back(InitRange(&pool_, tmp_file, 0, DATA_BYTES, 0, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
-
- // Don't return buffers to force memory pressure
- vector<unique_ptr<BufferDescriptor>> buffers;
-
- AtomicInt32 num_ranges_processed;
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
- 1, &num_ranges_processed);
-
- char result[strlen(data) + 1];
- // Keep reading new ranges without returning buffers. This forces us
- // to go over the limit eventually.
- while (true) {
- memset(result, 0, strlen(data) + 1);
+ // Keep starting new ranges without returning buffers until we run out of
+ // reservation.
+ while (read_client.GetUnusedReservation() >= MIN_BUFFER_SIZE) {
ScanRange* range = nullptr;
- Status status = io_mgr.GetNextRange(reader.get(), &range);
- ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+ bool needs_buffers;
+ ASSERT_OK(io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers));
if (range == nullptr) break;
-
- while (true) {
- unique_ptr<BufferDescriptor> buffer;
- Status status = range->GetNext(&buffer);
- ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
- if (buffer == nullptr) break;
- memcpy(result + range->offset() + buffer->scan_range_offset(),
- buffer->buffer(), buffer->len());
- buffers.push_back(move(buffer));
- }
- ValidateEmptyOrCorrect(data, result, strlen(data));
- }
-
- for (int i = 0; i < buffers.size(); ++i) {
- io_mgr.ReturnBuffer(move(buffers[i]));
+ ASSERT_TRUE(needs_buffers);
+ // Pick a random amount of memory to reserve.
+ int64_t max_bytes_to_alloc = uniform_int_distribution<int64_t>(MIN_BUFFER_SIZE,
+ min<int64_t>(read_client.GetUnusedReservation(), MAX_BUFFER_SIZE * 3))(rng_);
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(
+ reader.get(), &read_client, range, max_bytes_to_alloc));
+ // Start a thread fetching from the range. The thread will either finish the
+ // range or be cancelled.
+ threads.add_thread(new thread([&data, DATA_BYTES, range] {
+ // Don't return buffers to force memory pressure.
+ vector<unique_ptr<BufferDescriptor>> buffers;
+ int64_t data_offset = 0;
+ Status status;
+ while (true) {
+ unique_ptr<BufferDescriptor> buffer;
+ status = range->GetNext(&buffer);
+ ASSERT_TRUE(status.ok() || status.IsCancelled()) << status.GetDetail();
+ if (status.IsCancelled() || buffer == nullptr) break;
+ EXPECT_EQ(0, memcmp(data + data_offset, buffer->buffer(), buffer->len()));
+ data_offset += buffer->len();
+ buffers.emplace_back(move(buffer));
+ }
+ if (status.ok()) ASSERT_EQ(DATA_BYTES, data_offset);
+ for (auto& buffer : buffers) range->ReturnBuffer(move(buffer));
+ }));
+ // Let the thread start running before starting the next.
+ SleepForMs(10);
}
-
- EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded());
+ // Let the threads run for a bit then cancel everything.
+ SleepForMs(500);
+ reader->Cancel();
+ // Wait until the threads have returned their buffers before unregistering.
+ threads.join_all();
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
@@ -777,7 +872,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
// only tests the fallback mechanism.
// TODO: we can fake the cached read path without HDFS
TEST_F(DiskIoMgrTest, CachedReads) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -791,52 +886,55 @@ TEST_F(DiskIoMgrTest, CachedReads) {
{
DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
ScanRange* complete_range =
- InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+ InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
// Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
ranges.push_back(
- InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+ InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
+ data, strlen(data), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
}
threads.join_all();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int ITERATIONS = 1;
const char* data = "abcdefghijklmnopqrstuvwxyz";
const int num_contexts = 5;
@@ -858,17 +956,22 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
int64_t iters = 0;
vector<unique_ptr<RequestContext>> contexts(num_contexts);
+ unique_ptr<BufferPool::ClientHandle[]> clients(
+ new BufferPool::ClientHandle[num_contexts]);
Status status;
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
MAX_BUFFER_SIZE);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
+ ASSERT_OK(io_mgr.Init());
for (int file_index = 0; file_index < num_contexts; ++file_index) {
- contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[file_index]);
+ contexts[file_index] = io_mgr.RegisterContext();
}
- pool_.Clear();
int read_offset = 0;
int write_offset = 0;
while (read_offset < file_size) {
@@ -880,11 +983,11 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
for (int i = 0; i < num_scan_ranges; ++i) {
ranges.push_back(InitRange(
- file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
- threads.add_thread(
- new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(),
- reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
- 1, Status::OK(), num_scan_ranges, &num_ranges_processed));
+ &tmp_pool, file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr,
+ contexts[context_index].get(), &clients[context_index],
+ reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
+ 1, Status::OK(), num_scan_ranges, &num_ranges_processed));
++read_offset;
}
@@ -894,7 +997,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
this, num_write_ranges, _1);
- WriteRange* new_range = pool_.Add(new WriteRange(
+ WriteRange* new_range = tmp_pool.Add(new WriteRange(
file_name, write_offset, i % num_disks, callback));
new_range->SetData(
reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))),
@@ -911,9 +1014,9 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
threads.join_all();
} // for (int context_index
} // while (read_offset < file_size)
-
for (int file_index = 0; file_index < num_contexts; ++file_index) {
io_mgr.UnregisterContext(contexts[file_index].get());
+ buffer_pool()->DeregisterClient(&clients[file_index]);
}
} // for (int num_disks
} // for (int threads_per_disk
@@ -922,23 +1025,19 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
// This test will test multiple concurrent reads each reading a different file.
TEST_F(DiskIoMgrTest, MultipleReader) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int NUM_READERS = 5;
const int DATA_LEN = 50;
const int ITERATIONS = 25;
const int NUM_THREADS_PER_READER = 3;
- vector<string> file_names;
- vector<int64_t> mtimes;
- vector<string> data;
- vector<unique_ptr<RequestContext>> readers;
- vector<char*> results;
-
- file_names.resize(NUM_READERS);
- readers.resize(NUM_READERS);
- mtimes.resize(NUM_READERS);
- data.resize(NUM_READERS);
- results.resize(NUM_READERS);
+ vector<string> file_names(NUM_READERS);
+ vector<int64_t> mtimes(NUM_READERS);
+ vector<string> data(NUM_READERS);
+ unique_ptr<BufferPool::ClientHandle[]> clients(
+ new BufferPool::ClientHandle[NUM_READERS]);
+ vector<unique_ptr<RequestContext>> readers(NUM_READERS);
+ vector<char*> results(NUM_READERS);
// Initialize data for each reader. The data will be
// 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
@@ -969,22 +1068,25 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
<< " num_disk=" << num_disks;
if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
MAX_BUFFER_SIZE);
- EXPECT_OK(io_mgr.Init(&mem_tracker));
+ ASSERT_OK(io_mgr.Init());
for (int i = 0; i < NUM_READERS; ++i) {
- readers[i] = io_mgr.RegisterContext(&mem_tracker);
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[i]);
+ readers[i] = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int j = 0; j < DATA_LEN; ++j) {
int disk_id = j % num_disks;
- ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
+ ranges.push_back(InitRange(&tmp_pool, file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
}
ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges));
}
@@ -994,18 +1096,20 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
for (int i = 0; i < NUM_READERS; ++i) {
for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(),
- data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed));
+ &clients[i], data[i].c_str(), data[i].size(), Status::OK(), 0,
+ &num_ranges_processed));
}
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
for (int i = 0; i < NUM_READERS; ++i) {
io_mgr.UnregisterContext(readers[i].get());
+ buffer_pool()->DeregisterClient(&clients[i]);
}
}
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Stress test for multiple clients with cancellation
@@ -1017,115 +1121,148 @@ TEST_F(DiskIoMgrTest, StressTest) {
test.Run(2); // In seconds
}
-TEST_F(DiskIoMgrTest, Buffers) {
- // Test default min/max buffer size
- int min_buffer_size = 1024;
- int max_buffer_size = 8 * 1024 * 1024; // 8 MB
- MemTracker root_mem_tracker(max_buffer_size * 2);
-
- DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
- ASSERT_OK(io_mgr.Init(&root_mem_tracker));
- ASSERT_EQ(root_mem_tracker.consumption(), 0);
-
- MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
- unique_ptr<RequestContext> reader;
- reader = io_mgr.RegisterContext(&reader_mem_tracker);
-
- ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
-
- // buffer length should be rounded up to min buffer size
- int64_t buffer_len = 1;
- unique_ptr<BufferDescriptor> buffer_desc;
- buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
- EXPECT_TRUE(buffer_desc->buffer() != nullptr);
- EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
- EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
- io_mgr.FreeBufferMemory(buffer_desc.get());
- io_mgr.ReturnBuffer(move(buffer_desc));
- EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
- // reuse buffer
- buffer_len = min_buffer_size;
- buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
- EXPECT_TRUE(buffer_desc->buffer() != nullptr);
- EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
- EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
- io_mgr.FreeBufferMemory(buffer_desc.get());
- io_mgr.ReturnBuffer(move(buffer_desc));
- EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
- // bump up to next buffer size
- buffer_len = min_buffer_size + 1;
- buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
- EXPECT_TRUE(buffer_desc->buffer() != nullptr);
- EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
- EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
- EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
-
- // gc unused buffer
- io_mgr.GcIoBuffers();
- EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
- EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
-
- io_mgr.FreeBufferMemory(buffer_desc.get());
- io_mgr.ReturnBuffer(move(buffer_desc));
-
- // max buffer size
- buffer_len = max_buffer_size;
- buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
- EXPECT_TRUE(buffer_desc->buffer() != nullptr);
- EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
- EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
- io_mgr.FreeBufferMemory(buffer_desc.get());
- io_mgr.ReturnBuffer(move(buffer_desc));
- EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, root_mem_tracker.consumption());
-
- // gc buffers
- io_mgr.GcIoBuffers();
- EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
- EXPECT_EQ(root_mem_tracker.consumption(), 0);
- io_mgr.UnregisterContext(reader.get());
-}
-
// IMPALA-2366: handle partial read where range goes past end of file.
TEST_F(DiskIoMgrTest, PartialRead) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
int len = strlen(data);
int read_len = len + 1000; // Read past end of file.
+ // Test with various buffer sizes to exercise different code paths, e.g.
+ // * the truncated data ends exactly on a buffer boundary
+ // * the data is split between many buffers
+ // * the data fits in one buffer
+ const int64_t MIN_BUFFER_SIZE = 2;
+ vector<int64_t> max_buffer_sizes{4, 16, 32, 128, 1024, 4096};
CreateTempFile(tmp_file, data);
// Get mtime for file
struct stat stat_val;
stat(tmp_file, &stat_val);
- scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+ for (int64_t max_buffer_size : max_buffer_sizes) {
+ DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, max_buffer_size);
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> reader;
+ reader = io_mgr.RegisterContext();
+
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+
+ // We should not read past the end of file.
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, read_len, 0, stat_val.st_mtime);
+ unique_ptr<BufferDescriptor> buffer;
+ bool needs_buffers;
+ ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
+ if (needs_buffers) {
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(
+ reader.get(), &read_client, range, 3 * max_buffer_size));
+ }
+
+ int64_t bytes_read = 0;
+ bool eosr = false;
+ do {
+ ASSERT_OK(range->GetNext(&buffer));
+ ASSERT_GE(buffer->buffer_len(), MIN_BUFFER_SIZE);
+ ASSERT_LE(buffer->buffer_len(), max_buffer_size);
+ ASSERT_LE(buffer->len(), len - bytes_read);
+ ASSERT_TRUE(memcmp(buffer->buffer(), data + bytes_read, buffer->len()) == 0);
+ bytes_read += buffer->len();
+ eosr = buffer->eosr();
+ // Should see eosr if we've read past the end of the file. If the data is an exact
+ // multiple of the max buffer size then we may read to the end of the file without
+ // noticing that it is eosr. Eosr will be returned on the next read in that case.
+ ASSERT_TRUE(bytes_read < len || buffer->eosr()
+ || (buffer->len() == max_buffer_size && len % max_buffer_size == 0))
+ << "max_buffer_size " << max_buffer_size << " bytes_read " << bytes_read
+ << "len " << len << " buffer->len() " << buffer->len()
+ << " buffer->buffer_len() " << buffer->buffer_len();
+ ASSERT_TRUE(buffer->len() > 0 || buffer->eosr());
+ range->ReturnBuffer(move(buffer));
+ } while (!eosr);
+
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
+ }
+}
+
+// Test zero-length scan range.
+TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ const int64_t MIN_BUFFER_SIZE = 2;
+ const int64_t MAX_BUFFER_SIZE = 1024;
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
- ASSERT_OK(io_mgr->Init(&mem_tracker));
+ DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, 0, 0, stat_val.st_mtime);
+ bool needs_buffers;
+ Status status = io_mgr.StartScanRange(reader.get(), range, &needs_buffers);
+ ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
+
+ status = io_mgr.AddScanRanges(reader.get(), vector<ScanRange*>({range}));
+ ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
+
+ io_mgr.UnregisterContext(reader.get());
+}
+
+// Test what happens if don't call AllocateBuffersForRange() after trying to start a
+// range.
+TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ int len = strlen(data);
+ const int64_t MIN_BUFFER_SIZE = 2;
+ const int64_t MAX_BUFFER_SIZE = 1024;
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+ ASSERT_OK(io_mgr.Init());
MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader;
- reader = io_mgr->RegisterContext(&reader_mem_tracker);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
// We should not read past the end of file.
- ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
- unique_ptr<BufferDescriptor> buffer;
- ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer));
- ASSERT_TRUE(buffer->eosr());
- ASSERT_EQ(len, buffer->len());
- ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
- io_mgr->ReturnBuffer(move(buffer));
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < 4; ++i) {
+ ranges.push_back(InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime));
+ }
+ bool needs_buffers;
+ // Test StartScanRange().
+ ASSERT_OK(io_mgr.StartScanRange(reader.get(), ranges[0], &needs_buffers));
+ EXPECT_TRUE(needs_buffers);
+ ASSERT_OK(io_mgr.StartScanRange(reader.get(), ranges[1], &needs_buffers));
+ EXPECT_TRUE(needs_buffers);
+
+ // Test AddScanRanges()/GetNextUnstartedRange().
+ ASSERT_OK(
+ io_mgr.AddScanRanges(reader.get(), vector<ScanRange*>({ranges[2], ranges[3]})));
+
+ // Cancel two directly, cancel the other two indirectly via the context.
+ ranges[0]->Cancel(Status::CANCELLED);
+ ranges[2]->Cancel(Status::CANCELLED);
+ reader->Cancel();
- io_mgr->UnregisterContext(reader.get());
- pool_.Clear();
- io_mgr.reset();
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ io_mgr.UnregisterContext(reader.get());
}
// Test reading into a client-allocated buffer.
TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
int len = strlen(data);
@@ -1134,19 +1271,19 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
- ASSERT_OK(io_mgr->Init(&mem_tracker));
- // Reader doesn't need to provide mem tracker if it's providing buffers.
- MemTracker* reader_mem_tracker = nullptr;
- unique_ptr<RequestContext> reader;
- reader = io_mgr->RegisterContext(reader_mem_tracker);
+ ASSERT_OK(io_mgr->Init());
+ // Reader doesn't need to provide client if it's providing buffers.
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
- ScanRange* range = AllocateRange();
+ ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), buffer_len));
- ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+ bool needs_buffers;
+ ASSERT_OK(io_mgr->StartScanRange(reader.get(), range, &needs_buffers));
+ ASSERT_FALSE(needs_buffers);
unique_ptr<BufferDescriptor> io_buffer;
ASSERT_OK(range->GetNext(&io_buffer));
@@ -1156,76 +1293,133 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0);
// DiskIoMgr should not have allocated memory.
- EXPECT_EQ(mem_tracker.consumption(), 0);
- io_mgr->ReturnBuffer(move(io_buffer));
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
+ range->ReturnBuffer(move(io_buffer));
}
io_mgr->UnregisterContext(reader.get());
- pool_.Clear();
- io_mgr.reset();
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test reading into a client-allocated buffer where the read fails.
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/file/that/does/not/exist";
const int SCAN_LEN = 128;
scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
- ASSERT_OK(io_mgr->Init(&mem_tracker));
- // Reader doesn't need to provide mem tracker if it's providing buffers.
- MemTracker* reader_mem_tracker = nullptr;
- unique_ptr<RequestContext> reader;
+ ASSERT_OK(io_mgr->Init());
vector<uint8_t> client_buffer(SCAN_LEN);
for (int i = 0; i < 1000; ++i) {
- reader = io_mgr->RegisterContext(reader_mem_tracker);
- ScanRange* range = AllocateRange();
+ // Reader provides its own buffers, so no buffer pool reservation is used.
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+ ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
- ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+ bool needs_buffers;
+ ASSERT_OK(io_mgr->StartScanRange(reader.get(), range, &needs_buffers));
+ ASSERT_FALSE(needs_buffers);
/// Also test the cancellation path. Run multiple iterations since it is racy whether
/// the read fails before the cancellation.
- if (i >= 1) io_mgr->CancelContext(reader.get());
+ if (i >= 1) reader->Cancel();
unique_ptr<BufferDescriptor> io_buffer;
ASSERT_FALSE(range->GetNext(&io_buffer).ok());
// DiskIoMgr should not have allocated memory.
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
io_mgr->UnregisterContext(reader.get());
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
- pool_.Clear();
- io_mgr.reset();
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test to verify configuration parameters for number of I/O threads per disk.
TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
+ FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
// Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
// Since we do not have control over which disk is used, we check for either type
// (rotational/solid state)
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
const int num_io_threads_per_rotational_or_ssd = 2;
DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
num_io_threads_per_rotational_or_ssd, 1, 10);
- ASSERT_OK(io_mgr.Init(&mem_tracker));
+ ASSERT_OK(io_mgr.Init());
const int num_io_threads = io_mgr.disk_thread_group_.Size();
ASSERT_TRUE(num_io_threads ==
num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
}
+
+// Test to verify that the correct buffer sizes are chosen given different
+// of scan range lengths and max_bytes values.
+TEST_F(DiskIoMgrTest, BufferSizeSelection) {
+ DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+ ASSERT_OK(io_mgr.Init());
+
+ // Scan range doesn't fit in max_bytes - allocate as many max-sized buffers as possible.
+ EXPECT_EQ(vector<int64_t>(3, MAX_BUFFER_SIZE),
+ io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>(4, MAX_BUFFER_SIZE),
+ io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, 4 * MAX_BUFFER_SIZE));
+
+ // Scan range fits in max_bytes - allocate as many max-sized buffers as possible, then
+ // a smaller buffer to fit the remainder.
+ EXPECT_EQ(vector<int64_t>(2, MAX_BUFFER_SIZE),
+ io_mgr.ChooseBufferSizes(2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(2 * MAX_BUFFER_SIZE + 1, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, 2 * MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(
+ 2 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE + 1, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, 2 * MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(
+ 2 * MAX_BUFFER_SIZE + 2 * MIN_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+
+ // Scan range is smaller than max buffer size - allocate a single buffer that fits
+ // the range.
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE - 1, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE - 1, MAX_BUFFER_SIZE / 2));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2 - 1, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2 - 1, MAX_BUFFER_SIZE / 2));
+ EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+ EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE, MIN_BUFFER_SIZE));
+
+ // Scan range is smaller than max buffer size and max bytes is smaller still -
+ // should allocate a single smaller buffer.
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 4}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2, MAX_BUFFER_SIZE / 2 - 1));
+
+ // Non power-of-two size > max buffer size.
+ EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE + 7, 3 * MAX_BUFFER_SIZE));
+ // Non power-of-two size < min buffer size.
+ EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+ io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE - 7, 3 * MAX_BUFFER_SIZE));
+}
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport();
return RUN_ALL_TESTS();
}