Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/27 22:26:27 UTC

[impala] 03/04: IMPALA-8890: Advance read page in UnpinStream

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eea617be8f3ffa72acdb4fb857803adb5ec1579a
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Aug 26 11:55:05 2019 -0700

    IMPALA-8890: Advance read page in UnpinStream
    
    Fixes a DCHECK failure in BufferedTupleStream::UnpinStream where
    ExpectedPinCount would fail if the stream still referenced a read page
    that had been completely read. The patch changes UnpinStream so that if
    the current read page has been completely read, the read page is
    advanced using NextReadPage().
    
    Testing:
    * Ran core tests
    * Added new tests to buffered-tuple-stream-test
    
    Change-Id: Id61a82559fd2baab237cc44f60a69cba2b30c16e
    Reviewed-on: http://gerrit.cloudera.org:8080/14144
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/analytic-eval-node.cc            |  3 +-
 be/src/exec/grouping-aggregator-partition.cc |  5 +-
 be/src/exec/grouping-aggregator.cc           | 16 +++---
 be/src/exec/grouping-aggregator.h            |  2 +-
 be/src/exec/partitioned-hash-join-builder.cc |  5 +-
 be/src/exec/partitioned-hash-join-node.cc    | 19 ++++---
 be/src/runtime/buffered-tuple-stream-test.cc | 78 +++++++++++++++++++++++++---
 be/src/runtime/buffered-tuple-stream.cc      | 10 +++-
 be/src/runtime/buffered-tuple-stream.h       |  2 +-
 be/src/runtime/spillable-row-batch-queue.cc  |  3 +-
 10 files changed, 114 insertions(+), 29 deletions(-)
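
In plain terms: BufferedTupleStream::UnpinStream now returns a Status instead of
void, because it may call NextReadPage() when the current read page has already
been fully read, and advancing the read page can fail. Every call site therefore
propagates the result with RETURN_IF_ERROR (or checks the returned Status, as in
AppendProbeRowSlow). The sketch below is a minimal, self-contained model of that
shape only; ToyTupleStream, the stripped-down Status struct, and the local
RETURN_IF_ERROR macro are simplified stand-ins written for illustration, not
Impala's real classes, and the real condition additionally skips the advance when
the read page is the current write page.

// Simplified, hypothetical model of the pattern in this commit. Not Impala code.
#include <cassert>
#include <iterator>
#include <list>
#include <string>

// Minimal stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status OK() { return Status{}; }
};

// Minimal stand-in for Impala's RETURN_IF_ERROR macro.
#define RETURN_IF_ERROR(expr)     \
  do {                            \
    Status _s = (expr);           \
    if (!_s.ok()) return _s;      \
  } while (false)

// Toy stream mirroring the shape of the fix: UnpinStream() returns a Status and,
// if the current read page has been completely read, advances it before unpinning.
class ToyTupleStream {
 public:
  struct Page { int num_rows = 0; bool pinned = true; };

  void AddPage(int num_rows) {
    pages_.push_back(Page{num_rows, true});
    if (read_page_ == pages_.end()) read_page_ = std::prev(pages_.end());
  }

  // Pretend the caller consumed 'n' rows from the current read page.
  void ConsumeRows(int n) { read_page_rows_returned_ += n; }

  Status UnpinStream() {
    // Core of the fix (modelled): if the read page has been completely read,
    // advance it instead of leaving it around to trip a pin-count check later.
    // The real code also requires the read page not to be the write page.
    if (read_page_ != pages_.end() &&
        read_page_rows_returned_ == read_page_->num_rows) {
      RETURN_IF_ERROR(NextReadPage());
    }
    for (Page& page : pages_) page.pinned = false;  // Unpin remaining pages.
    return Status::OK();
  }

 private:
  Status NextReadPage() {
    read_page_->pinned = false;
    ++read_page_;
    read_page_rows_returned_ = 0;
    return Status::OK();  // The real method can fail (I/O, reservation), hence Status.
  }

  std::list<Page> pages_;
  std::list<Page>::iterator read_page_ = pages_.end();
  int read_page_rows_returned_ = 0;
};

int main() {
  ToyTupleStream stream;
  stream.AddPage(/*num_rows=*/4);
  stream.AddPage(/*num_rows=*/4);
  stream.ConsumeRows(4);  // Fully read the first page.
  // Callers now check the Status instead of calling a void UnpinStream().
  Status s = stream.UnpinStream();
  assert(s.ok());
  return 0;
}

Returning Status (marked WARN_UNUSED_RESULT in the header change below) forces
every caller to handle the new failure path rather than silently ignoring it,
which is why most of the diff is mechanical RETURN_IF_ERROR wrapping.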

diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 73b5346..b62f663 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -344,7 +344,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
     RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
-    input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(
+        input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index a968ef0..03f54d8 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -185,9 +185,10 @@ Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) {
   DCHECK(aggregated_row_stream->has_write_iterator());
   DCHECK(!unaggregated_row_stream->has_write_iterator());
   if (more_aggregate_rows) {
-    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   } else {
-    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     bool got_buffer;
     RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
     DCHECK(got_buffer) << "Accounted in min reservation"
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 358919a..5238c43 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -755,7 +755,7 @@ Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) {
   hash_partitions_.clear();
 
   if (dst_partition->is_spilled()) {
-    PushSpilledPartition(dst_partition);
+    RETURN_IF_ERROR(PushSpilledPartition(dst_partition));
     *built_partition = nullptr;
     // Spilled the partition - we should not be using any reservation except from
     // 'serialize_stream_'.
@@ -790,7 +790,8 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
     if (!hash_partition->is_spilled()) continue;
     // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
     // reservation and use it to pin the unaggregated write buffer.
-    hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(hash_partition->aggregated_row_stream->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL));
     bool got_buffer;
     RETURN_IF_ERROR(
         hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -907,7 +908,7 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
     if (total_rows == 0) {
       partition->Close(false);
     } else if (partition->is_spilled()) {
-      PushSpilledPartition(partition);
+      RETURN_IF_ERROR(PushSpilledPartition(partition));
     } else {
       aggregated_partitions_.push_back(partition);
     }
@@ -917,15 +918,18 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
   return Status::OK();
 }
 
-void GroupingAggregator::PushSpilledPartition(Partition* partition) {
+Status GroupingAggregator::PushSpilledPartition(Partition* partition) {
   DCHECK(partition->is_spilled());
   DCHECK(partition->hash_tbl == nullptr);
   // Ensure all pages in the spilled partition's streams are unpinned by invalidating
   // the streams' read and write iterators. We may need all the memory to process the
   // next spilled partitions.
-  partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
-  partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+  RETURN_IF_ERROR(
+      partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+  RETURN_IF_ERROR(
+      partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
   spilled_partitions_.push_front(partition);
+  return Status::OK();
 }
 
 void GroupingAggregator::ClosePartitions() {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 258ed6e..dea7e3f 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -586,7 +586,7 @@ class GroupingAggregator : public Aggregator {
   /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed
   /// first). This allows us to delete pages earlier and bottom out the recursion
   /// earlier and also improves time locality of access to spilled data on disk.
-  void PushSpilledPartition(Partition* partition);
+  Status PushSpilledPartition(Partition* partition) WARN_UNUSED_RESULT;
 
   /// Calls Close() on 'output_partition_' and every Partition in
   /// 'aggregated_partitions_', 'spilled_partitions_', and 'hash_partitions_' and then
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 83fb432..175d7d8 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -361,7 +361,8 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
       partition->Close(NULL);
     } else if (partition->is_spilled()) {
       // We don't need any build-side data for spilled partitions in memory.
-      partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     }
   }
 
@@ -628,7 +629,7 @@ Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
     hash_tbl_->Close();
     hash_tbl_.reset();
   }
-  build_rows_->UnpinStream(mode);
+  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
   if (!is_spilled_) {
     COUNTER_ADD(parent_->num_spilled_partitions_, 1);
     if (parent_->num_spilled_partitions_->value() == 1) {
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 99931b4..42385d0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -421,7 +421,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     // Spill to free memory from hash tables and pinned streams for use in new partitions.
     RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
     // Temporarily free up the probe buffer to use when repartitioning.
-    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(
+        input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
     DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -992,9 +993,10 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
       && (have_spilled_hash_partitions
              || builder_->null_aware_partition()->is_spilled())) {
-    null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
-    null_aware_probe_partition_->probe_rows()->UnpinStream(
-        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(
+        null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    RETURN_IF_ERROR(null_aware_probe_partition_->probe_rows()->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   }
 
   // Initialize the hash_tbl_ caching array.
@@ -1034,7 +1036,8 @@ bool PartitionedHashJoinNode::AppendProbeRowSlow(
   if (!status->ok()) return false; // Check if AddRow() set status.
   *status = runtime_state_->StartSpilling(mem_tracker());
   if (!status->ok()) return false;
-  stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  *status = stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  if (!status->ok()) return false;
   return stream->AddRow(row, status);
 }
 
@@ -1119,9 +1122,11 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
       // can recurse the algorithm and create new hash partitions from spilled partitions.
       // TODO: we shouldn't need to unpin the build stream if we stop spilling
       // while probing.
-      build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
       DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
-      probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
 
       if (probe_partition->probe_rows()->num_rows() != 0
           || NeedToProcessUnmatchedBuildRows()) {
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 85cceb0..6b097dd 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -326,7 +326,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_TRUE(got_write_reservation);
 
     if (unpin_stream) {
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     }
     // Add rows to the stream
     int offset = 0;
@@ -369,7 +369,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
     ASSERT_TRUE(got_reservation);
     if (unpin_stream) {
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     }
 
     vector<int> results;
@@ -640,7 +640,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     // Make sure we can switch between pinned and unpinned states while writing.
     if (num_batches % 10 == 0) {
       bool pinned;
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
       ASSERT_OK(stream.PinStream(&pinned));
       DCHECK(pinned);
     }
@@ -658,7 +658,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     ++num_batches;
   }
 
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   bool pinned = false;
   ASSERT_OK(stream.PinStream(&pinned));
@@ -1116,7 +1116,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   bool success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_TRUE(success);
   // Read all the rows back and verify.
@@ -1187,7 +1187,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   bool success = stream.AddRow(write_row, &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   success = stream.AddRow(write_row, &status);
   ASSERT_TRUE(success);
 
@@ -1210,6 +1210,72 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+// Test that UnpinStream advances the read page if all rows from the read page are
+// attached to a returned RowBatch.
+TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
+  int num_rows = 1024;
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  bool eos;
+  bool got_reservation;
+  Status status;
+  RowBatch* write_batch = CreateIntBatch(0, num_rows, false);
+
+  {
+    // Test unpinning a stream when the read page has been attached to the output batch.
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+
+    // Add rows to stream.
+    for (int i = 0; i < write_batch->num_rows(); ++i) {
+      EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+      ASSERT_OK(status);
+    }
+
+    // Read until the read page is attached to the output.
+    RowBatch read_batch(int_desc_, num_rows, &tracker_);
+    ASSERT_OK(stream.GetNext(&read_batch, &eos));
+    // If GetNext did hit the capacity of the RowBatch, then the read page should have
+    // been attached to read_batch.
+    ASSERT_TRUE(read_batch.num_rows() < num_rows);
+    ASSERT_TRUE(!eos);
+    read_batch.Reset();
+
+    // Unpin the stream.
+    ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  {
+    // Test unpinning an empty stream (all rows have been attached to RowBatches).
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+
+    for (int i = 0; i < write_batch->num_rows(); ++i) {
+      EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+      ASSERT_OK(status);
+    }
+
+    // Read and validate all contents of the stream.
+    vector<int> results;
+    ReadValues(&stream, int_desc_, &results);
+    VerifyResults<int>(*int_desc_, results, num_rows, false);
+
+    // Unpin and close the stream.
+    ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  write_batch->Reset();
+}
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleNullStreamTest, Basic) {
   Init(BUFFER_POOL_LIMIT);
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 87c07eb..a37c443 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -210,7 +210,7 @@ string BufferedTupleStream::Page::DebugString() const {
 }
 
 Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
-  if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
+  if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT));
   caller_label_ = caller_label;
   return Status::OK();
 }
@@ -659,7 +659,7 @@ Status BufferedTupleStream::PinStream(bool* pinned) {
   return Status::OK();
 }
 
-void BufferedTupleStream::UnpinStream(UnpinMode mode) {
+Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
   CHECK_CONSISTENCY_FULL();
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
@@ -670,6 +670,11 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
 
   if (pinned_) {
     CHECK_CONSISTENCY_FULL();
+    if (&*read_page_ != write_page_ && read_page_ != pages_.end()
+        && read_page_rows_returned_ == read_page_->num_rows) {
+      RETURN_IF_ERROR(NextReadPage());
+    }
+
     // If the stream was pinned, there may be some remaining pinned pages that should
     // be unpinned at this point.
     for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
@@ -684,6 +689,7 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
     pinned_ = false;
   }
   CHECK_CONSISTENCY_FULL();
+  return Status::OK();
 }
 
 Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index b0cb8db..e98a9fe 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -316,7 +316,7 @@ class BufferedTupleStream {
   };
 
   /// Unpins stream with the given 'mode' as described above.
-  void UnpinStream(UnpinMode mode);
+  Status UnpinStream(UnpinMode mode) WARN_UNUSED_RESULT;
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
   ///
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
index a9be9f7..da21660 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -84,7 +84,8 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
       DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);
 
       // Unpin the stream and then add the row.
-      batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      RETURN_IF_ERROR(
+          batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
       // Append "Spilled" to the "ExecOption" info string in the runtime profile.
       profile_->AppendExecOption("Spilled");