You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/09/27 09:51:50 UTC

[impala] 07/07: IMPALA-10714: Defer advancing read page until the buffer is attached

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.0.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 80a1dc33d33538af609a0f1dd761363a06d279b8
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Thu Sep 16 15:12:22 2021 -0700

    IMPALA-10714: Defer advancing read page until the buffer is attached
    
    If a BufferedTupleStream is a read-write stream set up with
    attach_on_read = true, BufferedTupleStream::NextReadPage() expects that
    the page buffer is attached to the output row batch before advancing the
    read iterator. However, BufferedTupleStream::GetNextInternal() does not
    attach the page buffer of a fully read page if it is a read-write page.
    
    Consider the following scenario:
    1. Only 1 page is left in the stream. This is a read-write page.
    2. GetNext() has fully read the page, but does NOT attach the buffer to
       the output row batch because it is a read-write page.
    3. The stream writer inserts more rows, but the read-write page cannot
       fit any more rows. Therefore, new pages are created.
    4. The stream writer calls UnpinStream().
    5. UnpinStream() calls NextReadPage(), which in turn fails the
       assertion "read_iter->read_page_->attached_to_output_batch".
    
    BufferedTupleStream::UnpinStream() needs to defer advancing the read page
    when this situation happens.
    
    This patch adds BE tests StreamStateTest.UnpinFullyExhaustedReadPage*
    that simulate the corner case. This patch also moves BE tests
    DeferAdvancingReadPage and ShortDebugString into class StreamStateTest
    to reduce the friend class declarations in buffered-tuple-stream.h.
    
    Testing:
    - Run and pass BE tests StreamStateTest.UnpinFullyExhaustedReadPage*.
    
    Change-Id: I586ed72ba01cc3f28b0dcb1e202b3ca32a6c3b83
    Reviewed-on: http://gerrit.cloudera.org:8080/17853
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/buffered-tuple-stream-test.cc | 229 ++++++++++++++++++++++++++-
 be/src/runtime/buffered-tuple-stream.cc      |  30 ++--
 be/src/runtime/buffered-tuple-stream.h       |   4 +-
 3 files changed, 244 insertions(+), 19 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index dfdfeba..ad34f5e 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -569,6 +569,38 @@ class ArrayTupleStreamTest : public SimpleTupleStreamTest {
   }
 };
 
+/// Test internal stream state under certain corner cases.
+class StreamStateTest : public SimpleTupleStreamTest {
+ protected:
+  // Test that UnpinStream defers advancing the read page when all rows from the read
+  // page are attached to a returned RowBatch but there is not enough reservation.
+  void TestDeferAdvancingReadPage();
+
+  // Test unpinning a stream after all of its rows have been read out. In a read-write
+  // stream, the exhausted read page is not yet attached to the output row batch.
+  void TestUnpinAfterFullStreamRead(
+      bool read_write, bool attach_on_read, bool refill_before_unpin);
+
+  // Fill up the stream by repeatedly inserting write_batch into the stream until it is
+  // full. Sets 'num_inserted' to the number of rows inserted and returns the status of
+  // the last AddRow() call. Stream must be in pinned mode.
+  Status FillUpStream(
+      BufferedTupleStream* stream, RowBatch* write_batch, int64_t& num_inserted);
+
+  // Read the stream until eos. Sets 'num_read' to the number of rows read.
+  Status ReadOutStream(
+      BufferedTupleStream* stream, RowBatch* read_batch, int64_t& num_read);
+
+  // Verify that the stream's page count, pinned bytes and unpinned bytes match the
+  // expected page counts (each page is 'buffer_size' bytes), then check consistency.
+  void VerifyStreamState(BufferedTupleStream* stream, int num_page, int num_pinned_page,
+      int num_unpinned_page, int buffer_size);
+
+  // Test that the stream's debug string only covers the first
+  // BufferedTupleStream::MAX_PAGE_ITER_DEBUG pages.
+  void TestShortDebugString();
+};
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleTupleStreamTest, Basic) {
   Init(numeric_limits<int64_t>::max());
@@ -1297,9 +1329,7 @@ TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
   write_batch->Reset();
 }
 
