You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/27 22:26:27 UTC
[impala] 03/04: IMPALA-8890: Advance read page in UnpinStream
This is an automated email from the ASF dual-hosted git repository.
tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit eea617be8f3ffa72acdb4fb857803adb5ec1579a
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Aug 26 11:55:05 2019 -0700
IMPALA-8890: Advance read page in UnpinStream
Fixes a DCHECK failure in BufferedTupleStream::UnpinStream, where the
ExpectedPinCount check would fail if the stream still referenced a read
page that had been completely read. The patch changes UnpinStream so
that, if the current read page has been completely read, the read page
is advanced using NextReadPage(). Because NextReadPage() can fail,
UnpinStream now returns a Status, and all callers are updated to
propagate it.
Testing:
* Ran core tests
* Added new tests to buffered-tuple-stream-test
Change-Id: Id61a82559fd2baab237cc44f60a69cba2b30c16e
Reviewed-on: http://gerrit.cloudera.org:8080/14144
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/analytic-eval-node.cc | 3 +-
be/src/exec/grouping-aggregator-partition.cc | 5 +-
be/src/exec/grouping-aggregator.cc | 16 +++---
be/src/exec/grouping-aggregator.h | 2 +-
be/src/exec/partitioned-hash-join-builder.cc | 5 +-
be/src/exec/partitioned-hash-join-node.cc | 19 ++++---
be/src/runtime/buffered-tuple-stream-test.cc | 78 +++++++++++++++++++++++++---
be/src/runtime/buffered-tuple-stream.cc | 10 +++-
be/src/runtime/buffered-tuple-stream.h | 2 +-
be/src/runtime/spillable-row-batch-queue.cc | 3 +-
10 files changed, 114 insertions(+), 29 deletions(-)
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 73b5346..b62f663 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -344,7 +344,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
// TODO: Consider re-pinning later if the output stream is fully consumed.
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
- input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ RETURN_IF_ERROR(
+ input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
if (!input_stream_->AddRow(row, &status)) {
// Rows should be added in unpinned mode unless an error occurs.
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index a968ef0..03f54d8 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -185,9 +185,10 @@ Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) {
DCHECK(aggregated_row_stream->has_write_iterator());
DCHECK(!unaggregated_row_stream->has_write_iterator());
if (more_aggregate_rows) {
- aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(
+ BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
} else {
- aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
bool got_buffer;
RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
DCHECK(got_buffer) << "Accounted in min reservation"
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 358919a..5238c43 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -755,7 +755,7 @@ Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) {
hash_partitions_.clear();
if (dst_partition->is_spilled()) {
- PushSpilledPartition(dst_partition);
+ RETURN_IF_ERROR(PushSpilledPartition(dst_partition));
*built_partition = nullptr;
// Spilled the partition - we should not be using any reservation except from
// 'serialize_stream_'.
@@ -790,7 +790,8 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
if (!hash_partition->is_spilled()) continue;
// The aggregated rows have been repartitioned. Free up at least a buffer's worth of
// reservation and use it to pin the unaggregated write buffer.
- hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(hash_partition->aggregated_row_stream->UnpinStream(
+ BufferedTupleStream::UNPIN_ALL));
bool got_buffer;
RETURN_IF_ERROR(
hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -907,7 +908,7 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
if (total_rows == 0) {
partition->Close(false);
} else if (partition->is_spilled()) {
- PushSpilledPartition(partition);
+ RETURN_IF_ERROR(PushSpilledPartition(partition));
} else {
aggregated_partitions_.push_back(partition);
}
@@ -917,15 +918,18 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
return Status::OK();
}
-void GroupingAggregator::PushSpilledPartition(Partition* partition) {
+Status GroupingAggregator::PushSpilledPartition(Partition* partition) {
DCHECK(partition->is_spilled());
DCHECK(partition->hash_tbl == nullptr);
// Ensure all pages in the spilled partition's streams are unpinned by invalidating
// the streams' read and write iterators. We may need all the memory to process the
// next spilled partitions.
- partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
- partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(
+ partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+ RETURN_IF_ERROR(
+ partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
spilled_partitions_.push_front(partition);
+ return Status::OK();
}
void GroupingAggregator::ClosePartitions() {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 258ed6e..dea7e3f 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -586,7 +586,7 @@ class GroupingAggregator : public Aggregator {
/// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed
/// first). This allows us to delete pages earlier and bottom out the recursion
/// earlier and also improves time locality of access to spilled data on disk.
- void PushSpilledPartition(Partition* partition);
+ Status PushSpilledPartition(Partition* partition) WARN_UNUSED_RESULT;
/// Calls Close() on 'output_partition_' and every Partition in
/// 'aggregated_partitions_', 'spilled_partitions_', and 'hash_partitions_' and then
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 83fb432..175d7d8 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -361,7 +361,8 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
partition->Close(NULL);
} else if (partition->is_spilled()) {
// We don't need any build-side data for spilled partitions in memory.
- partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(
+ partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
}
}
@@ -628,7 +629,7 @@ Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
hash_tbl_->Close();
hash_tbl_.reset();
}
- build_rows_->UnpinStream(mode);
+ RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
if (!is_spilled_) {
COUNTER_ADD(parent_->num_spilled_partitions_, 1);
if (parent_->num_spilled_partitions_->value() == 1) {
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 99931b4..42385d0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -421,7 +421,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
// Spill to free memory from hash tables and pinned streams for use in new partitions.
RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
// Temporarily free up the probe buffer to use when repartitioning.
- input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(
+ input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -992,9 +993,10 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
&& (have_spilled_hash_partitions
|| builder_->null_aware_partition()->is_spilled())) {
- null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
- null_aware_probe_partition_->probe_rows()->UnpinStream(
- BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ RETURN_IF_ERROR(
+ null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+ RETURN_IF_ERROR(null_aware_probe_partition_->probe_rows()->UnpinStream(
+ BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
}
// Initialize the hash_tbl_ caching array.
@@ -1034,7 +1036,8 @@ bool PartitionedHashJoinNode::AppendProbeRowSlow(
if (!status->ok()) return false; // Check if AddRow() set status.
*status = runtime_state_->StartSpilling(mem_tracker());
if (!status->ok()) return false;
- stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ *status = stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ if (!status->ok()) return false;
return stream->AddRow(row, status);
}
@@ -1119,9 +1122,11 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
// can recurse the algorithm and create new hash partitions from spilled partitions.
// TODO: we shouldn't need to unpin the build stream if we stop spilling
// while probing.
- build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(
+ build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
- probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ RETURN_IF_ERROR(
+ probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
if (probe_partition->probe_rows()->num_rows() != 0
|| NeedToProcessUnmatchedBuildRows()) {
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 85cceb0..6b097dd 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -326,7 +326,7 @@ class SimpleTupleStreamTest : public testing::Test {
ASSERT_TRUE(got_write_reservation);
if (unpin_stream) {
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
}
// Add rows to the stream
int offset = 0;
@@ -369,7 +369,7 @@ class SimpleTupleStreamTest : public testing::Test {
ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
ASSERT_TRUE(got_reservation);
if (unpin_stream) {
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
}
vector<int> results;
@@ -640,7 +640,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
// Make sure we can switch between pinned and unpinned states while writing.
if (num_batches % 10 == 0) {
bool pinned;
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
ASSERT_OK(stream.PinStream(&pinned));
DCHECK(pinned);
}
@@ -658,7 +658,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
++num_batches;
}
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
bool pinned = false;
ASSERT_OK(stream.PinStream(&pinned));
@@ -1116,7 +1116,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
bool success = stream.AddRow(batch->GetRow(0), &status);
ASSERT_FALSE(success);
ASSERT_OK(status);
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
success = stream.AddRow(batch->GetRow(0), &status);
ASSERT_TRUE(success);
// Read all the rows back and verify.
@@ -1187,7 +1187,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
bool success = stream.AddRow(write_row, &status);
ASSERT_FALSE(success);
ASSERT_OK(status);
- stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
success = stream.AddRow(write_row, &status);
ASSERT_TRUE(success);
@@ -1210,6 +1210,72 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
+// Test that UnpinStream advances the read page if all rows from the read page have
+// already been attached to a returned RowBatch.
+TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
+ int num_rows = 1024;
+ int buffer_size = 4 * 1024;
+ Init(100 * buffer_size);
+
+ bool eos;
+ bool got_reservation;
+ Status status;
+ RowBatch* write_batch = CreateIntBatch(0, num_rows, false);
+
+ {
+ // Test unpinning a stream when the read page has been attached to the output batch.
+ BufferedTupleStream stream(
+ runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+ ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+ ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+ ASSERT_TRUE(got_reservation);
+
+ // Add all rows of write_batch to the stream.
+ for (int i = 0; i < write_batch->num_rows(); ++i) {
+ EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+ ASSERT_OK(status);
+ }
+
+ // Read until the read page is attached to the output batch.
+ RowBatch read_batch(int_desc_, num_rows, &tracker_);
+ ASSERT_OK(stream.GetNext(&read_batch, &eos));
+ // If GetNext did not hit the capacity of the RowBatch, then the read page should have
+ // been attached to read_batch.
+ ASSERT_TRUE(read_batch.num_rows() < num_rows);
+ ASSERT_TRUE(!eos);
+ read_batch.Reset();
+
+ // Unpin the stream; this must not trip the pin-count DCHECK.
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+ stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+ }
+
+ {
+ // Test unpinning a fully-read stream (all rows have been attached to RowBatches).
+ BufferedTupleStream stream(
+ runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+ ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+ ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+ ASSERT_TRUE(got_reservation);
+
+ for (int i = 0; i < write_batch->num_rows(); ++i) {
+ EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+ ASSERT_OK(status);
+ }
+
+ // Read back and validate the entire contents of the stream.
+ vector<int> results;
+ ReadValues(&stream, int_desc_, &results);
+ VerifyResults<int>(*int_desc_, results, num_rows, false);
+
+ // Unpin and close the stream.
+ ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+ stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+ }
+
+ write_batch->Reset();
+}
+
// Basic API test. No data should be going to disk.
TEST_F(SimpleNullStreamTest, Basic) {
Init(BUFFER_POOL_LIMIT);
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 87c07eb..a37c443 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -210,7 +210,7 @@ string BufferedTupleStream::Page::DebugString() const {
}
Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
- if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
+ if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT));
caller_label_ = caller_label;
return Status::OK();
}
@@ -659,7 +659,7 @@ Status BufferedTupleStream::PinStream(bool* pinned) {
return Status::OK();
}
-void BufferedTupleStream::UnpinStream(UnpinMode mode) {
+Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
CHECK_CONSISTENCY_FULL();
DCHECK(!closed_);
if (mode == UNPIN_ALL) {
@@ -670,6 +670,11 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
if (pinned_) {
CHECK_CONSISTENCY_FULL();
+ if (&*read_page_ != write_page_ && read_page_ != pages_.end()
+ && read_page_rows_returned_ == read_page_->num_rows) {
+ RETURN_IF_ERROR(NextReadPage());
+ }
+
// If the stream was pinned, there may be some remaining pinned pages that should
// be unpinned at this point.
for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
@@ -684,6 +689,7 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
pinned_ = false;
}
CHECK_CONSISTENCY_FULL();
+ return Status::OK();
}
Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index b0cb8db..e98a9fe 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -316,7 +316,7 @@ class BufferedTupleStream {
};
/// Unpins stream with the given 'mode' as described above.
- void UnpinStream(UnpinMode mode);
+ Status UnpinStream(UnpinMode mode) WARN_UNUSED_RESULT;
/// Get the next batch of output rows, which are backed by the stream's memory.
///
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
index a9be9f7..da21660 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -84,7 +84,8 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);
// Unpin the stream and then add the row.
- batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+ RETURN_IF_ERROR(
+ batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
// Append "Spilled" to the "ExecOption" info string in the runtime profile.
profile_->AppendExecOption("Spilled");