You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 22:51:32 UTC
[05/15] impala git commit: IMPALA-4835: Part 3: switch I/O buffers to
buffer pool
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index ba1ad92..3fd33de 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,6 +19,8 @@
#include "runtime/io/disk-io-mgr-stress.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
#include "runtime/io/request-context.h"
#include "util/time.h"
@@ -27,21 +29,20 @@
using namespace impala;
using namespace impala::io;
-static const float ABORT_CHANCE = .10f;
-static const int MIN_READ_LEN = 1;
-static const int MAX_READ_LEN = 20;
+constexpr float DiskIoMgrStress::ABORT_CHANCE;
+const int DiskIoMgrStress::MIN_READ_LEN;
+const int DiskIoMgrStress::MAX_READ_LEN;
-static const int MIN_FILE_LEN = 10;
-static const int MAX_FILE_LEN = 1024;
+const int DiskIoMgrStress::MIN_FILE_LEN;
+const int DiskIoMgrStress::MAX_FILE_LEN;
// Make sure this is between MIN/MAX FILE_LEN to test more cases
-static const int MIN_READ_BUFFER_SIZE = 64;
-static const int MAX_READ_BUFFER_SIZE = 128;
+const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
+const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
-// Maximum bytes to allocate per scan range.
-static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
-static const int CANCEL_READER_PERIOD_MS = 20; // in ms
+const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
static void CreateTempFile(const char* filename, const char* data) {
FILE* file = fopen(filename, "w");
@@ -50,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) {
fclose(file);
}
-string GenerateRandomData() {
+string DiskIoMgrStress::GenerateRandomData() {
int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
stringstream ss;
for (int i = 0; i < rand_len; ++i) {
@@ -62,6 +63,8 @@ string GenerateRandomData() {
struct DiskIoMgrStress::Client {
boost::mutex lock;
+ /// Pool for objects that is cleared when the client is (re-)initialized in NewClient().
+ ObjectPool obj_pool;
unique_ptr<RequestContext> reader;
int file_idx;
vector<ScanRange*> scan_ranges;
@@ -95,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
clients_ = new Client[num_clients_];
client_mem_trackers_.resize(num_clients_);
+ buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
for (int i = 0; i < num_clients_; ++i) {
NewClient(i);
}
@@ -119,8 +123,8 @@ void DiskIoMgrStress::ClientThread(int client_id) {
CHECK(status.ok() || status.IsCancelled());
if (range == NULL) break;
if (needs_buffers) {
- status = io_mgr_->AllocateBuffersForRange(
- client->reader.get(), range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
+ status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
+ &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
CHECK(status.ok()) << status.GetDetail();
}
@@ -212,7 +216,13 @@ void DiskIoMgrStress::Run(int sec) {
}
readers_.join_all();
- for (unique_ptr<MemTracker>& mem_tracker : client_mem_trackers_) mem_tracker->Close();
+ for (int i = 0; i < num_clients_; ++i) {
+ if (clients_[i].reader != nullptr) {
+ io_mgr_->UnregisterContext(clients_[i].reader.get());
+ }
+ ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+ client_mem_trackers_[i]->Close();
+ }
mem_tracker_.Close();
}
@@ -234,26 +244,41 @@ void DiskIoMgrStress::NewClient(int i) {
}
}
- for (int i = 0; i < client.scan_ranges.size(); ++i) {
- delete client.scan_ranges[i];
- }
+ // Clean up leftover state from the previous client (if any).
client.scan_ranges.clear();
+ ExecEnv* exec_env = ExecEnv::GetInstance();
+ exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+ if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
+ client.obj_pool.Clear();
int assigned_len = 0;
while (assigned_len < file_len) {
int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
range_len = min(range_len, file_len - assigned_len);
- ScanRange* range = new ScanRange();
+ ScanRange* range = client.obj_pool.Add(new ScanRange);
range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
0, false, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
- if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
- client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
- client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
- Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+ string client_name = Substitute("Client $0", i);
+ client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
+ Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
+ exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
+ numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
+ &buffer_pool_clients_[i]);
+ CHECK(status.ok());
+ // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
+ // progress.
+ CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
+ MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
+ << buffer_pool_clients_[i].DebugString() << "\n"
+ << exec_env->buffer_pool()->DebugString() << "\n"
+ << exec_env->buffer_reservation()->DebugString();
+
+ client.reader = io_mgr_->RegisterContext();
+ status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
CHECK(status.ok());
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index b872694..574b58c 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,8 +22,11 @@
#include <memory>
#include <vector>
#include <boost/scoped_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
@@ -43,15 +46,29 @@ class DiskIoMgrStress {
/// Run the test for 'sec'. If 0, run forever
void Run(int sec);
+ static constexpr float ABORT_CHANCE = .10f;
+ static const int MIN_READ_LEN = 1;
+ static const int MAX_READ_LEN = 20;
+
+ static const int MIN_FILE_LEN = 10;
+ static const int MAX_FILE_LEN = 1024;
+
+ // Keep the read buffer sizes between MIN_FILE_LEN and MAX_FILE_LEN to test more cases
+ static const int MIN_READ_BUFFER_SIZE = 64;
+ static const int MAX_READ_BUFFER_SIZE = 128;
+
+ // Maximum bytes to allocate per scan range.
+ static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+
+ static const int CANCEL_READER_PERIOD_MS = 20;
private:
struct Client;
struct File {
std::string filename;
- std::string data; // the data in the file, used to validate
+ std::string data; // the data in the file, used to validate
};
-
/// Files used for testing. These are created at startup and recycled
/// during the test
std::vector<File> files_;
@@ -72,6 +89,9 @@ class DiskIoMgrStress {
/// Client MemTrackers, one per client.
std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
+ /// Buffer pool clients, one per client.
+ std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
+
/// If true, tests cancelling readers
bool includes_cancellation_;
@@ -88,6 +108,8 @@ class DiskIoMgrStress {
/// Possibly cancels a random reader.
void CancelRandomReader();
+
+ static std::string GenerateRandomData();
};
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 95ea184..3a0f727 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,14 +22,16 @@
#include "codegen/llvm-codegen.h"
#include "common/init.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/io/disk-io-mgr-stress.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
-#include "runtime/mem-tracker.h"
#include "runtime/test-env.h"
#include "runtime/thread-resource-mgr.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/disk-info.h"
@@ -37,13 +39,20 @@
#include "common/names.h"
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+
+DECLARE_int64(min_buffer_size);
DECLARE_int32(num_remote_hdfs_io_threads);
DECLARE_int32(num_s3_io_threads);
DECLARE_int32(num_adls_io_threads);
const int MIN_BUFFER_SIZE = 128;
const int MAX_BUFFER_SIZE = 1024;
-const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
+const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L;
+const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L;
+const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT;
namespace impala {
namespace io {
@@ -53,15 +62,39 @@ class DiskIoMgrTest : public testing::Test {
virtual void SetUp() {
test_env_.reset(new TestEnv);
+ // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+ test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY);
ASSERT_OK(test_env_->Init());
+ RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_);
}
virtual void TearDown() {
+ root_reservation_.Close();
pool_.Clear();
+ test_env_.reset();
+ }
+
+ /// Initialises 'root_reservation_'. The reservation is automatically closed in
+ /// TearDown().
+ void InitRootReservation(int64_t reservation_limit) {
+ root_reservation_.InitRootTracker(
+ RuntimeProfile::Create(&pool_, "root"), reservation_limit);
+ }
+
+ /// Initialise 'client' with 'reservation_limit' and, if 'initial_reservation' > 0,
+ /// reserve that amount. The client reservation is a child of 'root_reservation_'.
+ void RegisterBufferPoolClient(int64_t reservation_limit, int64_t initial_reservation,
+ BufferPool::ClientHandle* client) {
+ ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, &root_reservation_, nullptr,
+ reservation_limit, RuntimeProfile::Create(&pool_, ""), client));
+ if (initial_reservation > 0) {
+ ASSERT_TRUE(client->IncreaseReservation(initial_reservation));
+ }
}
+
void WriteValidateCallback(int num_writes, WriteRange** written_range,
- DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data,
- Status expected_status, const Status& status) {
+ DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* client,
+ int32_t* data, Status expected_status, const Status& status) {
if (expected_status.code() == TErrorCode::CANCELLED) {
EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
} else {
@@ -71,8 +104,8 @@ class DiskIoMgrTest : public testing::Test {
ScanRange* scan_range = pool_.Add(new ScanRange());
scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
(*written_range)->offset(), 0, false, BufferOpts::Uncached());
- ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
- sizeof(int32_t));
+ ValidateSyncRead(io_mgr, reader, client, scan_range,
+ reinterpret_cast<const char*>(data), sizeof(int32_t));
}
{
@@ -93,9 +126,13 @@ class DiskIoMgrTest : public testing::Test {
protected:
void CreateTempFile(const char* filename, const char* data) {
+ CreateTempFile(filename, data, strlen(data));
+ }
+
+ void CreateTempFile(const char* filename, const char* data, int64_t data_bytes) {
FILE* file = fopen(filename, "w");
EXPECT_TRUE(file != nullptr);
- fwrite(data, 1, strlen(data), file);
+ fwrite(data, 1, data_bytes, file);
fclose(file);
}
@@ -120,13 +157,14 @@ class DiskIoMgrTest : public testing::Test {
}
static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
- ScanRange* range, const char* expected, int expected_len = -1) {
+ BufferPool::ClientHandle* client, ScanRange* range, const char* expected,
+ int expected_len = -1) {
unique_ptr<BufferDescriptor> buffer;
bool needs_buffers;
ASSERT_OK(io_mgr->StartScanRange(reader, range, &needs_buffers));
if (needs_buffers) {
ASSERT_OK(io_mgr->AllocateBuffersForRange(
- reader, range, io_mgr->max_buffer_size()));
+ reader, client, range, io_mgr->max_buffer_size()));
}
ASSERT_OK(range->GetNext(&buffer));
ASSERT_TRUE(buffer != nullptr);
@@ -161,8 +199,8 @@ class DiskIoMgrTest : public testing::Test {
// Continues pulling scan ranges from the io mgr until they are all done.
// Updates num_ranges_processed with the number of ranges seen by this thread.
static void ScanRangeThread(DiskIoMgr* io_mgr, RequestContext* reader,
- const char* expected_result, int expected_len, const Status& expected_status,
- int max_ranges, AtomicInt32* num_ranges_processed) {
+ BufferPool::ClientHandle* client, const char* expected_result, int expected_len,
+ const Status& expected_status, int max_ranges, AtomicInt32* num_ranges_processed) {
int num_ranges = 0;
while (max_ranges == 0 || num_ranges < max_ranges) {
ScanRange* range;
@@ -172,7 +210,7 @@ class DiskIoMgrTest : public testing::Test {
if (range == nullptr) break;
if (needs_buffers) {
ASSERT_OK(io_mgr->AllocateBuffersForRange(
- reader, range, io_mgr->max_buffer_size() * 3));
+ reader, client, range, io_mgr->max_buffer_size() * 3));
}
ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
num_ranges_processed->Add(1);
@@ -180,23 +218,27 @@ class DiskIoMgrTest : public testing::Test {
}
}
- ScanRange* AllocateRange() {
- return pool_.Add(new ScanRange);
- }
-
- ScanRange* InitRange(const char* file_path, int offset, int len,
+ ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
- ScanRange* range = AllocateRange();
+ ScanRange* range = pool->Add(new ScanRange);
range->Reset(nullptr, file_path, len, offset, disk_id, true,
BufferOpts(is_cached, mtime), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
+ /// Convenience function to get a reference to the buffer pool.
+ BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
+
boost::scoped_ptr<TestEnv> test_env_;
+ /// Per-test random number generator. Seeded before every test.
+ mt19937 rng_;
+
ObjectPool pool_;
+ ReservationTracker root_reservation_;
+
mutex written_mutex_;
ConditionVariable writes_done_;
int num_ranges_written_;
@@ -207,7 +249,7 @@ class DiskIoMgrTest : public testing::Test {
// by reading the data back via a separate IoMgr instance. All writes are expected to
// complete successfully.
TEST_F(DiskIoMgrTest, SingleWriter) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
num_ranges_written_ = 0;
string tmp_file = "/tmp/disk_io_mgr_test.txt";
int num_ranges = 100;
@@ -221,24 +263,27 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
}
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
- MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
ASSERT_OK(read_io_mgr->Init());
- unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
+ unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init());
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
for (int i = 0; i < num_ranges; ++i) {
- int32_t* data = pool_.Add(new int32_t);
+ int32_t* data = tmp_pool.Add(new int32_t);
*data = rand();
- WriteRange** new_range = pool_.Add(new WriteRange*);
- WriteRange::WriteDoneCallback callback =
- bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
- new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1);
- *new_range = pool_.Add(new WriteRange(
- tmp_file, cur_offset, num_ranges % num_disks, callback));
+ WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+ WriteRange::WriteDoneCallback callback = bind(
+ mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, new_range,
+ read_io_mgr.get(), reader.get(), &read_client, data, Status::OK(), _1);
+ *new_range = tmp_pool.Add(
+ new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
cur_offset += sizeof(int32_t);
@@ -254,27 +299,26 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
}
read_io_mgr->UnregisterContext(reader.get());
- read_io_mgr.reset();
+ buffer_pool()->DeregisterClient(&read_client);
}
// Perform invalid writes (e.g. file in non-existent directory, negative offset) and
// validate that an error status is returned via the write callback.
TEST_F(DiskIoMgrTest, InvalidWrite) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
num_ranges_written_ = 0;
string tmp_file = "/non-existent/file.txt";
DiskIoMgr io_mgr(1, 1, 1, 1, 10);
ASSERT_OK(io_mgr.Init());
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
int32_t* data = pool_.Add(new int32_t);
*data = rand();
// Write to file in non-existent directory.
WriteRange** new_range = pool_.Add(new WriteRange*);
WriteRange::WriteDoneCallback callback =
- bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
- (DiskIoMgr*)nullptr, (RequestContext*)nullptr, data,
- Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+ bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, nullptr,
+ nullptr, nullptr, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
*new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -289,9 +333,9 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
}
new_range = pool_.Add(new WriteRange*);
- callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
- new_range, (DiskIoMgr*)nullptr, (RequestContext*)nullptr,
- data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+ callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
+ nullptr, nullptr, nullptr, data,
+ Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
*new_range = pool_.Add(new WriteRange(tmp_file, -1, 0, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -309,7 +353,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
// AddWriteRange() is expected to succeed before the cancel and fail after it.
// The writes themselves may finish with status cancelled or ok.
TEST_F(DiskIoMgrTest, SingleWriterCancel) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
num_ranges_written_ = 0;
string tmp_file = "/tmp/disk_io_mgr_test.txt";
int num_ranges = 100;
@@ -324,29 +368,33 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
}
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
- MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
ASSERT_OK(read_io_mgr->Init());
- unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
+ unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init());
- unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
Status validate_status = Status::OK();
for (int i = 0; i < num_ranges; ++i) {
if (i == num_ranges_before_cancel) {
writer->Cancel();
validate_status = Status::CANCELLED;
}
- int32_t* data = pool_.Add(new int32_t);
+ int32_t* data = tmp_pool.Add(new int32_t);
*data = rand();
- WriteRange** new_range = pool_.Add(new WriteRange*);
- WriteRange::WriteDoneCallback callback = bind(
- mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel,
- new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1);
- *new_range = pool_.Add(new WriteRange(
- tmp_file, cur_offset, num_ranges % num_disks, callback));
+ WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+ WriteRange::WriteDoneCallback callback =
+ bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
+ num_ranges_before_cancel, new_range, read_io_mgr.get(), reader.get(),
+ &read_client, data, Status::CANCELLED, _1);
+ *new_range = tmp_pool.Add(
+ new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
(*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
cur_offset += sizeof(int32_t);
Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range);
@@ -363,13 +411,13 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
}
read_io_mgr->UnregisterContext(reader.get());
- read_io_mgr.reset();
+ buffer_pool()->DeregisterClient(&read_client);
}
// Basic test with a single reader, testing multiple threads, disks and a different
// number of buffers.
TEST_F(DiskIoMgrTest, SingleReader) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -383,7 +431,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
- ObjectPool pool;
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks
<< " num_read_threads=" << num_read_threads;
@@ -392,36 +440,39 @@ TEST_F(DiskIoMgrTest, SingleReader) {
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < num_read_threads; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len,
- Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// This test issues adding additional scan ranges while there are some still in flight.
TEST_F(DiskIoMgrTest, AddScanRangeTest) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -434,7 +485,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
@@ -442,8 +494,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges_first_half;
vector<ScanRange*> ranges_second_half;
@@ -451,10 +505,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
int disk_id = i % num_disks;
if (i > len / 2) {
ranges_second_half.push_back(
- InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
} else {
ranges_first_half.push_back(
- InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
}
}
AtomicInt32 num_ranges_processed;
@@ -463,8 +517,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half));
// Read a couple of them
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2,
- &num_ranges_processed);
+ ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+ Status::OK(), 2, &num_ranges_processed);
// Issue second half
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half));
@@ -472,24 +526,26 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
// Start up some threads and then cancel
thread_group threads;
for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ threads.add_thread(
+ new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), len);
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test to make sure that sync reads and async reads work together
// Note: this test is constructed so the number of buffers is greater than the
// number of scan ranges.
TEST_F(DiskIoMgrTest, SyncReadTest) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -502,7 +558,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
@@ -511,50 +568,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+
ScanRange* complete_range =
- InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
+ InitRange(&tmp_pool, tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
// Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(
+ InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, strlen(data), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
}
threads.join_all();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Tests a single reader cancelling half way through scan ranges.
TEST_F(DiskIoMgrTest, SingleReaderCancel) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -567,7 +629,8 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
int64_t iters = 0;
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
<< " num_disk=" << num_disks;
@@ -575,13 +638,16 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
- ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ ranges.push_back(
+ InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
@@ -589,16 +655,17 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
int num_succesful_ranges = ranges.size() / 2;
// Read half the ranges
for (int i = 0; i < num_succesful_ranges; ++i) {
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1,
- &num_ranges_processed);
+ ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+ Status::OK(), 1, &num_ranges_processed);
}
EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
// Start up some threads and then cancel
thread_group threads;
for (int i = 0; i < 3; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+ &read_client, data, strlen(data), Status::CANCELLED, 0,
+ &num_ranges_processed));
}
reader->Cancel();
@@ -607,87 +674,93 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
threads.join_all();
EXPECT_TRUE(reader->IsCancelled());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
-// Test when the reader goes over the mem limit
-TEST_F(DiskIoMgrTest, MemLimits) {
+// Test readers running with different amounts of memory and getting blocked on scan
+// ranges that have run out of buffers.
+TEST_F(DiskIoMgrTest, MemScarcity) {
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
- const char* data = "abcdefghijklm";
- int len = strlen(data);
- CreateTempFile(tmp_file, data);
+ // The file is 2.5 max-sized buffers long so that a range can be scanned to completion
+ // without returning buffers when it gets the maximum allocation (3 max buffers) below.
+ const int64_t DATA_BYTES = MAX_BUFFER_SIZE * 5 / 2;
+ char data[DATA_BYTES];
+ for (int i = 0; i < DATA_BYTES; ++i) {
+ data[i] = uniform_int_distribution<uint8_t>(0, 255)(rng_);
+ }
+ CreateTempFile(tmp_file, data, DATA_BYTES);
// Get mtime for file
struct stat stat_val;
stat(tmp_file, &stat_val);
- const int mem_limit_num_buffers = 2;
- // Allocate enough ranges so that the total buffers exceeds the mem limit.
+ const int RESERVATION_LIMIT_NUM_BUFFERS = 20;
+ const int64_t RESERVATION_LIMIT = RESERVATION_LIMIT_NUM_BUFFERS * MAX_BUFFER_SIZE;
+ InitRootReservation(RESERVATION_LIMIT);
+
+ thread_group threads;
+ // Allocate enough ranges so that the total buffers exceeds the limit.
const int num_ranges = 25;
{
- MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(RESERVATION_LIMIT, RESERVATION_LIMIT, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int i = 0; i < num_ranges; ++i) {
- ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+ ranges.push_back(InitRange(&pool_, tmp_file, 0, DATA_BYTES, 0, stat_val.st_mtime));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
-
- // Don't return buffers to force memory pressure
- vector<pair<ScanRange*, unique_ptr<BufferDescriptor>>> buffers;
-
- AtomicInt32 num_ranges_processed;
- ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
- 1, &num_ranges_processed);
-
- bool hit_mem_limit_exceeded = false;
- char result[strlen(data) + 1];
- // Keep starting new ranges without returning buffers. This forces us to go over
- // the limit eventually.
- while (true) {
- memset(result, 0, strlen(data) + 1);
+ // Keep starting new ranges without returning buffers until we run out of
+ // reservation.
+ while (read_client.GetUnusedReservation() >= MIN_BUFFER_SIZE) {
ScanRange* range = nullptr;
bool needs_buffers;
- Status status = io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers);
- ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
- hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
+ ASSERT_OK(io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers));
if (range == nullptr) break;
- DCHECK(needs_buffers);
- status = io_mgr.AllocateBuffersForRange(reader.get(), range, MAX_BUFFER_SIZE * 3);
- ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
- if (status.IsMemLimitExceeded()) {
- hit_mem_limit_exceeded = true;
- continue;
- }
-
- while (true) {
- unique_ptr<BufferDescriptor> buffer;
- Status status = range->GetNext(&buffer);
- ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
- hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
- if (buffer == nullptr) break;
- memcpy(result + range->offset() + buffer->scan_range_offset(),
- buffer->buffer(), buffer->len());
- buffers.emplace_back(range, move(buffer));
- }
- ValidateEmptyOrCorrect(data, result, strlen(data));
- }
-
- for (int i = 0; i < buffers.size(); ++i) {
- buffers[i].first->ReturnBuffer(move(buffers[i].second));
+ ASSERT_TRUE(needs_buffers);
+ // Pick a random amount of memory to reserve.
+ int64_t max_bytes_to_alloc = uniform_int_distribution<int64_t>(MIN_BUFFER_SIZE,
+ min<int64_t>(read_client.GetUnusedReservation(), MAX_BUFFER_SIZE * 3))(rng_);
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(
+ reader.get(), &read_client, range, max_bytes_to_alloc));
+ // Start a thread fetching from the range. The thread will either finish the
+ // range or be cancelled.
+ threads.add_thread(new thread([&data, DATA_BYTES, range] {
+ // Don't return buffers to force memory pressure.
+ vector<unique_ptr<BufferDescriptor>> buffers;
+ int64_t data_offset = 0;
+ Status status;
+ while (true) {
+ unique_ptr<BufferDescriptor> buffer;
+ status = range->GetNext(&buffer);
+ ASSERT_TRUE(status.ok() || status.IsCancelled()) << status.GetDetail();
+ if (status.IsCancelled() || buffer == nullptr) break;
+ EXPECT_EQ(0, memcmp(data + data_offset, buffer->buffer(), buffer->len()));
+ data_offset += buffer->len();
+ buffers.emplace_back(move(buffer));
+ }
+ if (status.ok()) ASSERT_EQ(DATA_BYTES, data_offset);
+ for (auto& buffer : buffers) range->ReturnBuffer(move(buffer));
+ }));
+ // Let the thread start running before starting the next.
+ SleepForMs(10);
}
-
- EXPECT_TRUE(hit_mem_limit_exceeded) << "Should have run out of memory";
+ // Let the threads run for a bit then cancel everything.
+ SleepForMs(500);
+ reader->Cancel();
+ // Wait until the threads have returned their buffers before unregistering.
+ threads.join_all();
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
@@ -696,7 +769,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
// only tests the fallback mechanism.
// TODO: we can fake the cached read path without HDFS
TEST_F(DiskIoMgrTest, CachedReads) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
@@ -711,51 +784,54 @@ TEST_F(DiskIoMgrTest, CachedReads) {
DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
ScanRange* complete_range =
- InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+ InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
// Issue some reads before the async ones are issued
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
vector<ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
ranges.push_back(
- InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+ InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
}
ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
- threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
- strlen(data), Status::OK(), 0, &num_ranges_processed));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
+ data, strlen(data), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
}
threads.join_all();
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
- ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int ITERATIONS = 1;
const char* data = "abcdefghijklmnopqrstuvwxyz";
const int num_contexts = 5;
@@ -777,17 +853,22 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
int64_t iters = 0;
vector<unique_ptr<RequestContext>> contexts(num_contexts);
+ unique_ptr<BufferPool::ClientHandle[]> clients(
+ new BufferPool::ClientHandle[num_contexts]);
Status status;
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init());
for (int file_index = 0; file_index < num_contexts; ++file_index) {
- contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[file_index]);
+ contexts[file_index] = io_mgr.RegisterContext();
}
- pool_.Clear();
int read_offset = 0;
int write_offset = 0;
while (read_offset < file_size) {
@@ -799,11 +880,11 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
for (int i = 0; i < num_scan_ranges; ++i) {
ranges.push_back(InitRange(
- file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
- threads.add_thread(
- new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(),
- reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
- 1, Status::OK(), num_scan_ranges, &num_ranges_processed));
+ &tmp_pool, file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr,
+ contexts[context_index].get(), &clients[context_index],
+ reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
+ 1, Status::OK(), num_scan_ranges, &num_ranges_processed));
++read_offset;
}
@@ -813,7 +894,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
this, num_write_ranges, _1);
- WriteRange* new_range = pool_.Add(new WriteRange(
+ WriteRange* new_range = tmp_pool.Add(new WriteRange(
file_name, write_offset, i % num_disks, callback));
new_range->SetData(
reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))),
@@ -832,6 +913,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
} // while (read_offset < file_size)
for (int file_index = 0; file_index < num_contexts; ++file_index) {
io_mgr.UnregisterContext(contexts[file_index].get());
+ buffer_pool()->DeregisterClient(&clients[file_index]);
}
} // for (int num_disks
} // for (int threads_per_disk
@@ -840,23 +922,19 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
// This test will test multiple concurrent reads each reading a different file.
TEST_F(DiskIoMgrTest, MultipleReader) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int NUM_READERS = 5;
const int DATA_LEN = 50;
const int ITERATIONS = 25;
const int NUM_THREADS_PER_READER = 3;
- vector<string> file_names;
- vector<int64_t> mtimes;
- vector<string> data;
- vector<unique_ptr<RequestContext>> readers;
- vector<char*> results;
-
- file_names.resize(NUM_READERS);
- readers.resize(NUM_READERS);
- mtimes.resize(NUM_READERS);
- data.resize(NUM_READERS);
- results.resize(NUM_READERS);
+ vector<string> file_names(NUM_READERS);
+ vector<int64_t> mtimes(NUM_READERS);
+ vector<string> data(NUM_READERS);
+ unique_ptr<BufferPool::ClientHandle[]> clients(
+ new BufferPool::ClientHandle[NUM_READERS]);
+ vector<unique_ptr<RequestContext>> readers(NUM_READERS);
+ vector<char*> results(NUM_READERS);
// Initialize data for each reader. The data will be
// 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
@@ -887,7 +965,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
- pool_.Clear(); // Destroy scan ranges from previous iterations.
+ // Pool for temporary objects from this iteration only.
+ ObjectPool tmp_pool;
LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
<< " num_disk=" << num_disks;
if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
@@ -897,12 +976,14 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
ASSERT_OK(io_mgr.Init());
for (int i = 0; i < NUM_READERS; ++i) {
- readers[i] = io_mgr.RegisterContext(&mem_tracker);
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[i]);
+ readers[i] = io_mgr.RegisterContext();
vector<ScanRange*> ranges;
for (int j = 0; j < DATA_LEN; ++j) {
int disk_id = j % num_disks;
- ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
+ ranges.push_back(InitRange(&tmp_pool, file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
}
ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges));
}
@@ -912,18 +993,20 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
for (int i = 0; i < NUM_READERS; ++i) {
for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(),
- data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed));
+ &clients[i], data[i].c_str(), data[i].size(), Status::OK(), 0,
+ &num_ranges_processed));
}
}
threads.join_all();
EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
for (int i = 0; i < NUM_READERS; ++i) {
io_mgr.UnregisterContext(readers[i].get());
+ buffer_pool()->DeregisterClient(&clients[i]);
}
}
}
}
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Stress test for multiple clients with cancellation
@@ -937,7 +1020,7 @@ TEST_F(DiskIoMgrTest, StressTest) {
// IMPALA-2366: handle partial read where range goes past end of file.
TEST_F(DiskIoMgrTest, PartialRead) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
int len = strlen(data);
@@ -956,19 +1039,22 @@ TEST_F(DiskIoMgrTest, PartialRead) {
for (int64_t max_buffer_size : max_buffer_sizes) {
DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, max_buffer_size);
-
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
unique_ptr<RequestContext> reader;
- reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ reader = io_mgr.RegisterContext();
+
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
// We should not read past the end of file.
- ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, read_len, 0, stat_val.st_mtime);
unique_ptr<BufferDescriptor> buffer;
bool needs_buffers;
ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
if (needs_buffers) {
- ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), range, 3 * max_buffer_size));
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(
+ reader.get(), &read_client, range, 3 * max_buffer_size));
}
int64_t bytes_read = 0;
@@ -994,15 +1080,13 @@ TEST_F(DiskIoMgrTest, PartialRead) {
} while (!eosr);
io_mgr.UnregisterContext(reader.get());
- EXPECT_EQ(reader_mem_tracker.consumption(), 0);
- EXPECT_EQ(mem_tracker.consumption(), 0);
- pool_.Clear();
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
}
// Test zero-length scan range.
TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
const int64_t MIN_BUFFER_SIZE = 2;
@@ -1016,12 +1100,9 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init());
- MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader;
- reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
- // We should not read past the end of file.
- ScanRange* range = InitRange(tmp_file, 0, 0, 0, stat_val.st_mtime);
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, 0, 0, stat_val.st_mtime);
bool needs_buffers;
Status status = io_mgr.StartScanRange(reader.get(), range, &needs_buffers);
ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
@@ -1035,7 +1116,6 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
// Test what happens if don't call AllocateBuffersForRange() after trying to start a
// range.
TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
int len = strlen(data);
@@ -1051,13 +1131,12 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
ASSERT_OK(io_mgr.Init());
MemTracker reader_mem_tracker;
- unique_ptr<RequestContext> reader;
- reader = io_mgr.RegisterContext(&reader_mem_tracker);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
// We should not read past the end of file.
vector<ScanRange*> ranges;
for (int i = 0; i < 4; ++i) {
- ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+ ranges.push_back(InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime));
}
bool needs_buffers;
// Test StartScanRange().
@@ -1080,7 +1159,7 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
// Test reading into a client-allocated buffer.
TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "the quick brown fox jumped over the lazy dog";
int len = strlen(data);
@@ -1090,15 +1169,13 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
ASSERT_OK(io_mgr->Init());
- // Reader doesn't need to provide mem tracker if it's providing buffers.
- MemTracker* reader_mem_tracker = nullptr;
- unique_ptr<RequestContext> reader;
- reader = io_mgr->RegisterContext(reader_mem_tracker);
+ // Reader doesn't need to provide a buffer pool client if it's providing buffers.
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
- ScanRange* range = AllocateRange();
+ ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), buffer_len));
bool needs_buffers;
@@ -1113,32 +1190,31 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0);
// DiskIoMgr should not have allocated memory.
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
range->ReturnBuffer(move(io_buffer));
}
io_mgr->UnregisterContext(reader.get());
- pool_.Clear();
- io_mgr.reset();
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test reading into a client-allocated buffer where the read fails.
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* tmp_file = "/file/that/does/not/exist";
const int SCAN_LEN = 128;
scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
ASSERT_OK(io_mgr->Init());
- // Reader doesn't need to provide mem tracker if it's providing buffers.
- MemTracker* reader_mem_tracker = nullptr;
- unique_ptr<RequestContext> reader;
vector<uint8_t> client_buffer(SCAN_LEN);
for (int i = 0; i < 1000; ++i) {
- reader = io_mgr->RegisterContext(reader_mem_tracker);
- ScanRange* range = AllocateRange();
+ // No client is needed when the reader provides buffers; this one must stay unused.
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+ ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
bool needs_buffers;
@@ -1153,25 +1229,25 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
ASSERT_FALSE(range->GetNext(&io_buffer).ok());
// DiskIoMgr should not have allocated memory.
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
io_mgr->UnregisterContext(reader.get());
+ EXPECT_EQ(read_client.GetUsedReservation(), 0);
+ buffer_pool()->DeregisterClient(&read_client);
}
- pool_.Clear();
- io_mgr.reset();
- EXPECT_EQ(mem_tracker.consumption(), 0);
+ EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
}
// Test to verify configuration parameters for number of I/O threads per disk.
TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
+ FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
// Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
// Since we do not have control over which disk is used, we check for either type
// (rotational/solid state)
- MemTracker mem_tracker(LARGE_MEM_LIMIT);
const int num_io_threads_per_rotational_or_ssd = 2;
DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
num_io_threads_per_rotational_or_ssd, 1, 10);
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 6dda447..6c7b9e6 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#include "common/global-flags.h"
#include "runtime/io/disk-io-mgr.h"
+
+#include "common/global-flags.h"
+#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/handle-cache.inline.h"
@@ -52,6 +54,8 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
static const int THREADS_PER_ROTATIONAL_DISK = 1;
static const int THREADS_PER_SOLID_STATE_DISK = 8;
+const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
+
// The maximum number of the threads per rotational disk is also the max queue depth per
// rotational disk.
static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
@@ -309,9 +313,8 @@ Status DiskIoMgr::Init() {
return Status::OK();
}
-unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
- return unique_ptr<RequestContext>(
- new RequestContext(this, num_total_disks(), mem_tracker));
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext() {
+ return unique_ptr<RequestContext>(new RequestContext(this, num_total_disks()));
}
void DiskIoMgr::UnregisterContext(RequestContext* reader) {
@@ -455,28 +458,21 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang
}
}
-Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
- int64_t max_bytes) {
+Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
+ BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
DCHECK_GE(max_bytes, min_buffer_size_);
DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
<< static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
<< "when already reading into an external buffer";
-
+ BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
Status status;
vector<unique_ptr<BufferDescriptor>> buffers;
for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
- if (!reader->mem_tracker_->TryConsume(buffer_size)) {
- status = reader->mem_tracker_->MemLimitExceeded(nullptr,
- "Failed to allocate I/O buffer", buffer_size);
- goto error;
- }
- uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
- if (buffer == nullptr) {
- reader->mem_tracker_->Release(buffer_size);
- status = Status(Substitute("Failed to malloc $0-byte I/O buffer", buffer_size));
- goto error;
- }
- buffers.emplace_back(new BufferDescriptor(this, reader, range, buffer, buffer_size));
+ BufferPool::BufferHandle handle;
+ status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
+ if (!status.ok()) goto error;
+ buffers.emplace_back(new BufferDescriptor(
+ this, reader, range, bp_client, move(handle)));
}
range->AddUnusedBuffers(move(buffers), false);
return Status::OK();
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index d429d1d..d246e95 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -30,6 +30,7 @@
#include "common/hdfs.h"
#include "common/object-pool.h"
#include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/handle-cache.h"
#include "runtime/io/request-ranges.h"
#include "runtime/thread-resource-mgr.h"
@@ -42,8 +43,6 @@
namespace impala {
-class MemTracker;
-
namespace io {
/// Manager object that schedules IO for all queries on all disks and remote filesystems
/// (such as S3). Each query maps to one or more RequestContext objects, each of which
@@ -163,6 +162,7 @@ namespace io {
/// buffer and one buffer is in the disk queue. The additional buffer can absorb
/// bursts where the producer runs faster than the consumer or the consumer runs
/// faster than the producer without blocking either the producer or consumer.
+/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE.
///
/// Caching support:
/// Scan ranges contain metadata on whether or not it is cached on the DN. In that
@@ -243,11 +243,7 @@ class DiskIoMgr : public CacheLineAligned {
/// Allocates tracking structure for a request context.
/// Register a new request context and return it to the caller. The caller must call
/// UnregisterContext() for each context.
- /// reader_mem_tracker: Is non-null only for readers. IO buffers
- /// used for this reader will be tracked by this. If the limit is exceeded
- /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
- /// GetNext().
- std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker);
+ std::unique_ptr<RequestContext> RegisterContext();
/// Unregisters context from the disk IoMgr by first cancelling it then blocking until
/// all references to the context are removed from I/O manager internal data structures.
@@ -302,16 +298,15 @@ class DiskIoMgr : public CacheLineAligned {
/// *needs_buffers=true.
///
/// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
- /// min_read_buffer_size() so that at least one buffer can be allocated. Returns ok
- /// if the buffers were successfully allocated and the range was scheduled. Fails with
- /// MEM_LIMIT_EXCEEDED if the buffers could not be allocated. On failure, any allocated
- /// buffers are freed and the state of 'range' is unmodified so that allocation can be
- /// retried. Setting 'max_bytes' to 3 * max_buffer_size() will typically maximize I/O
- /// throughput. See Buffer management" section of the class comment for explanation.
- /// TODO: error handling contract will change with reservations. The caller needs to
- /// to guarantee that there is sufficient reservation.
- Status AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
- int64_t max_bytes);
+ /// min_read_buffer_size() so that at least one buffer can be allocated. The caller
+ /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns ok
+ /// if the buffers were successfully allocated and the range was scheduled.
+ ///
+ /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size()
+ /// will typically maximize I/O throughput. See the "Buffer Management" section of
+ /// the class comment for explanation.
+ Status AllocateBuffersForRange(RequestContext* reader,
+ BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes);
/// Determine which disk queue this file should be assigned to. Returns an index into
/// disk_queues_. The disk_id is the volume ID for the local disk that holds the
@@ -379,6 +374,10 @@ class DiskIoMgr : public CacheLineAligned {
REMOTE_NUM_DISKS
};
+ /// The ideal number of max-sized buffers per scan range to maximise throughput.
+ /// See "Buffer Management" in the class comment for explanation.
+ static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
+
private:
friend class BufferDescriptor;
friend class RequestContext;
@@ -401,7 +400,7 @@ class DiskIoMgr : public CacheLineAligned {
/// Maximum read size. This is also the maximum size of each allocated buffer.
const int64_t max_buffer_size_;
- /// The minimum size of each read buffer.
+ /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len().
const int64_t min_buffer_size_;
/// Thread group containing all the worker threads.
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index b124702..dec6aa6 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -17,6 +17,8 @@
#include "runtime/io/disk-io-mgr-internal.h"
+#include "runtime/exec-env.h"
+
#include "common/names.h"
using namespace impala;
@@ -36,12 +38,28 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
DCHECK_GE(buffer_len, 0);
}
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+ ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+ BufferPool::BufferHandle handle) :
+ io_mgr_(io_mgr),
+ reader_(reader),
+ scan_range_(scan_range),
+ buffer_(handle.data()),
+ buffer_len_(handle.len()),
+ bp_client_(bp_client),
+ handle_(move(handle)) {
+ DCHECK(io_mgr != nullptr);
+ DCHECK(scan_range != nullptr);
+ DCHECK(bp_client_->is_registered());
+ DCHECK(handle_.is_open());
+}
+
void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
DCHECK(buffer->buffer_ != nullptr);
if (!buffer->is_cached() && !buffer->is_client_buffer()) {
- // Only buffers that were not allocated by DiskIoMgr need to have memory freed.
- free(buffer->buffer_);
- mem_tracker_->Release(buffer->buffer_len_);
+ // Only buffers that were allocated by DiskIoMgr need to be freed.
+ ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+ buffer->bp_client_, &buffer->handle_);
}
buffer->buffer_ = nullptr;
}
@@ -200,9 +218,8 @@ void RequestContext::RemoveActiveScanRangeLocked(
active_scan_ranges_.erase(range);
}
-RequestContext::RequestContext(
- DiskIoMgr* parent, int num_disks, MemTracker* tracker)
- : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
+RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+ : parent_(parent), disk_states_(num_disks) {}
// Dumps out request context information. Lock should be taken by caller
string RequestContext::DebugString() const {
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 3aea2bc..24fd0fc 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,12 +158,12 @@ class RequestContext {
Inactive,
};
- RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
+ RequestContext(DiskIoMgr* parent, int num_disks);
- /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer
- /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a
- /// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
- /// After this is called, buffer->buffer() is NULL. Does not acquire 'lock_'.
+ /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(),
+ /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just prepares the
+ /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL.
+ /// Does not acquire 'lock_'.
void FreeBuffer(BufferDescriptor* buffer);
/// Decrements the number of active disks for this reader. If the disk count
@@ -239,10 +239,6 @@ class RequestContext {
/// Parent object
DiskIoMgr* const parent_;
- /// Memory used for this reader. This is unowned by this object.
- /// TODO: replace with bp client
- MemTracker* const mem_tracker_;
-
/// Total bytes read for this reader
RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 041cb9d..0b234ac 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -32,8 +32,6 @@
#include "util/mem-range.h"
namespace impala {
-class MemTracker;
-
namespace io {
class DiskIoMgr;
class RequestContext;
@@ -63,11 +61,15 @@ class BufferDescriptor {
friend class ScanRange;
friend class RequestContext;
- /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
- /// memory should already be accounted against 'mem_tracker'.
+ /// Create a buffer descriptor for a new reader, range and data buffer.
BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
+ /// Create a buffer descriptor allocated from the buffer pool.
+ BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+ ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+ BufferPool::BufferHandle handle);
+
/// Return true if this is a cached buffer owned by HDFS.
bool is_cached() const;
@@ -97,6 +99,11 @@ class BufferDescriptor {
bool eosr_ = false;
int64_t scan_range_offset_ = 0;
+
+ // Handle to an allocated buffer and the client used to allocate it. Only used
+ // for non-external buffers.
+ BufferPool::ClientHandle* bp_client_ = nullptr;
+ BufferPool::BufferHandle handle_;
};
/// The request type, read or write associated with a request range.
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 0663a2b..9c2110c 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -340,9 +340,6 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
DCHECK(exclusive_hdfs_fh_ == nullptr);
DCHECK(local_file_ == nullptr);
- // Reader must provide MemTracker or a buffer.
- DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
- || reader->mem_tracker_ != nullptr);
io_mgr_ = io_mgr;
reader_ = reader;
local_file_ = nullptr;
@@ -650,8 +647,7 @@ Status ScanRange::ReadFromCache(
}
// Create a single buffer desc for the entire scan range and enqueue that.
- // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
- // not the Impala backend.
+ // The memory is owned by the HDFS java client, not the Impala backend.
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
desc->len_ = bytes_read;
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 3a69c33..e0c58d4 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -243,7 +243,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
next_allocation_index_(0),
free_ranges_(64) {
DCHECK(tmp_file_mgr != nullptr);
- io_ctx_ = io_mgr_->RegisterContext(nullptr);
+ io_ctx_ = io_mgr_->RegisterContext();
}
TmpFileMgr::FileGroup::~FileGroup() {
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c5df1cd..1ab05a0 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -239,6 +239,9 @@ struct THdfsScanNode {
// The byte offset of the slot for Parquet metadata if Parquet count star optimization
// is enabled.
10: optional i32 parquet_count_star_slot_offset
+
+ // The ideal memory reservation in bytes to process an input split.
+ 11: optional i64 ideal_scan_range_reservation
}
struct TDataSourceScanNode {
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 4f0a0e1..aae3863 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -163,6 +163,25 @@ public class SlotDescriptor {
}
/**
+ * Checks if this descriptor describes an array "pos" pseudo-column.
+ *
+ * Note: checking whether the column is null distinguishes between top-level columns
+ * and nested types. This check more specifically looks just for a reference to the
+ * "pos" field of an array type.
+ */
+ public boolean isArrayPosRef() {
+ if (parent_ == null) return false;
+ Type parentType = parent_.getType();
+ if (parentType instanceof CollectionStructType) {
+ if (((CollectionStructType)parentType).isArrayStruct() &&
+ label_.equals(Path.ARRAY_POS_FIELD_NAME)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Assembles the absolute materialized path to this slot starting from the schema
* root. The materialized path points to the first non-struct schema element along the
* path starting from the parent's tuple path to this slot's path.
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 23f2d88..0a945bd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -153,26 +153,6 @@ public class SlotRef extends Expr {
return "<slot " + Integer.toString(desc_.getId().asInt()) + ">";
}
- /**
- * Checks if this slotRef refers to an array "pos" pseudo-column.
- *
- * Note: checking whether the column is null distinguishes between top-level columns
- * and nested types. This check more specifically looks just for a reference to the
- * "pos" field of an array type.
- */
- public boolean isArrayPosRef() {
- TupleDescriptor parent = getDesc().getParent();
- if (parent == null) return false;
- Type parentType = parent.getType();
- if (parentType instanceof CollectionStructType) {
- if (((CollectionStructType)parentType).isArrayStruct() &&
- getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) {
- return true;
- }
- }
- return false;
- }
-
@Override
protected void toThrift(TExprNode msg) {
msg.node_type = TExprNodeType.SLOT_REF;