You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:54 UTC
[11/16] incubator-impala git commit: IMPALA-4835 (prep only): create
io subfolder and namespace
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
new file mode 100644
index 0000000..b03ec31
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -0,0 +1,1129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sched.h>
+#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
+#include <sys/stat.h>
+
+#include "codegen/llvm-codegen.h"
+#include "common/init.h"
+#include "runtime/io/request-context.h"
+#include "runtime/io/disk-io-mgr-stress.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/thread-resource-mgr.h"
+#include "testutil/gtest-util.h"
+#include "util/condition-variable.h"
+#include "util/cpu-info.h"
+#include "util/disk-info.h"
+#include "util/thread.h"
+
+#include "common/names.h"
+
+DECLARE_int32(num_remote_hdfs_io_threads);
+DECLARE_int32(num_s3_io_threads);
+DECLARE_int32(num_adls_io_threads);
+
+const int MIN_BUFFER_SIZE = 512;
+const int MAX_BUFFER_SIZE = 1024;
+const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
+
+namespace impala {
+namespace io {
+
+// Test fixture for DiskIoMgr tests. Owns an ObjectPool for ranges/payloads
+// that lives for the duration of a test, plus a mutex/condition-variable pair
+// the write-completion callbacks use to signal the test thread once all
+// queued writes have finished.
+class DiskIoMgrTest : public testing::Test {
+ public:
+
+ virtual void SetUp() {}
+
+ virtual void TearDown() {
+ pool_.Clear();
+ }
+
+ // Completion callback for writes whose payload should be validated.
+ // If 'expected_status' is CANCELLED, either OK or CANCELLED is accepted
+ // because a cancelled write can race to completion. On OK, the written
+ // bytes are read back through 'io_mgr'/'reader' and compared to 'data'.
+ // Signals 'writes_done_' once 'num_writes' callbacks have fired.
+ void WriteValidateCallback(int num_writes, WriteRange** written_range,
+ DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data,
+ Status expected_status, const Status& status) {
+ if (expected_status.code() == TErrorCode::CANCELLED) {
+ EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
+ } else {
+ EXPECT_EQ(status.code(), expected_status.code());
+ }
+ if (status.ok()) {
+ ScanRange* scan_range = pool_.Add(new ScanRange());
+ scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
+ (*written_range)->offset(), 0, false, BufferOpts::Uncached());
+ ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
+ sizeof(int32_t));
+ }
+
+ {
+ lock_guard<mutex> l(written_mutex_);
+ ++num_ranges_written_;
+ // NotifyAll() for consistency with WriteCompleteCallback(); waiters
+ // re-check the predicate in a loop, so waking all of them is safe.
+ if (num_ranges_written_ == num_writes) writes_done_.NotifyAll();
+ }
+ }
+
+ // Completion callback for writes that are expected to succeed but whose
+ // payload is not read back. Signals 'writes_done_' once 'num_writes'
+ // callbacks have fired.
+ void WriteCompleteCallback(int num_writes, const Status& status) {
+ EXPECT_OK(status);
+ {
+ lock_guard<mutex> l(written_mutex_);
+ ++num_ranges_written_;
+ if (num_ranges_written_ == num_writes) writes_done_.NotifyAll();
+ }
+ }
+
+ protected:
+ // Creates (or truncates) 'filename' and writes the NUL-terminated 'data'
+ // into it.
+ void CreateTempFile(const char* filename, const char* data) {
+ FILE* file = fopen(filename, "w");
+ EXPECT_TRUE(file != nullptr);
+ fwrite(data, 1, strlen(data), file);
+ fclose(file);
+ }
+
+ // Creates (or truncates) 'filename' and extends it to 'file_size' zero
+ // bytes via truncate(). Returns 0 on success, non-zero otherwise.
+ int CreateTempFile(const char* filename, int file_size) {
+ FILE* file = fopen(filename, "w");
+ EXPECT_TRUE(file != nullptr);
+ int success = fclose(file);
+ if (success != 0) {
+ LOG(ERROR) << "Error closing file " << filename;
+ return success;
+ }
+ return truncate(filename, file_size);
+ }
+
+ // Validates that buffer[i] is \0 or expected[i]
+ static void ValidateEmptyOrCorrect(const char* expected, const char* buffer, int len) {
+ for (int i = 0; i < len; ++i) {
+ if (buffer[i] != '\0') {
+ EXPECT_EQ(expected[i], buffer[i]) << (int)expected[i] << " != " << (int)buffer[i];
+ }
+ }
+ }
+
+ // Reads 'range' synchronously through 'io_mgr' and verifies the buffer
+ // matches the first 'expected_len' bytes of 'expected' (strlen(expected)
+ // if 'expected_len' is negative).
+ static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
+ ScanRange* range, const char* expected, int expected_len = -1) {
+ unique_ptr<BufferDescriptor> buffer;
+ ASSERT_OK(io_mgr->Read(reader, range, &buffer));
+ ASSERT_TRUE(buffer != nullptr);
+ EXPECT_EQ(buffer->len(), range->len());
+ if (expected_len < 0) expected_len = strlen(expected);
+ int cmp = memcmp(buffer->buffer(), expected, expected_len);
+ EXPECT_TRUE(cmp == 0);
+ io_mgr->ReturnBuffer(move(buffer));
+ }
+
+ // Drains all buffers of 'range', reassembles them at their scan-range
+ // offsets and verifies the result against 'expected'. GetNext() is allowed
+ // to fail with 'expected_status' (e.g. after cancellation), in which case
+ // only the bytes received so far are checked.
+ static void ValidateScanRange(DiskIoMgr* io_mgr, ScanRange* range,
+ const char* expected, int expected_len, const Status& expected_status) {
+ char result[expected_len + 1];
+ memset(result, 0, expected_len + 1);
+
+ while (true) {
+ unique_ptr<BufferDescriptor> buffer;
+ Status status = range->GetNext(&buffer);
+ ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
+ if (buffer == nullptr || !status.ok()) {
+ if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer));
+ break;
+ }
+ ASSERT_LE(buffer->len(), expected_len);
+ memcpy(result + range->offset() + buffer->scan_range_offset(),
+ buffer->buffer(), buffer->len());
+ io_mgr->ReturnBuffer(move(buffer));
+ }
+ ValidateEmptyOrCorrect(expected, result, expected_len);
+ }
+
+ // Continues pulling scan ranges from the io mgr until they are all done.
+ // Updates num_ranges_processed with the number of ranges seen by this thread.
+ // 'max_ranges' == 0 means "run until GetNextRange() returns no range".
+ static void ScanRangeThread(DiskIoMgr* io_mgr, RequestContext* reader,
+ const char* expected_result, int expected_len, const Status& expected_status,
+ int max_ranges, AtomicInt32* num_ranges_processed) {
+ int num_ranges = 0;
+ while (max_ranges == 0 || num_ranges < max_ranges) {
+ ScanRange* range;
+ Status status = io_mgr->GetNextRange(reader, &range);
+ ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
+ if (range == nullptr) break;
+ ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
+ num_ranges_processed->Add(1);
+ ++num_ranges;
+ }
+ }
+
+ // Allocates a ScanRange owned by 'pool_'.
+ ScanRange* AllocateRange() {
+ return pool_.Add(new ScanRange);
+ }
+
+ // Allocates and resets a pool-owned ScanRange covering
+ // [offset, offset + len) of 'file_path' on 'disk_id'. 'is_cached' requests
+ // the cached read path.
+ ScanRange* InitRange(const char* file_path, int offset, int len,
+ int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
+ ScanRange* range = AllocateRange();
+ range->Reset(nullptr, file_path, len, offset, disk_id, true,
+ BufferOpts(is_cached, mtime), meta_data);
+ EXPECT_EQ(mtime, range->mtime());
+ return range;
+ }
+
+ ObjectPool pool_;
+
+ // 'written_mutex_' protects 'num_ranges_written_'; 'writes_done_' is
+ // signalled when the count reaches the number of writes a test queued.
+ mutex written_mutex_;
+ ConditionVariable writes_done_;
+ // Default-initialized so the fixture is safe even if a test forgets to
+ // reset it before queueing writes.
+ int num_ranges_written_ = 0;
+};
+
+// Test a single writer with multiple disks and threads per disk. Each WriteRange
+// writes random 4-byte integers, and upon completion, the written data is validated
+// by reading the data back via a separate IoMgr instance. All writes are expected to
+// complete successfully.
+TEST_F(DiskIoMgrTest, SingleWriter) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ num_ranges_written_ = 0;
+ string tmp_file = "/tmp/disk_io_mgr_test.txt";
+ int num_ranges = 100;
+ int64_t file_size = 1024 * 1024;
+ int64_t cur_offset = 0;
+ int success = CreateTempFile(tmp_file.c_str(), file_size);
+ if (success != 0) {
+ LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " <<
+ file_size;
+ EXPECT_TRUE(false);
+ }
+
+ // Separate IoMgr instance used only to read back and validate the data
+ // written by the IoMgr under test (see WriteValidateCallback).
+ scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
+ MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+ ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
+ unique_ptr<RequestContext> reader =
+ read_io_mgr->RegisterContext(&reader_mem_tracker);
+ // Sweep thread-per-disk (1..5) and disk (1,3,5) configurations.
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ for (int i = 0; i < num_ranges; ++i) {
+ // Random 4-byte payload; owned by pool_ so it outlives the async write.
+ int32_t* data = pool_.Add(new int32_t);
+ *data = rand();
+ WriteRange** new_range = pool_.Add(new WriteRange*);
+ WriteRange::WriteDoneCallback callback =
+ bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
+ new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1);
+ // NOTE(review): 'num_ranges % num_disks' is constant for all ranges, so
+ // every write lands on the same disk queue; presumably 'i % num_disks'
+ // was intended — confirm upstream.
+ *new_range = pool_.Add(new WriteRange(
+ tmp_file, cur_offset, num_ranges % num_disks, callback));
+ (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+ EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
+ cur_offset += sizeof(int32_t);
+ }
+
+ // Block until every write callback has fired.
+ {
+ unique_lock<mutex> lock(written_mutex_);
+ while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock);
+ }
+ num_ranges_written_ = 0;
+ io_mgr.UnregisterContext(writer.get());
+ }
+ }
+
+ read_io_mgr->UnregisterContext(reader.get());
+ read_io_mgr.reset();
+}
+
+// Perform invalid writes (e.g. file in non-existent directory, negative offset) and
+// validate that an error status is returned via the write callback.
+TEST_F(DiskIoMgrTest, InvalidWrite) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ num_ranges_written_ = 0;
+ string tmp_file = "/non-existent/file.txt";
+ DiskIoMgr io_mgr(1, 1, 1, 1, 10);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
+ int32_t* data = pool_.Add(new int32_t);
+ *data = rand();
+
+ // Write to file in non-existent directory.
+ WriteRange** new_range = pool_.Add(new WriteRange*);
+ // Passing null io_mgr/reader is safe here: WriteValidateCallback only
+ // performs the read-back validation when the write status is OK, and both
+ // writes in this test are expected to fail with DISK_IO_ERROR.
+ WriteRange::WriteDoneCallback callback =
+ bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
+ (DiskIoMgr*)nullptr, (RequestContext*)nullptr, data,
+ Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+ *new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback));
+
+ (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+ // AddWriteRange() itself succeeds; the failure surfaces asynchronously in
+ // the callback.
+ EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
+
+ // Write to a bad location in a file that exists.
+ tmp_file = "/tmp/disk_io_mgr_test.txt";
+ int success = CreateTempFile(tmp_file.c_str(), 100);
+ if (success != 0) {
+ LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size 100";
+ EXPECT_TRUE(false);
+ }
+
+ new_range = pool_.Add(new WriteRange*);
+ callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
+ new_range, (DiskIoMgr*)nullptr, (RequestContext*)nullptr,
+ data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+
+ // Negative file offset is invalid.
+ *new_range = pool_.Add(new WriteRange(tmp_file, -1, 0, callback));
+ (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+ EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
+
+ // Wait for both failing writes to report through their callbacks.
+ {
+ unique_lock<mutex> lock(written_mutex_);
+ while (num_ranges_written_ < 2) writes_done_.Wait(lock);
+ }
+ num_ranges_written_ = 0;
+ io_mgr.UnregisterContext(writer.get());
+}
+
+// Issue a number of writes, cancel the writer context and issue more writes.
+// AddWriteRange() is expected to succeed before the cancel and fail after it.
+// The writes themselves may finish with status cancelled or ok.
+TEST_F(DiskIoMgrTest, SingleWriterCancel) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ num_ranges_written_ = 0;
+ string tmp_file = "/tmp/disk_io_mgr_test.txt";
+ int num_ranges = 100;
+ int num_ranges_before_cancel = 25;
+ int64_t file_size = 1024 * 1024;
+ int64_t cur_offset = 0;
+ int success = CreateTempFile(tmp_file.c_str(), file_size);
+ if (success != 0) {
+ LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " <<
+ file_size;
+ EXPECT_TRUE(false);
+ }
+
+ // Separate IoMgr used to read back data for writes that completed OK.
+ scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
+ MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+ ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
+ unique_ptr<RequestContext> reader =
+ read_io_mgr->RegisterContext(&reader_mem_tracker);
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+ // Expected result of AddWriteRange(): OK before the cancel, CANCELLED
+ // after it.
+ Status validate_status = Status::OK();
+ for (int i = 0; i < num_ranges; ++i) {
+ if (i == num_ranges_before_cancel) {
+ io_mgr.CancelContext(writer.get());
+ validate_status = Status::CANCELLED;
+ }
+ int32_t* data = pool_.Add(new int32_t);
+ *data = rand();
+ WriteRange** new_range = pool_.Add(new WriteRange*);
+ // The callback accepts OK or CANCELLED (expected status CANCELLED) and
+ // counts up to 'num_ranges_before_cancel' completed writes, since ranges
+ // added after the cancel are rejected and never invoke the callback.
+ WriteRange::WriteDoneCallback callback = bind(
+ mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel,
+ new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1);
+ *new_range = pool_.Add(new WriteRange(
+ tmp_file, cur_offset, num_ranges % num_disks, callback));
+ (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+ cur_offset += sizeof(int32_t);
+ Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range);
+ EXPECT_TRUE(add_status.code() == validate_status.code());
+ }
+
+ // Wait only for the writes queued before the cancel.
+ {
+ unique_lock<mutex> lock(written_mutex_);
+ while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock);
+ }
+ num_ranges_written_ = 0;
+ io_mgr.UnregisterContext(writer.get());
+ }
+ }
+
+ read_io_mgr->UnregisterContext(reader.get());
+ read_io_mgr.reset();
+}
+
+// Basic test with a single reader, testing multiple threads, disks and a different
+// number of buffers.
+TEST_F(DiskIoMgrTest, SingleReader) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ int64_t iters = 0;
+ // Sweep thread-per-disk (1..5), disk (1,3,5) and reader-thread (1..5)
+ // configurations.
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
+ ObjectPool pool;
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks
+ << " num_read_threads=" << num_read_threads;
+
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ // Min/max buffer size of 1 byte forces many small buffers per range.
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader =
+ io_mgr.RegisterContext(&reader_mem_tracker);
+
+ // One full-file range per byte of data, spread across disks.
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
+
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < num_read_threads; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len,
+ Status::OK(), 0, &num_ranges_processed));
+ }
+ threads.join_all();
+
+ // All ranges must be consumed and all memory returned.
+ EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+ }
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// This test issues adding additional scan ranges while there are some still in flight.
+TEST_F(DiskIoMgrTest, AddScanRangeTest) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ int64_t iters = 0;
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
+
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader =
+ io_mgr.RegisterContext(&reader_mem_tracker);
+
+ // Split single-byte ranges into two batches so the second batch can be
+ // added while the first is in flight.
+ vector<ScanRange*> ranges_first_half;
+ vector<ScanRange*> ranges_second_half;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ if (i > len / 2) {
+ ranges_second_half.push_back(
+ InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ } else {
+ ranges_first_half.push_back(
+ InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+ }
+ }
+ AtomicInt32 num_ranges_processed;
+
+ // Issue first half the scan ranges.
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half));
+
+ // Read a couple of them
+ ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2,
+ &num_ranges_processed);
+
+ // Issue second half
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half));
+
+ // Start up some threads and then cancel
+ // NOTE(review): nothing in this test actually cancels the context;
+ // Status::CANCELLED merely widens what ScanRangeThread accepts (it always
+ // accepts OK too) — presumably copied from SingleReaderCancel; confirm.
+ thread_group threads;
+ for (int i = 0; i < 3; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ }
+
+ threads.join_all();
+ EXPECT_EQ(num_ranges_processed.Load(), len);
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// Test to make sure that sync reads and async reads work together
+// Note: this test is constructed so the number of buffers is greater than the
+// number of scan ranges.
+TEST_F(DiskIoMgrTest, SyncReadTest) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ int64_t iters = 0;
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
+
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+ MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader =
+ io_mgr.RegisterContext(&reader_mem_tracker);
+
+ // Range covering the whole file, reused for every sync read below.
+ ScanRange* complete_range =
+ InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
+
+ // Issue some reads before the async ones are issued
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
+
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < 5; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
+ strlen(data), Status::OK(), 0, &num_ranges_processed));
+ }
+
+ // Issue some more sync ranges interleaved with the async readers;
+ // sched_yield() gives the async threads a chance to run in between.
+ for (int i = 0; i < 5; ++i) {
+ sched_yield();
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ }
+
+ threads.join_all();
+
+ // Sync reads must still work after the async ranges are drained.
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+
+ EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// Tests a single reader cancelling half way through scan ranges.
+TEST_F(DiskIoMgrTest, SingleReaderCancel) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ int64_t iters = 0;
+ for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+ << " num_disk=" << num_disks;
+
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader =
+ io_mgr.RegisterContext(&reader_mem_tracker);
+
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
+
+ AtomicInt32 num_ranges_processed;
+ int num_succesful_ranges = ranges.size() / 2;
+ // Read half the ranges
+ for (int i = 0; i < num_succesful_ranges; ++i) {
+ ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1,
+ &num_ranges_processed);
+ }
+ EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
+
+ // Start up some threads and then cancel
+ // The threads accept OK or CANCELLED per range, since each range races
+ // with the cancellation below.
+ thread_group threads;
+ for (int i = 0; i < 3; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
+ strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+ }
+
+ io_mgr.CancelContext(reader.get());
+ sched_yield();
+
+ threads.join_all();
+ // After cancellation the context status must report CANCELLED.
+ EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled());
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// Test when the reader goes over the mem limit
+TEST_F(DiskIoMgrTest, MemLimits) {
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ const int mem_limit_num_buffers = 2;
+ // Allocate enough ranges so that the total buffers exceeds the mem limit.
+ const int num_ranges = 25;
+ {
+ // Limit is only two max-sized buffers; holding on to buffers below must
+ // eventually trip it.
+ MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
+ DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+ ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+ MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < num_ranges; ++i) {
+ ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
+
+ // Don't return buffers to force memory pressure
+ vector<unique_ptr<BufferDescriptor>> buffers;
+
+ AtomicInt32 num_ranges_processed;
+ ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
+ 1, &num_ranges_processed);
+
+ char result[strlen(data) + 1];
+ // Keep reading new ranges without returning buffers. This forces us
+ // to go over the limit eventually.
+ while (true) {
+ memset(result, 0, strlen(data) + 1);
+ ScanRange* range = nullptr;
+ Status status = io_mgr.GetNextRange(reader.get(), &range);
+ ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+ if (range == nullptr) break;
+
+ while (true) {
+ unique_ptr<BufferDescriptor> buffer;
+ Status status = range->GetNext(&buffer);
+ ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+ if (buffer == nullptr) break;
+ memcpy(result + range->offset() + buffer->scan_range_offset(),
+ buffer->buffer(), buffer->len());
+ buffers.push_back(move(buffer));
+ }
+ ValidateEmptyOrCorrect(data, result, strlen(data));
+ }
+
+ // Return all the held buffers so UnregisterContext() can verify a clean
+ // teardown below.
+ for (int i = 0; i < buffers.size(); ++i) {
+ io_mgr.ReturnBuffer(move(buffers[i]));
+ }
+
+ EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded());
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+}
+
+// Test when some scan ranges are marked as being cached.
+// Since these files are not in HDFS, the cached path always fails so this
+// only tests the fallback mechanism.
+// TODO: we can fake the cached read path without HDFS
+TEST_F(DiskIoMgrTest, CachedReads) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "abcdefghijklm";
+ int len = strlen(data);
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ const int num_disks = 2;
+ {
+ DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+
+ // All ranges below pass is_cached == true; since the file is local (not
+ // in HDFS), the cached path fails and the read falls back to disk.
+ ScanRange* complete_range =
+ InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+
+ // Issue some reads before the async ones are issued
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+
+ vector<ScanRange*> ranges;
+ for (int i = 0; i < len; ++i) {
+ int disk_id = i % num_disks;
+ ranges.push_back(
+ InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
+
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < 5; ++i) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
+ strlen(data), Status::OK(), 0, &num_ranges_processed));
+ }
+
+ // Issue some more sync ranges
+ for (int i = 0; i < 5; ++i) {
+ sched_yield();
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ }
+
+ threads.join_all();
+
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+ ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+
+ EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+ io_mgr.UnregisterContext(reader.get());
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const int ITERATIONS = 1;
+ const char* data = "abcdefghijklmnopqrstuvwxyz";
+ const int num_contexts = 5;
+ const int file_size = 4 * 1024;
+ const int num_writes_queued = 5;
+ const int num_reads_queued = 5;
+
+ string file_name = "/tmp/disk_io_mgr_test.txt";
+ int success = CreateTempFile(file_name.c_str(), file_size);
+ if (success != 0) {
+ LOG(ERROR) << "Error creating temp file " << file_name.c_str() << " of size " <<
+ file_size;
+ ASSERT_TRUE(false);
+ }
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(file_name.c_str(), &stat_val);
+
+ int64_t iters = 0;
+ vector<unique_ptr<RequestContext>> contexts(num_contexts);
+ Status status;
+ for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
+ for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+ MAX_BUFFER_SIZE);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ for (int file_index = 0; file_index < num_contexts; ++file_index) {
+ contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
+ }
+ pool_.Clear();
+ // The write cursor stays ahead of the read cursor: each pass writes up
+ // to 'num_writes_queued' bytes, then reads validate bytes already
+ // written (read_offset only advances up to write_offset).
+ int read_offset = 0;
+ int write_offset = 0;
+ while (read_offset < file_size) {
+ for (int context_index = 0; context_index < num_contexts; ++context_index) {
+ if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ vector<ScanRange*> ranges;
+ int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
+ for (int i = 0; i < num_scan_ranges; ++i) {
+ // Each single-byte range must contain the pattern byte for its
+ // offset ('data' repeats every strlen(data) bytes).
+ ranges.push_back(InitRange(
+ file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
+ threads.add_thread(
+ new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(),
+ reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
+ 1, Status::OK(), num_scan_ranges, &num_ranges_processed));
+ ++read_offset;
+ }
+
+ num_ranges_written_ = 0;
+ int num_write_ranges = min<int>(num_writes_queued, file_size - write_offset);
+ for (int i = 0; i < num_write_ranges; ++i) {
+ WriteRange::WriteDoneCallback callback =
+ bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
+ this, num_write_ranges, _1);
+ WriteRange* new_range = pool_.Add(new WriteRange(
+ file_name, write_offset, i % num_disks, callback));
+ new_range->SetData(
+ reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))),
+ 1);
+ // NOTE(review): 'status' is assigned but never checked here —
+ // presumably an EXPECT_OK was intended; confirm.
+ status = io_mgr.AddWriteRange(contexts[context_index].get(), new_range);
+ ++write_offset;
+ }
+
+ // Wait for this batch of writes before validating/advancing.
+ {
+ unique_lock<mutex> lock(written_mutex_);
+ while (num_ranges_written_ < num_write_ranges) writes_done_.Wait(lock);
+ }
+
+ threads.join_all();
+ } // for (int context_index
+ } // while (read_offset < file_size)
+
+ for (int file_index = 0; file_index < num_contexts; ++file_index) {
+ io_mgr.UnregisterContext(contexts[file_index].get());
+ }
+ } // for (int num_disks
+ } // for (int threads_per_disk
+ } // for (int iteration
+}
+
+// This test will test multiple concurrent reads each reading a different file.
+// This test will test multiple concurrent readers, each reading a different
+// file with its own shifted alphabet pattern.
+TEST_F(DiskIoMgrTest, MultipleReader) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const int NUM_READERS = 5;
+ const int DATA_LEN = 50;
+ const int ITERATIONS = 25;
+ const int NUM_THREADS_PER_READER = 3;
+
+ vector<string> file_names;
+ vector<int64_t> mtimes;
+ vector<string> data;
+ vector<unique_ptr<RequestContext>> readers;
+
+ file_names.resize(NUM_READERS);
+ readers.resize(NUM_READERS);
+ mtimes.resize(NUM_READERS);
+ data.resize(NUM_READERS);
+
+ // Initialize data for each reader. The data will be
+ // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
+ for (int i = 0; i < NUM_READERS; ++i) {
+ char buf[DATA_LEN];
+ for (int j = 0; j < DATA_LEN; ++j) {
+ int c = (j + i) % 26;
+ buf[j] = 'a' + c;
+ }
+ data[i] = string(buf, DATA_LEN);
+
+ stringstream ss;
+ ss << "/tmp/disk_io_mgr_test" << i << ".txt";
+ file_names[i] = ss.str();
+ CreateTempFile(ss.str().c_str(), data[i].c_str());
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(file_names[i].c_str(), &stat_val);
+ mtimes[i] = stat_val.st_mtime;
+ }
+
+ // This exercises concurrency, run the test multiple times
+ int64_t iters = 0;
+ for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
+ for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
+ for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+ pool_.Clear(); // Destroy scan ranges from previous iterations.
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
+ << " num_disk=" << num_disks;
+ if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
+
+ DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+ MAX_BUFFER_SIZE);
+ EXPECT_OK(io_mgr.Init(&mem_tracker));
+
+ // One context per reader, each with DATA_LEN single-byte ranges spread
+ // across the disks.
+ for (int i = 0; i < NUM_READERS; ++i) {
+ readers[i] = io_mgr.RegisterContext(&mem_tracker);
+
+ vector<ScanRange*> ranges;
+ for (int j = 0; j < DATA_LEN; ++j) {
+ int disk_id = j % num_disks;
+ ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
+ }
+ ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges));
+ }
+
+ AtomicInt32 num_ranges_processed;
+ thread_group threads;
+ for (int i = 0; i < NUM_READERS; ++i) {
+ for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
+ threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(),
+ data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed));
+ }
+ }
+ threads.join_all();
+ EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
+ for (int i = 0; i < NUM_READERS; ++i) {
+ io_mgr.UnregisterContext(readers[i].get());
+ }
+ }
+ }
+ }
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// Stress test for multiple clients with cancellation
+// TODO: the stress app should be expanded to include sync reads and adding scan
+// ranges in the middle.
+TEST_F(DiskIoMgrTest, StressTest) {
+ // Run the test with 5 disks, 5 threads per disk, 10 clients and with cancellation
+ DiskIoMgrStress test(5, 5, 10, true);
+ test.Run(2); // In seconds
+}
+
// Exercises the IoMgr's internal buffer pool: rounding of requested lengths up
// to power-of-two buffer sizes, reuse of returned buffers, garbage collection
// of unused buffers, and the corresponding mem-tracker accounting.
TEST_F(DiskIoMgrTest, Buffers) {
  // Test default min/max buffer size
  int min_buffer_size = 1024;
  int max_buffer_size = 8 * 1024 * 1024; // 8 MB
  // Limit is two max-size buffers; the test never needs more than that live.
  MemTracker root_mem_tracker(max_buffer_size * 2);

  DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
  ASSERT_OK(io_mgr.Init(&root_mem_tracker));
  ASSERT_EQ(root_mem_tracker.consumption(), 0);

  MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
  unique_ptr<RequestContext> reader;
  reader = io_mgr.RegisterContext(&reader_mem_tracker);

  // GetFreeBuffer() requires a scan range to associate the buffer with; the
  // range itself is never read from.
  ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);

  // buffer length should be rounded up to min buffer size
  int64_t buffer_len = 1;
  unique_ptr<BufferDescriptor> buffer_desc;
  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
  io_mgr.FreeBufferMemory(buffer_desc.get());
  io_mgr.ReturnBuffer(move(buffer_desc));
  // Returned buffer stays cached in the free pool, so consumption is unchanged.
  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());

  // reuse buffer: an exact min-size request should be served from the free
  // pool without a new allocation (allocated count stays at 1).
  buffer_len = min_buffer_size;
  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
  io_mgr.FreeBufferMemory(buffer_desc.get());
  io_mgr.ReturnBuffer(move(buffer_desc));
  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());

  // bump up to next buffer size: one byte over min rounds up to 2x min and
  // forces a second allocation (the cached min-size buffer can't serve it).
  buffer_len = min_buffer_size + 1;
  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
  EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
  // min (cached) + 2*min (in use) = 3*min total consumption.
  EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());

  // gc unused buffer: only the cached min-size buffer is reclaimed; the
  // in-use 2*min buffer survives.
  io_mgr.GcIoBuffers();
  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
  EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());

  io_mgr.FreeBufferMemory(buffer_desc.get());
  io_mgr.ReturnBuffer(move(buffer_desc));

  // max buffer size
  buffer_len = max_buffer_size;
  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
  EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
  io_mgr.FreeBufferMemory(buffer_desc.get());
  io_mgr.ReturnBuffer(move(buffer_desc));
  // Cached 2*min buffer plus the returned max-size buffer.
  EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, root_mem_tracker.consumption());

  // gc buffers: with nothing in use, everything is reclaimed and consumption
  // drops to zero.
  io_mgr.GcIoBuffers();
  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
  EXPECT_EQ(root_mem_tracker.consumption(), 0);
  io_mgr.UnregisterContext(reader.get());
}
+
+// IMPALA-2366: handle partial read where range goes past end of file.
+TEST_F(DiskIoMgrTest, PartialRead) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ int len = strlen(data);
+ int read_len = len + 1000; // Read past end of file.
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+
+ ASSERT_OK(io_mgr->Init(&mem_tracker));
+ MemTracker reader_mem_tracker;
+ unique_ptr<RequestContext> reader;
+ reader = io_mgr->RegisterContext(&reader_mem_tracker);
+
+ // We should not read past the end of file.
+ ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
+ unique_ptr<BufferDescriptor> buffer;
+ ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer));
+ ASSERT_TRUE(buffer->eosr());
+ ASSERT_EQ(len, buffer->len());
+ ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
+ io_mgr->ReturnBuffer(move(buffer));
+
+ io_mgr->UnregisterContext(reader.get());
+ pool_.Clear();
+ io_mgr.reset();
+ EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
+// Test reading into a client-allocated buffer.
+TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ int len = strlen(data);
+ int read_len = 4; // Make buffer size smaller than client-provided buffer.
+ CreateTempFile(tmp_file, data);
+
+ scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+
+ ASSERT_OK(io_mgr->Init(&mem_tracker));
+ // Reader doesn't need to provide mem tracker if it's providing buffers.
+ MemTracker* reader_mem_tracker = nullptr;
+ unique_ptr<RequestContext> reader;
+ reader = io_mgr->RegisterContext(reader_mem_tracker);
+
+ for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
+ vector<uint8_t> client_buffer(buffer_len);
+ int scan_len = min(len, buffer_len);
+ ScanRange* range = AllocateRange();
+ range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
+ BufferOpts::ReadInto(client_buffer.data(), buffer_len));
+ ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+
+ unique_ptr<BufferDescriptor> io_buffer;
+ ASSERT_OK(range->GetNext(&io_buffer));
+ ASSERT_TRUE(io_buffer->eosr());
+ ASSERT_EQ(scan_len, io_buffer->len());
+ ASSERT_EQ(client_buffer.data(), io_buffer->buffer());
+ ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0);
+
+ // DiskIoMgr should not have allocated memory.
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+ io_mgr->ReturnBuffer(move(io_buffer));
+ }
+
+ io_mgr->UnregisterContext(reader.get());
+ pool_.Clear();
+ io_mgr.reset();
+ EXPECT_EQ(mem_tracker.consumption(), 0);
+}
+
// Test reading into a client-allocated buffer where the read fails.
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
  MemTracker mem_tracker(LARGE_MEM_LIMIT);
  // Reading this path must fail at I/O time.
  const char* tmp_file = "/file/that/does/not/exist";
  const int SCAN_LEN = 128;

  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));

  ASSERT_OK(io_mgr->Init(&mem_tracker));
  // Reader doesn't need to provide mem tracker if it's providing buffers.
  MemTracker* reader_mem_tracker = nullptr;
  unique_ptr<RequestContext> reader;
  vector<uint8_t> client_buffer(SCAN_LEN);
  // Each iteration registers a fresh context, issues the failing range, and
  // unregisters, so registration/teardown is exercised under error conditions.
  for (int i = 0; i < 1000; ++i) {
    reader = io_mgr->RegisterContext(reader_mem_tracker);
    ScanRange* range = AllocateRange();
    range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
        BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
    ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));

    // Also test the cancellation path. Run multiple iterations since it is racy
    // whether the read fails before the cancellation. (Iteration 0 skips the
    // cancel so the pure read-failure path is covered at least once.)
    if (i >= 1) io_mgr->CancelContext(reader.get());

    // Either way, GetNext() must surface a non-OK status to the caller.
    unique_ptr<BufferDescriptor> io_buffer;
    ASSERT_FALSE(range->GetNext(&io_buffer).ok());

    // DiskIoMgr should not have allocated memory.
    EXPECT_EQ(mem_tracker.consumption(), 0);

    io_mgr->UnregisterContext(reader.get());
  }

  pool_.Clear();
  io_mgr.reset();
  EXPECT_EQ(mem_tracker.consumption(), 0);
}
+
+// Test to verify configuration parameters for number of I/O threads per disk.
+TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+ const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
+ + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
+
+ // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
+ // Since we do not have control over which disk is used, we check for either type
+ // (rotational/solid state)
+ MemTracker mem_tracker(LARGE_MEM_LIMIT);
+ const int num_io_threads_per_rotational_or_ssd = 2;
+ DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
+ num_io_threads_per_rotational_or_ssd, 1, 10);
+ ASSERT_OK(io_mgr.Init(&mem_tracker));
+ const int num_io_threads = io_mgr.disk_thread_group_.Size();
+ ASSERT_TRUE(num_io_threads ==
+ num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
+}
+}
+}
+
// Test driver: initialize gtest first so it strips its own flags from
// argc/argv before Impala's runtime init sees them (presumably the intended
// ordering — confirm against other BE test mains), then run all tests.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
  return RUN_ALL_TESTS();
}