You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:41:56 UTC
[05/15] impala git commit: IMPALA-6587: free buffers before
ScanRange::Cancel() returns
IMPALA-6587: free buffers before ScanRange::Cancel() returns
ScanRange::Cancel() now waits until an in-flight read finishes so
that the disk I/O buffer being processed by the disk thread is
freed when Cancel() returns.
The fix is to set a 'read_in_flight_' flag on the scan range
while the disk thread is doing the read. Cancel() blocks until
read_in_flight_ == false.
The code is refactored to move more logic into ScanRange and
to avoid holding RequestContext::lock_ for longer than necessary.
Testing:
Added query test that reproduces the issue.
Added a unit test and a stress option that reproduces the problem in a
targeted way.
Ran disk-io-mgr-stress test for a few hours. Ran it under TSAN and
inspected output to make sure there were no non-benign data races.
Change-Id: I87182b6bd51b5fb0b923e7e4c8d08a44e7617db2
Reviewed-on: http://gerrit.cloudera.org:8080/9680
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d7bba821
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d7bba821
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d7bba821
Branch: refs/heads/master
Commit: d7bba821923b96272c254840facd3d870b1ebe36
Parents: fb5dc9e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Feb 28 09:49:25 2018 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Sat Apr 28 23:41:39 2018 +0000
----------------------------------------------------------------------
be/src/common/global-flags.cc | 2 +
be/src/runtime/io/disk-io-mgr-internal.h | 9 +-
be/src/runtime/io/disk-io-mgr-stress.cc | 2 +
be/src/runtime/io/disk-io-mgr-test.cc | 49 +++++++
be/src/runtime/io/disk-io-mgr.cc | 100 +------------
be/src/runtime/io/disk-io-mgr.h | 19 +--
be/src/runtime/io/request-context.cc | 32 ++++-
be/src/runtime/io/request-context.h | 16 ++-
be/src/runtime/io/request-ranges.h | 79 ++++++----
be/src/runtime/io/scan-range.cc | 143 +++++++++++++++----
.../queries/QueryTest/scanners.test | 11 ++
11 files changed, 288 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ea88b28..d1a54e1 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -148,6 +148,8 @@ DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option th
DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects extra delay"
" in milliseconds when initializing an impalad's local catalog replica. Delay <= 0"
" inject no delay.");
+DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra delay"
+ " in milliseconds when the I/O manager is reading from disk.");
#endif
// Used for testing the path where the Kudu client is stubbed.
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 292530f..475456a 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -39,6 +39,8 @@
/// This file contains internal structures shared between submodules of the IoMgr. Users
/// of the IoMgr do not need to include this file.
+DECLARE_uint64(max_cached_file_handles);
+
// Macros to work around counters sometimes not being provided.
// TODO: fix things so that counters are always non-NULL.
#define COUNTER_ADD_IF_NOT_NULL(c, v) \
@@ -56,8 +58,13 @@
namespace impala {
namespace io {
+// Indicates if file handle caching should be used
+static inline bool is_file_handle_caching_enabled() {
+ return FLAGS_max_cached_file_handles > 0;
+}
+
/// Per disk state
-struct DiskIoMgr::DiskQueue {
+struct DiskQueue {
/// Disk id (0-based)
int disk_id;
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 3fd33de..fd360c4 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -247,6 +247,8 @@ void DiskIoMgrStress::NewClient(int i) {
// Clean up leftover state from the previous client (if any).
client.scan_ranges.clear();
ExecEnv* exec_env = ExecEnv::GetInstance();
+ if (client.reader != nullptr) io_mgr_->UnregisterContext(client.reader.get());
+
exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
client.obj_pool.Clear();
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 95b2d03..3e8a9ae 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -33,10 +33,12 @@
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
+#include "testutil/scoped-flag-setter.h"
#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/disk-info.h"
#include "util/thread.h"
+#include "util/time.h"
#include "common/names.h"
@@ -48,6 +50,9 @@ DECLARE_int64(min_buffer_size);
DECLARE_int32(num_remote_hdfs_io_threads);
DECLARE_int32(num_s3_io_threads);
DECLARE_int32(num_adls_io_threads);
+#ifndef NDEBUG
+DECLARE_int32(stress_disk_read_delay_ms);
+#endif
const int MIN_BUFFER_SIZE = 128;
const int MAX_BUFFER_SIZE = 1024;
@@ -1260,6 +1265,50 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
io_mgr.UnregisterContext(reader.get());
}
+// Regression test for IMPALA-6587 - all buffers should be released after Cancel().
+TEST_F(DiskIoMgrTest, CancelReleasesResources) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ int len = strlen(data);
+ const int64_t MIN_BUFFER_SIZE = 2;
+ const int64_t MAX_BUFFER_SIZE = 1024;
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ const int NUM_DISK_THREADS = 20;
+ DiskIoMgr io_mgr(
+ 1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+#ifndef NDEBUG
+ auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5);
+#endif
+
+ ASSERT_OK(io_mgr.Init());
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+
+ for (int i = 0; i < 10; ++i) {
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime);
+ bool needs_buffers;
+ ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
+ EXPECT_TRUE(needs_buffers);
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), &read_client, range, MAX_BUFFER_SIZE));
+ // Give disk I/O thread a chance to start read.
+ SleepForMs(1);
+
+ range->Cancel(Status::CANCELLED);
+ // Resources should be released immediately once Cancel() returns.
+ EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i;
+ }
+ buffer_pool()->DeregisterClient(&read_client);
+ io_mgr.UnregisterContext(reader.get());
+}
+
// Test reading into a client-allocated buffer.
TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 8933fec..50f6fc4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -129,13 +129,6 @@ DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used b
AtomicInt32 DiskIoMgr::next_disk_id_;
-namespace detail {
-// Indicates if file handle caching should be used
-static inline bool is_file_handle_caching_enabled() {
- return FLAGS_max_cached_file_handles > 0;
-}
-}
-
string DiskIoMgr::DebugString() {
stringstream ss;
ss << "Disks: " << endl;
@@ -549,7 +542,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
disk_queue->request_contexts.pop_front();
DCHECK(*request_context != nullptr);
request_disk_state = &((*request_context)->disk_states_[disk_id]);
- request_disk_state->IncrementDiskThreadAndDequeue();
+ request_disk_state->IncrementDiskThreadAfterDequeue();
}
// NOTE: no locks were taken in between. We need to be careful about what state
@@ -643,50 +636,6 @@ void DiskIoMgr::HandleWriteFinished(
}
}
-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
- Status read_status, unique_ptr<BufferDescriptor> buffer) {
- unique_lock<mutex> reader_lock(reader->lock_);
-
- RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- DCHECK_GT(disk_state->num_threads_in_op(), 0);
- DCHECK(buffer->buffer_ != nullptr);
- DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
-
- // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer.
- // Store the state we need before calling EnqueueReadyBuffer().
- bool eosr = buffer->eosr_;
-
- // TODO: IMPALA-4249: it safe to touch 'scan_range' until DecrementDiskThread() is
- // called because all clients of DiskIoMgr keep ScanRange objects alive until they
- // unregister their RequestContext.
- ScanRange* scan_range = buffer->scan_range_;
- bool scan_range_done = eosr;
- if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
- DCHECK_EQ(reader->state_, RequestContext::Active);
- // Read successfully - update the reader's scan ranges. There are two cases here:
- // 1. End of scan range or cancelled scan range - don't need to reschedule.
- // 2. Middle of scan range - need to schedule to read next buffer.
- bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
- if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
- } else {
- // The scan range will be cancelled, either because we hit an error or because the
- // request context was cancelled. The buffer is not needed - we must free it.
- reader->FreeBuffer(buffer.get());
- // Propagate 'read_status' to the scan range. If we are here because the context
- // was cancelled, the scan range is already cancelled so we do not need to re-cancel
- // it.
- if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status);
- scan_range_done = true;
- }
- if (scan_range_done) {
- scan_range->Close();
- --disk_state->num_remaining_ranges();
- }
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- disk_state->DecrementDiskThread(reader_lock, reader);
-}
-
void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
// The thread waits until there is work or the entire system is being shut down.
// If there is work, performs the read or write requested and re-enqueues the
@@ -707,7 +656,9 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
return;
}
if (range->request_type() == RequestType::READ) {
- ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
+ ScanRange* scan_range = static_cast<ScanRange*>(range);
+ ReadOutcome outcome = scan_range->DoRead(disk_queue->disk_id);
+ worker_context->ReadDone(disk_queue->disk_id, outcome, scan_range);
} else {
DCHECK(range->request_type() == RequestType::WRITE);
Write(worker_context, static_cast<WriteRange*>(range));
@@ -715,49 +666,6 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
}
}
-void DiskIoMgr::ReadRange(
- DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
- int64_t bytes_remaining = range->len_ - range->bytes_read_;
- DCHECK_GT(bytes_remaining, 0);
- unique_ptr<BufferDescriptor> buffer_desc;
- if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
- buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
- range->client_buffer_.data, range->client_buffer_.len));
- } else {
- DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
- << "This code path does not handle other buffer types, i.e. HDFS cache"
- << static_cast<int>(range->external_buffer_tag_);
- buffer_desc = range->GetNextUnusedBufferForRange();
- if (buffer_desc == nullptr) {
- // No buffer available - the range will be rescheduled when a buffer is added.
- unique_lock<mutex> reader_lock(reader->lock_);
- reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader);
- DCHECK(reader->Validate()) << endl << reader->DebugString();
- return;
- }
- }
-
- // No locks in this section. Only working on local vars. We don't want to hold a
- // lock across the read call.
- Status read_status = range->Open(detail::is_file_handle_caching_enabled());
- if (read_status.ok()) {
- // Update counters.
- COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
- COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
-
- read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
- &buffer_desc->len_, &buffer_desc->eosr_);
- buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
-
- COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
- COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
- COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
- }
-
- // Finished read, update reader/disk based on the results
- HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
-}
-
void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
Status ret_status = Status::OK();
FILE* file_handle = nullptr;
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 17aa211..b6b4b75 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -44,6 +44,8 @@
namespace impala {
namespace io {
+
+struct DiskQueue;
/// Manager object that schedules IO for all queries on all disks and remote filesystems
/// (such as S3). Each query maps to one or more RequestContext objects, each of which
/// has its own queue of scan ranges and/or write ranges.
@@ -393,7 +395,6 @@ class DiskIoMgr : public CacheLineAligned {
friend class RequestContext;
// TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above.
friend class io::ScanRange;
- struct DiskQueue;
friend class DiskIoMgrTest_Buffers_Test;
friend class DiskIoMgrTest_BufferSizeSelection_Test;
@@ -448,8 +449,8 @@ class DiskIoMgr : public CacheLineAligned {
FileHandleCache file_handle_cache_;
/// Disk worker thread loop. This function retrieves the next range to process on
- /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
- /// There can be multiple threads per disk running this loop.
+ /// the disk queue and invokes ScanRange::DoRead() or Write() depending on the type
+ /// of Range. There can be multiple threads per disk running this loop.
void WorkLoop(DiskQueue* queue);
/// This is called from the disk thread to get the next range to process. It will
@@ -460,13 +461,6 @@ class DiskIoMgr : public CacheLineAligned {
bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
RequestContext** request_context);
- /// Updates disk queue and reader state after a read is complete. If the read
- /// was successful, 'read_status' is ok and 'buffer' contains the result of the
- /// read. If the read failed with an error, 'read_status' contains the error and
- /// 'buffer' has the buffer that was meant to hold the result of the read.
- void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
- Status read_status, std::unique_ptr<BufferDescriptor> buffer);
-
/// Invokes write_range->callback_ after the range has been written and
/// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
/// etc. is passed via write_status and to the callback.
@@ -488,11 +482,6 @@ class DiskIoMgr : public CacheLineAligned {
/// Does not open or close the file that is written.
Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
- /// Reads the specified scan range and calls HandleReadFinished() when done. If no
- /// buffer is available to read the range's data into, the read cannot proceed, the
- /// range becomes blocked and this function returns without doing I/O.
- void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
-
/// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
/// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index dec6aa6..dd8ff24 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -64,6 +64,34 @@ void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
buffer->buffer_ = nullptr;
}
+void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) {
+ // TODO: IMPALA-4249: it is safe to touch 'range' until DecrementDiskThread() is
+ // called because all clients of DiskIoMgr keep ScanRange objects alive until they
+ // unregister their RequestContext.
+ unique_lock<mutex> lock(lock_);
+ RequestContext::PerDiskState* disk_state = &disk_states_[disk_id];
+ DCHECK_GT(disk_state->num_threads_in_op(), 0);
+ if (outcome == ReadOutcome::SUCCESS_EOSR) {
+ // No more reads to do.
+ --disk_state->num_remaining_ranges();
+ } else if (outcome == ReadOutcome::SUCCESS_NO_EOSR) {
+ // Schedule the next read.
+ if (state_ != RequestContext::Cancelled) {
+ ScheduleScanRange(lock, range);
+ }
+ } else if (outcome == ReadOutcome::BLOCKED_ON_BUFFER) {
+ // Do nothing - the caller must add a buffer to the range or cancel it.
+ } else {
+ DCHECK(outcome == ReadOutcome::CANCELLED) << static_cast<int>(outcome);
+ // No more reads - clean up the scan range.
+ --disk_state->num_remaining_ranges();
+ RemoveActiveScanRangeLocked(lock, range);
+ }
+ // Release refcount that was taken in IncrementDiskThreadAfterDequeue().
+ disk_state->DecrementDiskThread(lock, this);
+ DCHECK(Validate()) << endl << DebugString();
+}
+
// Cancellation of a RequestContext requires coordination from multiple threads that may
// hold references to the context:
// 1. Disk threads that are currently processing a range for this context.
@@ -104,7 +132,9 @@ void RequestContext::Cancel() {
// Clear out all request ranges from queues for this reader. Cancel the scan
// ranges and invoke the write range callbacks to propagate the cancellation.
- for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
+ for (ScanRange* range : active_scan_ranges_) {
+ range->CancelInternal(Status::CANCELLED, false);
+ }
active_scan_ranges_.clear();
for (PerDiskState& disk_state : disk_states_) {
RequestRange* range;
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index b028596..adfe6d5 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -201,6 +201,11 @@ class RequestContext {
state.ScheduleContext(lock, this, range->disk_id());
}
+ /// Called from a disk thread when a read completes. Decrements the disk thread count
+ /// and other bookkeeping and re-schedules 'range' if there are more reads to do.
+ /// Caller must not hold 'lock_'.
+ void ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range);
+
/// Cancel the context if not already cancelled, wait for all scan ranges to finish
/// and mark the context as inactive, after which it cannot be used.
void CancelAndMarkInactive();
@@ -367,13 +372,14 @@ class RequestContext {
void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
RequestContext* context, int disk_id);
- /// Increment the count of disk threads that have a reference to this context. These
- /// threads do not hold any locks while reading from HDFS, so we need to prevent the
- /// RequestContext from being destroyed underneath them.
+ /// Called when dequeueing this RequestContext from the disk queue to increment the
+ /// count of disk threads with a reference to this context. These threads do not hold
+ /// any locks while reading from HDFS, so we need to prevent the RequestContext from
+ /// being destroyed underneath them.
///
/// The caller does not need to hold 'lock_', so this can execute concurrently with
/// itself and DecrementDiskThread().
- void IncrementDiskThreadAndDequeue() {
+ void IncrementDiskThreadAfterDequeue() {
/// Incrementing 'num_threads_in_op_' first so that there is no window when other
/// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
/// references left to this context.
@@ -395,7 +401,7 @@ class RequestContext {
}
// The state is cancelled, check to see if we're the last thread to touch the
// context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
- // in this order to avoid a race with IncrementDiskThreadAndDequeue().
+ // in this order to avoid a race with IncrementDiskThreadAfterDequeue().
if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
context->DecrementDiskRefCount(context_lock);
done_ = true;
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 7ec0cb6..3e266d6 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -34,8 +34,9 @@
namespace impala {
namespace io {
class DiskIoMgr;
-class RequestContext;
+struct DiskQueue;
class ExclusiveHdfsFileHandle;
+class RequestContext;
class ScanRange;
/// Buffer struct that is used by the caller and IoMgr to pass read buffers.
@@ -114,6 +115,20 @@ struct RequestType {
};
};
+/// ReadOutcome describes the possible outcomes of the DoRead() function.
+enum class ReadOutcome {
+ // The last (eosr) buffer was successfully enqueued.
+ SUCCESS_EOSR,
+ // The buffer was successfully enqueued but we are not at eosr and can schedule
+ // the next read.
+ SUCCESS_NO_EOSR,
+ // The scan range is blocked waiting for the next buffer.
+ BLOCKED_ON_BUFFER,
+ // The scan range is cancelled (either by caller or because of an error). No more
+ // reads will be scheduled.
+ CANCELLED
+};
+
/// Represents a contiguous sequence of bytes in a single file.
/// This is the common base class for read and write IO requests - ScanRange and
/// WriteRange. Each disk thread processes exactly one RequestRange at a time.
@@ -236,12 +251,12 @@ class ScanRange : public RequestRange {
/// and cannot be accessed.
void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
- /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads
- /// blocked on GetNext(). Status is a non-ok status with the reason the range was
- /// cancelled, e.g. CANCELLED if the range was cancelled because it was not needed, or
- /// another error if an error was encountered while scanning the range. Status is
- /// returned to the any callers of GetNext(). If a thread is currently blocked in
- /// GetNext(), it is woken up.
+ /// Cancel this scan range. This waits for any in-flight read operations to complete,
+ /// cleans up all buffers owned by the scan range (i.e. queued or unused buffers)
+ /// and wakes up any threads blocked on GetNext(). Status is a non-ok status with the
+ /// reason the range was cancelled, e.g. CANCELLED if the range was cancelled because
+ /// it was not needed, or another error if an error was encountered while scanning the
+ /// range. Status is returned to any callers of GetNext().
void Cancel(const Status& status);
/// return a descriptive string for debug.
@@ -259,10 +274,9 @@ class ScanRange : public RequestRange {
/// Enqueues a ready buffer with valid data for this range. This does not block.
/// The caller passes ownership of buffer to the scan range and it is not
- /// valid to access buffer after this call. The reader lock must be held by the
- /// caller. Returns false if the scan range was cancelled.
- bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
- std::unique_ptr<BufferDescriptor> buffer);
+ /// valid to access buffer after this call. Returns false if the scan range was
+ /// cancelled.
+ bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer);
/// Validates the internal state of this range. lock_ must be taken
/// before calling this.
@@ -324,10 +338,10 @@ class ScanRange : public RequestRange {
std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
const boost::unique_lock<boost::mutex>& scan_range_lock);
- /// Get the next buffer for this scan range for a disk thread to read into. Returns
- /// the new buffer if successful. If no buffers are available, marks the range
- /// as blocked and returns nullptr. Called must not hold 'lock_'.
- std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
+ /// Called from a disk I/O thread to read the next buffer of data for this range. The
+ /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the
+ /// ID of the disk queue. Caller must not hold 'lock_'.
+ ReadOutcome DoRead(int disk_id);
/// Cleans up a buffer that was not returned to the client.
/// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -344,15 +358,18 @@ class ScanRange : public RequestRange {
/// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
- /// Same as Cancel() except reader_->lock must be held by the caller.
- void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
- const Status& status);
-
/// Same as Cancel() except doesn't remove the scan range from
- /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
- /// which removes the range itself to avoid invalidating its active_scan_ranges_
- /// iterator.
- void CancelInternal(const Status& status);
+ /// reader_->active_scan_ranges_ or wait for in-flight reads to finish.
+ /// This is invoked by RequestContext::Cancel(), which removes the range itself
+ /// to avoid invalidating its active_scan_ranges_ iterator. If 'read_error' is
+ /// true, this is being called from a disk thread to propagate a read error, so
+ /// 'read_in_flight_' is set to false and threads in WaitForInFlightRead() are
+ /// woken up.
+ void CancelInternal(const Status& status, bool read_error);
+
+ /// Waits for any in-flight read to complete. Called after CancelInternal() to ensure
+ /// no more reads will occur for the scan range.
+ void WaitForInFlightRead();
/// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
void SetBlockedOnBuffer();
@@ -451,10 +468,15 @@ class ScanRange : public RequestRange {
/// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
int64_t unused_iomgr_buffer_bytes_ = 0;
- /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
- /// infer how many bytes of buffers need to be held onto to read the rest of the scan
- /// range.
- int64_t iomgr_buffer_bytes_returned_ = 0;
+ /// Cumulative bytes of I/O mgr buffers taken from 'unused_iomgr_buffers_' by DoRead().
+ /// Used to infer how many bytes of buffers need to be held onto to read the rest of
+ /// the scan range.
+ int64_t iomgr_buffer_cumulative_bytes_used_ = 0;
+
+ /// True if a disk thread is currently doing a read for this scan range. Set to true in
+ /// DoRead() and set to false in EnqueueReadyBuffer() or CancelInternal() when the
+ /// read completes and any buffer used for the read is either enqueued or freed.
+ bool read_in_flight_ = false;
/// If true, the last buffer for this scan range has been queued.
/// If this is true and 'ready_buffers_' is empty, then no more buffers will be
@@ -472,7 +494,8 @@ class ScanRange : public RequestRange {
/// 'cancel_status_' is not OK.
std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
- /// Condition variable for threads in GetNext() that are waiting for the next buffer.
+ /// Condition variable for threads in GetNext() that are waiting for the next buffer
+ /// and threads in WaitForInFlightRead() that are waiting for a read to finish.
/// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is
/// cancelled.
ConditionVariable buffer_ready_cv_;
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 4f7c38b..c868c3d 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -36,23 +36,33 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea
DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
"reading from ADLS.");
+#ifndef NDEBUG
+DECLARE_int32(stress_disk_read_delay_ms);
+#endif
+
// Implementation of the ScanRange functionality. Each ScanRange contains a queue
// of ready buffers. For each ScanRange, there is only a single producer and
// consumer thread, i.e. only one disk thread will push to a scan range at
// any time and only one thread will remove from the queue. This is to guarantee
// that buffers are queued and read in file order.
-
-bool ScanRange::EnqueueReadyBuffer(
- const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
- DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+bool ScanRange::EnqueueReadyBuffer(unique_ptr<BufferDescriptor> buffer) {
DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
DCHECK(!eosr_queued_);
+ if (!buffer->is_cached()) {
+ // All non-cached buffers are enqueued by disk threads. Indicate that the read
+ // finished.
+ DCHECK(read_in_flight_);
+ read_in_flight_ = false;
+ }
if (!cancel_status_.ok()) {
// This range has been cancelled, no need to enqueue the buffer.
CleanUpBuffer(scan_range_lock, move(buffer));
+ // One or more threads may be blocked in WaitForInFlightRead() waiting for the read
+ // to complete. Wake up all of them.
+ buffer_ready_cv_.NotifyAll();
return false;
}
// Clean up any surplus buffers. E.g. we may have allocated too many if the file was
@@ -122,7 +132,7 @@ void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
|| !cancel_status_.ok()
|| eosr_queued_
- || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
+ || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_cumulative_bytes_used_) {
CleanUpBuffer(scan_range_lock, move(buffer));
} else {
unused_iomgr_buffer_bytes_ += buffer->buffer_len();
@@ -156,15 +166,77 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
return result;
}
-unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
- unique_lock<mutex> lock(lock_);
- unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
- if (buffer_desc == nullptr) {
- blocked_on_buffer_ = true;
- } else {
- iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
+ReadOutcome ScanRange::DoRead(int disk_id) {
+ int64_t bytes_remaining = len_ - bytes_read_;
+ DCHECK_GT(bytes_remaining, 0);
+
+ unique_ptr<BufferDescriptor> buffer_desc;
+ {
+ unique_lock<mutex> lock(lock_);
+ DCHECK(!read_in_flight_);
+ if (!cancel_status_.ok()) return ReadOutcome::CANCELLED;
+
+ if (external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+ buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
+ io_mgr_, reader_, this, client_buffer_.data, client_buffer_.len));
+ } else {
+ DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+ << "This code path does not handle other buffer types, i.e. HDFS cache"
+ << static_cast<int>(external_buffer_tag_);
+ buffer_desc = GetUnusedBuffer(lock);
+ if (buffer_desc == nullptr) {
+ // No buffer available - the range will be rescheduled when a buffer is added.
+ blocked_on_buffer_ = true;
+ return ReadOutcome::BLOCKED_ON_BUFFER;
+ }
+ iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
+ }
+ read_in_flight_ = true;
}
- return buffer_desc;
+
+ // No locks in this section. Only working on local vars. We don't want to hold a
+ // lock across the read call.
+ Status read_status = Open(is_file_handle_caching_enabled());
+ if (read_status.ok()) {
+ COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
+ COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
+
+ read_status = Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+ &buffer_desc->len_, &buffer_desc->eosr_);
+ buffer_desc->scan_range_offset_ = bytes_read_ - buffer_desc->len_;
+
+ COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
+ COUNTER_ADD(&io_mgr_->total_bytes_read_counter_, buffer_desc->len_);
+ COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+ }
+
+ DCHECK(buffer_desc->buffer_ != nullptr);
+ DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path.";
+ if (!read_status.ok()) {
+ // Free buffer to release resources before we cancel the range so that all buffers
+ // are freed at cancellation.
+ reader_->FreeBuffer(buffer_desc.get());
+ buffer_desc.reset();
+
+ // Propagate 'read_status' to the scan range. This will also wake up any waiting
+ // threads.
+ CancelInternal(read_status, true);
+ // No more reads for this scan range - we can close it.
+ Close();
+ return ReadOutcome::CANCELLED;
+ }
+
+ // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
+ // Store the state we need before calling EnqueueReadyBuffer().
+ bool eosr = buffer_desc->eosr_;
+ // Read successful - enqueue the buffer and return the appropriate outcome.
+ if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
+ if (eosr) {
+ // No more reads for this scan range - we can close it.
+ Close();
+ return ReadOutcome::SUCCESS_EOSR;
+ }
+ return ReadOutcome::SUCCESS_NO_EOSR;
}
void ScanRange::SetBlockedOnBuffer() {
@@ -202,18 +274,14 @@ void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock)
void ScanRange::Cancel(const Status& status) {
// Cancelling a range that was never started, ignore.
if (io_mgr_ == nullptr) return;
- CancelInternal(status);
+ CancelInternal(status, false);
+ // Wait until an in-flight read is finished. The read thread will clean up the
+ // buffer it used. Once the range is cancelled, no more reads should be started.
+ WaitForInFlightRead();
reader_->RemoveActiveScanRange(this);
}
-void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
- const Status& status) {
- DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
- CancelInternal(status);
- reader_->RemoveActiveScanRangeLocked(reader_lock, this);
-}
-
-void ScanRange::CancelInternal(const Status& status) {
+void ScanRange::CancelInternal(const Status& status, bool read_error) {
DCHECK(io_mgr_ != nullptr);
DCHECK(!status.ok());
{
@@ -223,10 +291,10 @@ void ScanRange::CancelInternal(const Status& status) {
{
unique_lock<mutex> hdfs_lock(hdfs_lock_);
DCHECK(Validate()) << DebugString();
- // If already cancelled, preserve the original reason for cancellation. The first
- // thread to set 'cancel_status_' does the cleanup below.
- RETURN_VOID_IF_ERROR(cancel_status_);
- cancel_status_ = status;
+ // If already cancelled, preserve the original reason for cancellation. Most of the
+ // cleanup is not required if already cancelled, but if this call reports a read
+ // error we still need to set 'read_in_flight_' to false.
+ if (cancel_status_.ok()) cancel_status_ = status;
}
/// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
@@ -238,6 +306,10 @@ void ScanRange::CancelInternal(const Status& status) {
/// Clean up buffers that we don't need any more because we won't read any more data.
CleanUpUnusedBuffers(scan_range_lock);
+ if (read_error) {
+ DCHECK(read_in_flight_);
+ read_in_flight_ = false;
+ }
}
buffer_ready_cv_.NotifyAll();
@@ -246,6 +318,11 @@ void ScanRange::CancelInternal(const Status& status) {
if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
}
+void ScanRange::WaitForInFlightRead() {
+ unique_lock<mutex> scan_range_lock(lock_);
+ while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
+}
+
string ScanRange::DebugString() const {
stringstream ss;
ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
@@ -303,6 +380,7 @@ ScanRange::~ScanRange() {
DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
<< "Cached buffer was not released.";
+ DCHECK(!read_in_flight_);
DCHECK_EQ(0, ready_buffers_.size());
DCHECK_EQ(0, num_buffers_in_reader_.Load());
}
@@ -310,6 +388,7 @@ ScanRange::~ScanRange() {
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
DCHECK(ready_buffers_.empty());
+ DCHECK(!read_in_flight_);
DCHECK(file != nullptr);
DCHECK_GE(len, 0);
DCHECK_GE(offset, 0);
@@ -340,13 +419,14 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
DCHECK(exclusive_hdfs_fh_ == nullptr);
DCHECK(local_file_ == nullptr);
+ DCHECK(!read_in_flight_);
io_mgr_ = io_mgr;
reader_ = reader;
local_file_ = nullptr;
exclusive_hdfs_fh_ = nullptr;
bytes_read_ = 0;
unused_iomgr_buffer_bytes_ = 0;
- iomgr_buffer_bytes_returned_ = 0;
+ iomgr_buffer_cumulative_bytes_used_ = 0;
cancel_status_ = Status::OK();
eosr_queued_ = false;
blocked_on_buffer_ = false;
@@ -474,6 +554,13 @@ int64_t ScanRange::MaxReadChunkSize() const {
// TODO: look at linux disk scheduling
Status ScanRange::Read(
uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
+ DCHECK(read_in_flight_);
+ // Delay before acquiring the lock, to allow triggering the IMPALA-6587 race.
+#ifndef NDEBUG
+ if (FLAGS_stress_disk_read_delay_ms > 0) {
+ SleepForMs(FLAGS_stress_disk_read_delay_ms);
+ }
+#endif
unique_lock<mutex> hdfs_lock(hdfs_lock_);
RETURN_IF_ERROR(cancel_status_);
@@ -664,7 +751,7 @@ Status ScanRange::ReadFromCache(
desc->scan_range_offset_ = 0;
desc->eosr_ = true;
bytes_read_ = bytes_read;
- EnqueueReadyBuffer(reader_lock, move(desc));
+ EnqueueReadyBuffer(move(desc));
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
*read_succeeded = true;
return Status::OK();
http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/testdata/workloads/functional-query/queries/QueryTest/scanners.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index a000a15..b05786e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -117,3 +117,14 @@ select * from emptytable;
---- TYPES
STRING,INT
====
+---- QUERY
+# IMPALA-6587: regression test for reservation not being managed correctly. Should be
+# able to execute this query reliably with the minimum reservation, even with tiny
+# scan ranges. This reliably reproduced the issue when run against text/lzo.
+set max_scan_range_length=1;
+select count(*) from alltypessmall
+---- RESULTS
+100
+---- TYPES
+BIGINT
+====