You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/22 02:56:06 UTC

impala git commit: IMPALA-7402: fix DCHECK when releasing reservation in scan

Repository: impala
Updated Branches:
  refs/heads/master 3ff4cde77 -> bdd904922


IMPALA-7402: fix DCHECK when releasing reservation in scan

The bug is that when ScannerContext::Stream::GetNextBuffer() read
past the end of a scan range and ScanRange::GetNext() returned
CANCELLED, it did not wait for buffers owned by the scan range
to be freed. Subsequent code assumes that all buffers allocated
by the scanner are freed after HdfsScanner::Close() returns, but
this was not guaranteed.

The fix is to strengthen the post-condition of ScanRange::GetNext() so
that buffers are guaranteed to be returned when GetNext() returns
CANCELLED.

Testing:
Added a unit test that tests the new invariant.

Manually tested that this fixed the regression by inserting a 10ms sleep
in BufferPool::FreeBuffer() and looping the test that failed.

Ran DiskIoMgrStressTest overnight and ran core tests.

Change-Id: I445d306de0c6bfb71359100de2fdf3cd4326f6d9
Reviewed-on: http://gerrit.cloudera.org:8080/11283
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bdd90492
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bdd90492
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bdd90492

Branch: refs/heads/master
Commit: bdd904922a220c37326928ac674779acaef5f6fa
Parents: 3ff4cde
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 20 20:00:10 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 02:22:50 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/disk-io-mgr-test.cc | 52 ++++++++++++++++++++++++++++++
 be/src/runtime/io/request-ranges.h    |  3 +-
 be/src/runtime/io/scan-range.cc       | 14 ++++----
 3 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 3d89d04..6c84e19 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -1307,6 +1307,58 @@ TEST_F(DiskIoMgrTest, CancelReleasesResources) {
   io_mgr.UnregisterContext(reader.get());
 }
 
+// Regression test for IMPALA-7402 - RequestContext::Cancel() propagation via
+// ScanRange::GetNext() does not guarantee buffers are released.
+TEST_F(DiskIoMgrTest, FinalGetNextReleasesResources) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  int len = strlen(data);
+  const int64_t MIN_BUFFER_SIZE = 2;
+  const int64_t MAX_BUFFER_SIZE = 1024;
+  CreateTempFile(tmp_file, data);
+
+  // Get mtime for file
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  const int NUM_DISK_THREADS = 20;
+  DiskIoMgr io_mgr(
+      1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+#ifndef NDEBUG
+  auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5);
+#endif
+
+  ASSERT_OK(io_mgr.Init());
+  BufferPool::ClientHandle read_client;
+  RegisterBufferPoolClient(
+      LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+
+  for (int i = 0; i < 10; ++i) {
+    unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+    ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime);
+    bool needs_buffers;
+    ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
+    EXPECT_TRUE(needs_buffers);
+    ASSERT_OK(io_mgr.AllocateBuffersForRange(&read_client, range, MAX_BUFFER_SIZE));
+    // Give disk I/O thread a chance to start read.
+    SleepForMs(1);
+
+    reader->Cancel();
+    // The scan range should hold no resources once ScanRange::GetNext() returns.
+    unique_ptr<BufferDescriptor> buffer;
+    Status status = range->GetNext(&buffer);
+    if (status.ok()) {
+      DCHECK(buffer->eosr());
+      range->ReturnBuffer(move(buffer));
+    }
+    EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i << ": "
+                                                   << status.GetDetail();
+    io_mgr.UnregisterContext(reader.get());
+  }
+  buffer_pool()->DeregisterClient(&read_client);
+}
+
 // Test reading into a client-allocated buffer.
 TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 22c6c36..95e832d 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -242,7 +242,8 @@ class ScanRange : public RequestRange {
   /// Returns the next buffer for this scan range. buffer is an output parameter.
   /// This function blocks until a buffer is ready or an error occurred. If this is
   /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK
-  /// is returned.
+  /// is returned. If this returns an eosr() buffer or an error status, then all buffers
+  /// owned by the scan range were either returned to callers of GetNext() or freed.
   /// Only one thread can be in GetNext() at any time.
   Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 6028fb4..12037ca 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -82,14 +82,16 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
-    // No more buffers to return - return the cancel status or OK if not cancelled.
-    if (all_buffers_returned(scan_range_lock)) return cancel_status_;
-
-    while (ready_buffers_.empty() && cancel_status_.ok()) {
+    while (!all_buffers_returned(scan_range_lock) && ready_buffers_.empty()) {
       buffer_ready_cv_.Wait(scan_range_lock);
     }
-    /// Propagate cancellation to the client if it happened while we were waiting.
-    RETURN_IF_ERROR(cancel_status_);
+    // No more buffers to return - return the cancel status or OK if not cancelled.
+    if (all_buffers_returned(scan_range_lock)) {
+      // Wait until read finishes to ensure buffers are freed.
+      while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
+      DCHECK_EQ(0, ready_buffers_.size());
+      return cancel_status_;
+    }
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());