-// Test that UnpinStream defer advancing the read page when all rows from the read page
-// are attached to a returned RowBatch but got not enough reservation.
-TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
+void StreamStateTest::TestDeferAdvancingReadPage() {
   int num_rows = 1024;
   int buffer_size = 4 * 1024;
   // Only give 2 * buffer_size for the stream initial read and write page reservation.
@@ -1315,7 +1345,7 @@ TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
     // and the output batch has NOT been reset.
     BufferedTupleStream stream(
         runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
-    ASSERT_OK(stream.Init("SimpleTupleStreamTest::DeferAdvancingReadPage", true));
+    ASSERT_OK(stream.Init("StreamStateTest::DeferAdvancingReadPage", true));
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
     ASSERT_TRUE(got_reservation);
 
@@ -1367,6 +1397,126 @@ TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
   write_batch->Reset();
 }
 
+void StreamStateTest::TestUnpinAfterFullStreamRead(
+    bool read_write, bool attach_on_read, bool refill_before_unpin) {
+  DCHECK(read_write || !refill_before_unpin)
+      << "Only read-write stream support refilling stream after full read.";
+
+  int num_rows = 1024;
+  int buffer_size = 4 * 1024;
+  int max_num_pages = 4;
+  Init(max_num_pages * buffer_size);
+
+  bool got_reservation;
+  RowBatch* write_batch = CreateIntBatch(0, num_rows, false);
+
+  {
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("StreamStateTest::TestUnpinAfterFullStreamRead", true));
+    if (read_write) {
+      ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
+    } else {
+      ASSERT_OK(stream.PrepareForWrite(&got_reservation));
+    }
+    ASSERT_TRUE(got_reservation);
+    RowBatch read_batch(int_desc_, num_rows, &tracker_);
+    int64_t num_rows_written = 0;
+    int64_t num_rows_read = 0;
+
+    // Add rows into the stream until the stream is full.
+    ASSERT_OK(FillUpStream(&stream, write_batch, num_rows_written));
+    int num_pages = max_num_pages;
+    ASSERT_EQ(stream.pages_.size(), num_pages);
+    ASSERT_FALSE(stream.has_read_write_page());
+
+    // Read all rows out of the stream.
+    if (!read_write) {
+      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_reservation));
+      ASSERT_TRUE(got_reservation);
+    }
+    ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+    if (attach_on_read) num_pages = 1;  // Only the last page is left in pages_.
+    ASSERT_EQ(stream.pages_.size(), num_pages);
+    ASSERT_EQ(stream.has_read_write_page(), read_write);
+
+    if (read_write && refill_before_unpin) {
+      // Fill the stream until it is full again.
+      ASSERT_OK(FillUpStream(&stream, write_batch, num_rows_written));
+      num_pages = max_num_pages;
+      ASSERT_EQ(stream.pages_.size(), num_pages);
+      ASSERT_EQ(stream.has_read_write_page(), !attach_on_read);
+    }
+
+    // Verify that the read page has been fully read before unpinning the stream.
+    ASSERT_EQ(
+        stream.read_it_.read_page_rows_returned_, stream.read_it_.read_page_->num_rows);
+    // read_page_ should NOT be attached to the output batch unless the stream is in
+    // read-only + attach_on_read mode.
+    bool attached = !read_write && attach_on_read;
+    ASSERT_EQ(stream.read_it_.read_page_->attached_to_output_batch, attached);
+
+    // Verify stream state before UnpinStream.
+    int num_pinned_pages = num_pages;
+    ASSERT_TRUE(stream.is_pinned());
+    if (attached) {
+      // In a pinned + read-only + attach_on_read stream, a fully exhausted read page is
+      // automatically unpinned and destroyed, but not yet removed from stream.pages_
+      // until the next GetNext() or UnpinStream() call.
+      ASSERT_EQ(stream.pages_.size(), 1);
+      num_pinned_pages = 0;
+    }
+    VerifyStreamState(&stream, num_pages, num_pinned_pages, 0, buffer_size);
+
+    // Unpin the stream.
+    ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    ASSERT_FALSE(stream.is_pinned());
+
+    // Verify stream state after UnpinStream. num_pages should remain unchanged after
+    // UnpinStream() except for the case of a read-only + attach_on_read stream.
+    if (read_write) {
+      if (attach_on_read) {
+        if (refill_before_unpin) {
+          num_pinned_pages = 2;  // Deferred read page and write page.
+          ASSERT_TRUE(stream.pages_.begin()->is_pinned());
+          ASSERT_TRUE(stream.pages_.back().is_pinned());
+        } else {
+          num_pinned_pages = 1;  // The single read-write page.
+          ASSERT_TRUE(stream.pages_.back().is_pinned());
+        }
+      } else {
+        num_pinned_pages = 1;  // Only the read-write page.
+        ASSERT_TRUE(stream.pages_.back().is_pinned());
+      }
+    } else {
+      if (attach_on_read) {
+        num_pages = 0;  // The destroyed read page is removed from pages_.
+      }
+      num_pinned_pages = 0;
+    }
+    int num_unpinned_pages = num_pages - num_pinned_pages;
+    VerifyStreamState(
+        &stream, num_pages, num_pinned_pages, num_unpinned_pages, buffer_size);
+
+    if (read_write) {
+      // Additionally, test that reads and writes still work after UnpinStream. Drain
+      // any rows left from the refill first, then write and read back one batch.
+      Status status;
+      ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+      for (int i = 0; i < write_batch->num_rows(); ++i) {
+        EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+        ASSERT_OK(status);
+      }
+      ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+      ASSERT_EQ(write_batch->num_rows(), num_rows_read);
+    }
+
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+    read_batch.Reset();
+  }
+  write_batch->Reset();
+}
+
 // Test writing to a stream (AddRow and UnpinStream), even though attached pages have not
 // been released yet.
 TEST_F(SimpleTupleStreamTest, WriteAfterReadAttached) {
@@ -1527,7 +1677,7 @@ TEST_F(SimpleTupleStreamTest, ConcurrentReaders) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
-TEST_F(SimpleTupleStreamTest, ShortDebugString) {
+void StreamStateTest::TestShortDebugString() {
   Init(BUFFER_POOL_LIMIT);
 
   int num_batches = 50;
@@ -1539,7 +1689,7 @@ TEST_F(SimpleTupleStreamTest, ShortDebugString) {
 
   BufferedTupleStream stream(
       runtime_state_, desc, &client_, default_page_len, max_page_len);
-  ASSERT_OK(stream.Init("SimpleTupleStreamTest::ShortDebugString", true));
+  ASSERT_OK(stream.Init("StreamStateTest::ShortDebugString", true));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
   ASSERT_TRUE(got_write_reservation);
@@ -2022,6 +2172,73 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
+
+Status StreamStateTest::FillUpStream(
+    BufferedTupleStream* stream, RowBatch* write_batch, int64_t& num_inserted) {
+  DCHECK(stream->is_pinned());
+  int64_t idx = 0;
+  Status status;
+  num_inserted = 0;
+  while (stream->AddRow(write_batch->GetRow(idx), &status)) {
+    RETURN_IF_ERROR(status);
+    idx = (idx + 1) % write_batch->num_rows();  // Cycle through write_batch rows.
+    num_inserted++;
+  }
+  return status;  // Status of the AddRow() call that returned false.
+}
+
+Status StreamStateTest::ReadOutStream(
+    BufferedTupleStream* stream, RowBatch* read_batch, int64_t& num_read) {
+  bool eos = false;
+  num_read = 0;
+  do {
+    read_batch->Reset();  // Reuse read_batch for each GetNext() call.
+    RETURN_IF_ERROR(stream->GetNext(read_batch, &eos));
+    num_read += read_batch->num_rows();
+  } while (!eos);  // The final batch is left in read_batch, not reset.
+  return Status::OK();
+}
+
+void StreamStateTest::VerifyStreamState(BufferedTupleStream* stream, int num_page,
+    int num_pinned_page, int num_unpinned_page, int buffer_size) {
+  ASSERT_EQ(stream->pages_.size(), num_page);  // Fatal ASSERTs exit only this helper.
+  ASSERT_EQ(stream->num_pages_, num_page);
+  ASSERT_EQ(stream->BytesPinned(false), buffer_size * num_pinned_page);
+  ASSERT_EQ(stream->bytes_unpinned(), buffer_size * num_unpinned_page);
+  stream->CheckConsistencyFull(stream->read_it_);
+}
+
+TEST_F(StreamStateTest, DeferAdvancingReadPage) {
+  TestDeferAdvancingReadPage();
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadWriteStreamNoAttachRefill) {
+  TestUnpinAfterFullStreamRead(true, false, true);  // read-write, no attach, refill
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadWriteStreamNoAttachNoRefill) {
+  TestUnpinAfterFullStreamRead(true, false, false);  // read-write, no attach, no refill
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadWriteStreamAttachRefill) {
+  TestUnpinAfterFullStreamRead(true, true, true);  // read-write, attach, refill
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadWriteStreamAttachNoRefill) {
+  TestUnpinAfterFullStreamRead(true, true, false);  // read-write, attach, no refill
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadOnlyStreamAttach) {
+  TestUnpinAfterFullStreamRead(false, true, false);  // read-only, attach
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadOnlyStreamNoAttach) {
+  TestUnpinAfterFullStreamRead(false, false, false);  // read-only, no attach
+}
+
+TEST_F(StreamStateTest, ShortDebugString) {
+  TestShortDebugString();
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 01edc84..4330696 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -719,16 +719,24 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
     bool defer_advancing_read_page = false;
     if (&*read_it_.read_page_ != write_page_ && read_it_.read_page_ != pages_.end()
         && read_it_.read_page_rows_returned_ == read_it_.read_page_->num_rows) {
-      if (read_it_.read_page_->attached_to_output_batch) {
-        if (num_pages_ <= 2) {
-          // NextReadPage will attempt to save default_page_len_ into write reservation if
-          // the stream ended up with only 1 read/write page after advancing the read
-          // page. This can potentially lead to negative unused reservation if the reader
-          // has not freed the row batch where the read page buffer is attached to. We
-          // defer advancing the read page until the next GetNext() call by the reader
-          // (see IMPALA-10584).
-          defer_advancing_read_page = true;
-        }
+      if (has_write_iterator_ && read_it_.attach_on_read_
+          && (num_pages_ <= 2 || !read_it_.read_page_->attached_to_output_batch)) {
+        // In a read-write stream + attach_on_read mode, there are cases where we should
+        // NOT advance the read page even though the page has been fully exhausted:
+        //
+        // 1. Stream has exactly 2 pages: 1 read and 1 write.
+        //    NextReadPage() will attempt to save default_page_len_ into write
+        //    reservation if the stream ended up with only 1 read/write page after
+        //    advancing the read page. This can potentially lead to negative unused
+        //    reservation if the reader has not freed the row batch where the read page
+        //    buffer is attached to (see IMPALA-10584).
+        // 2. Read page buffer has not been attached yet to the output row batch.
+        //    The previous GetNext() would not attach the read page buffer to the output
+        //    row batch if it was a read-write page (see IMPALA-10714).
+        //
+        // We defer advancing the read page for these cases until the next GetNext()
+        // call by the reader.
+        defer_advancing_read_page = true;
       }
 
       if (!defer_advancing_read_page) {
@@ -742,7 +750,7 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
     std::list<Page>::iterator it = pages_.begin();
     if (defer_advancing_read_page) {
       // We skip advancing the read page earlier, so the first page must be a read page
-      // and attached_to_output_batch is true. We should keep the first page pinned. The
+      // and the reader has not done reading it. We should keep the first page pinned. The
       // next GetNext() call is the one who will be responsible to unpin the first page.
       DCHECK(read_it_.read_page_ == pages_.begin());
       ++it;
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index bba6479..8546414 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -484,6 +484,7 @@ class BufferedTupleStream {
 
    private:
     friend class BufferedTupleStream;
+    friend class StreamStateTest;
 
     /// True if the read iterator is currently valid
     bool valid_ = false;
@@ -560,8 +561,7 @@ class BufferedTupleStream {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
-  friend class SimpleTupleStreamTest_ShortDebugString_Test;
-  friend class SimpleTupleStreamTest_DeferAdvancingReadPage_Test;
+  friend class StreamStateTest;
 
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;