You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/20 23:12:13 UTC

[2/2] incubator-impala git commit: IMPALA-5676: avoid expensive consistency checks in BTSv2

IMPALA-5676: avoid expensive consistency checks in BTSv2

Doing an O(n) consistency check every time the read or write
page is advanced results in O(n^2) overall runtime.

The fix is to separate the O(1) and O(n) checks and only do the
O(n) checks if:
* The function does an O(n) pass over the pages anyway (e.g.
  PinStream())
* The function is called only once per read or write pass over the
  stream.

This should make the cost of the checks O(n) (if we make the reasonable
assumption that PrepareForWrite(), PrepareForRead(), PinStream() and
UnpinStream() are called a bounded number of times per stream).

Testing:
Ran BufferedTupleStreamV2Test.

Change-Id: I8b380fcd0568cb73b36a490954bcd316db969ede
Reviewed-on: http://gerrit.cloudera.org:8080/7459
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9d1e4449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9d1e4449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9d1e4449

Branch: refs/heads/master
Commit: 9d1e4449cdf3f83dda6d87ebbb7bb4a6c854ce0b
Parents: 9f2f065
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jul 18 16:19:42 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jul 20 22:53:25 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2.cc | 64 +++++++++++++++----------
 be/src/runtime/buffered-tuple-stream-v2.h  |  9 +++-
 2 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index faa9008..2153264 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -36,9 +36,11 @@
 #include "common/names.h"
 
 #ifdef NDEBUG
-#define CHECK_CONSISTENCY()
+#define CHECK_CONSISTENCY_FAST()
+#define CHECK_CONSISTENCY_FULL()
 #else
-#define CHECK_CONSISTENCY() CheckConsistency()
+#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast()
+#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull()
 #endif
 
 using namespace impala;
@@ -112,18 +114,19 @@ BufferedTupleStreamV2::~BufferedTupleStreamV2() {
   DCHECK(closed_);
 }
 
-void BufferedTupleStreamV2::CheckConsistency() const {
+void BufferedTupleStreamV2::CheckConsistencyFull() const {
+  CheckConsistencyFast();
+  // The below checks require iterating over all the pages in the stream.
   DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
   DCHECK_EQ(pages_.size(), num_pages_) << DebugString();
-  for (const Page& page : pages_) {
-    DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString();
-    // Only one large row per page.
-    if (page.len() > default_page_len_) DCHECK_LE(page.num_rows, 1);
-    // We only create pages when we have a row to append to them.
-    DCHECK_GT(page.num_rows, 0);
-  }
+  for (const Page& page : pages_) CheckPageConsistency(&page);
+}
+
+void BufferedTupleStreamV2::CheckConsistencyFast() const {
+  // All the below checks should be O(1).
   DCHECK(has_write_iterator() || write_page_ == nullptr);
   if (write_page_ != nullptr) {
+    CheckPageConsistency(write_page_);
     DCHECK(write_page_->is_pinned());
     DCHECK(write_page_->retrieved_buffer);
     const BufferHandle* write_buffer;
@@ -135,6 +138,7 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   }
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
+    CheckPageConsistency(&*read_page_);
     DCHECK(read_page_->is_pinned());
     DCHECK(read_page_->retrieved_buffer);
     // Can't check read buffer without affecting behaviour, because a read may be in
@@ -154,6 +158,14 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   }
 }
 
+void BufferedTupleStreamV2::CheckPageConsistency(const Page* page) const {
+  DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
+  // Only one large row per page.
+  if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
+  // We only create pages when we have a row to append to them.
+  DCHECK_GT(page->num_rows, 0);
+}
+
 string BufferedTupleStreamV2::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
@@ -205,14 +217,14 @@ Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) {
   DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 
   *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
   has_write_iterator_ = true;
   // Save reservation for the write iterators.
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
@@ -223,7 +235,7 @@ Status BufferedTupleStreamV2::PrepareForReadWrite(
   DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 
   *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_);
   if (!*got_reservation) return Status::OK();
@@ -380,7 +392,7 @@ Status BufferedTupleStreamV2::CalcPageLenForRow(int64_t row_size, int64_t* page_
 Status BufferedTupleStreamV2::AdvanceWritePage(
     int64_t row_size, bool* got_reservation) noexcept {
   DCHECK(has_write_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 
   int64_t page_len;
   RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len));
@@ -412,7 +424,7 @@ Status BufferedTupleStreamV2::AdvanceWritePage(
           - write_page_reservation_to_reclaim)) {
     DCHECK(pinned_ || page_len > default_page_len_)
         << "If the stream is unpinned, this should only fail for large pages";
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FAST();
     *got_reservation = false;
     return Status::OK();
   }
@@ -461,7 +473,7 @@ void BufferedTupleStreamV2::InvalidateWriteIterator() {
 Status BufferedTupleStreamV2::NextReadPage() {
   DCHECK(has_read_iterator());
   DCHECK(!closed_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 
   if (read_page_ == pages_.end()) {
     // No rows read yet - start reading at first page. If the stream is unpinned, we can
@@ -488,7 +500,7 @@ Status BufferedTupleStreamV2::NextReadPage() {
   }
 
   if (read_page_ == pages_.end()) {
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FULL();
     return Status::OK();
   }
 
@@ -526,7 +538,7 @@ Status BufferedTupleStreamV2::NextReadPage() {
              write_page_ != nullptr, has_read_write_page())) {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   }
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
   return Status::OK();
 }
 
@@ -551,7 +563,7 @@ void BufferedTupleStreamV2::InvalidateReadIterator() {
 }
 
 Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) {
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
   // If already pinned, no additional pin is needed (see ExpectedPinCount()).
@@ -588,13 +600,13 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
   delete_on_read_ = delete_on_read;
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
 Status BufferedTupleStreamV2::PinStream(bool* pinned) {
   DCHECK(!closed_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   if (pinned_) {
     *pinned = true;
     return Status::OK();
@@ -634,12 +646,12 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
 
   pinned_ = true;
   *pinned = true;
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
 void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
     // Invalidate the iterators so they don't keep pages pinned.
@@ -648,7 +660,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
   }
 
   if (pinned_) {
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FULL();
     // If the stream was pinned, there may be some remaining pinned pages that should
     // be unpinned at this point.
     for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
@@ -662,7 +674,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
     }
     pinned_ = false;
   }
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 }
 
 Status BufferedTupleStreamV2::GetRows(
@@ -905,7 +917,7 @@ void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   }
   // The stream should be in a consistent state once the row is added.
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 }
 
 bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index 26f2113..2023124 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -692,8 +692,13 @@ class BufferedTupleStreamV2 {
   int64_t CalcBytesPinned() const;
 
   /// DCHECKs if the stream is internally inconsistent. The stream should always be in
-  /// a consistent state after returning success from a public API call.
-  void CheckConsistency() const;
+  /// a consistent state after returning success from a public API call. The Fast version
+  /// has constant runtime and does not check all of 'pages_'. The Full version includes
+  /// O(n) checks that require iterating over the whole 'pages_' list (e.g. checking that
+  /// each page is in a valid state).
+  void CheckConsistencyFast() const;
+  void CheckConsistencyFull() const;
+  void CheckPageConsistency(const Page* page) const;
 };
 }