You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/22 02:56:06 UTC
impala git commit: IMPALA-7402: fix DCHECK when releasing reservation in scan
Repository: impala
Updated Branches:
refs/heads/master 3ff4cde77 -> bdd904922
IMPALA-7402: fix DCHECK when releasing reservation in scan
The bug was in ScannerContext::Stream::GetNextBuffer(). When it read past
the end of a scan range and ScanRange::GetNext() returned CANCELLED,
GetNextBuffer() did not wait for the buffers owned by the scan range to be
freed. Subsequent code assumes that all buffers allocated by the scanner
have been freed once HdfsScanner::Close() returns, but this was not
guaranteed.
The fix is to strengthen the post-condition of ScanRange::GetNext(). When
GetNext() returns CANCELLED, every buffer owned by the scan range is now
guaranteed to have been either returned to a caller of GetNext() or freed.
Testing:
Added a unit test that tests the new invariant.
Manually verified that this fixes the regression by inserting a 10 ms sleep
in BufferPool::FreeBuffer() and running the previously failing test in a loop.
Ran DiskIoMgrStressTest overnight and ran core tests.
Change-Id: I445d306de0c6bfb71359100de2fdf3cd4326f6d9
Reviewed-on: http://gerrit.cloudera.org:8080/11283
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bdd90492
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bdd90492
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bdd90492
Branch: refs/heads/master
Commit: bdd904922a220c37326928ac674779acaef5f6fa
Parents: 3ff4cde
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 20 20:00:10 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 02:22:50 2018 +0000
----------------------------------------------------------------------
be/src/runtime/io/disk-io-mgr-test.cc | 52 ++++++++++++++++++++++++++++++
be/src/runtime/io/request-ranges.h | 3 +-
be/src/runtime/io/scan-range.cc | 14 ++++----
3 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 3d89d04..6c84e19 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -1307,6 +1307,58 @@ TEST_F(DiskIoMgrTest, CancelReleasesResources) {
io_mgr.UnregisterContext(reader.get());
}
+// Regression test for IMPALA-7402 - RequestContext::Cancel() propagation via
+// ScanRange::GetNext() does not guarantee buffers are released.
+TEST_F(DiskIoMgrTest, FinalGetNextReleasesResources) {
+ InitRootReservation(LARGE_RESERVATION_LIMIT);
+ const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+ const char* data = "the quick brown fox jumped over the lazy dog";
+ int len = strlen(data);
+ const int64_t MIN_BUFFER_SIZE = 2;
+ const int64_t MAX_BUFFER_SIZE = 1024;
+ CreateTempFile(tmp_file, data);
+
+ // Get mtime for file
+ struct stat stat_val;
+ stat(tmp_file, &stat_val);
+
+ const int NUM_DISK_THREADS = 20;
+ DiskIoMgr io_mgr(
+ 1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+#ifndef NDEBUG
+ auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5);
+#endif
+
+ ASSERT_OK(io_mgr.Init());
+ BufferPool::ClientHandle read_client;
+ RegisterBufferPoolClient(
+ LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+
+ for (int i = 0; i < 10; ++i) {
+ unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+ ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime);
+ bool needs_buffers;
+ ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
+ EXPECT_TRUE(needs_buffers);
+ ASSERT_OK(io_mgr.AllocateBuffersForRange(&read_client, range, MAX_BUFFER_SIZE));
+ // Give disk I/O thread a chance to start read.
+ SleepForMs(1);
+
+ reader->Cancel();
+ // The scan range should hold no resources once ScanRange::GetNext() returns.
+ unique_ptr<BufferDescriptor> buffer;
+ Status status = range->GetNext(&buffer);
+ if (status.ok()) {
+ DCHECK(buffer->eosr());
+ range->ReturnBuffer(move(buffer));
+ }
+ EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i << ": "
+ << status.GetDetail();
+ io_mgr.UnregisterContext(reader.get());
+ }
+ buffer_pool()->DeregisterClient(&read_client);
+}
+
// Test reading into a client-allocated buffer.
TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 22c6c36..95e832d 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -242,7 +242,8 @@ class ScanRange : public RequestRange {
/// Returns the next buffer for this scan range. buffer is an output parameter.
/// This function blocks until a buffer is ready or an error occurred. If this is
/// called when all buffers have been returned, *buffer is set to nullptr and Status::OK
- /// is returned.
+ /// is returned. If this returns buffer->eosr() or an error status, then all buffers
+ /// owned by the scan range were either returned to callers of GetNext() or freed.
/// Only one thread can be in GetNext() at any time.
Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 6028fb4..12037ca 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -82,14 +82,16 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
- // No more buffers to return - return the cancel status or OK if not cancelled.
- if (all_buffers_returned(scan_range_lock)) return cancel_status_;
-
- while (ready_buffers_.empty() && cancel_status_.ok()) {
+ while (!all_buffers_returned(scan_range_lock) && ready_buffers_.empty()) {
buffer_ready_cv_.Wait(scan_range_lock);
}
- /// Propagate cancellation to the client if it happened while we were waiting.
- RETURN_IF_ERROR(cancel_status_);
+ // No more buffers to return - return the cancel status or OK if not cancelled.
+ if (all_buffers_returned(scan_range_lock)) {
+ // Wait until read finishes to ensure buffers are freed.
+ while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
+ DCHECK_EQ(0, ready_buffers_.size());
+ return cancel_status_;
+ }
// Remove the first ready buffer from the queue and return it
DCHECK(!ready_buffers_.empty());