You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/07 21:32:14 UTC
[2/2] impala git commit: Revert "IMPALA-7333: remove
MarkNeedsDeepCopy() in agg and BTS"
Revert "IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS"
This reverts commit 240fde62d532c7166fc613a97b38c199cec09f1f.
The commit caused a regression: IMPALA-7403
Change-Id: Idbdae8a7b87e19ce30c83a817d940c0a689262a4
Reviewed-on: http://gerrit.cloudera.org:8080/11145
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d33d5b74
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d33d5b74
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d33d5b74
Branch: refs/heads/master
Commit: d33d5b74c24b452c23f705d51768a86e90d25772
Parents: 22f167f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 7 10:25:20 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Aug 7 21:18:15 2018 +0000
----------------------------------------------------------------------
be/src/exec/grouping-aggregator.cc | 8 +-
be/src/runtime/buffered-tuple-stream-test.cc | 249 ++--------------------
be/src/runtime/buffered-tuple-stream.cc | 145 +++++--------
be/src/runtime/buffered-tuple-stream.h | 138 +++++-------
be/src/runtime/row-batch.h | 1 -
be/src/runtime/tuple.h | 4 -
6 files changed, 137 insertions(+), 408 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 42d6be8..092ecfd 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -294,12 +294,8 @@ Status GroupingAggregator::GetRowsFromPartition(
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
partition_eos_ = ReachedLimit();
- if (partition_eos_ || output_iterator_.AtEnd()) {
- // Attach all buffers referenced by previously-returned rows. On the next GetNext()
- // call we will close the partition.
- output_partition_->aggregated_row_stream->Close(
- row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
- }
+ if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
+
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 6ff9805..ef66824 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -398,22 +398,6 @@ class SimpleTupleStreamTest : public testing::Test {
void TestTransferMemory(bool pinned_stream, bool read_write);
- void TestAttachMemory(bool pinned_stream, bool attach_on_read);
-
- void TestFlushResourcesReadWrite(bool pinned_stream, bool attach_on_read);
-
- /// Helper for TestFlushResourcesReadWrite() to write and read back rows from
- /// *stream. 'append_batch_size' is the number of rows to append at a time before
- /// reading them back. *num_buffers_attached tracks the number of buffers attached
- /// to the output batch.
- void AppendToReadWriteStream(int64_t append_batch_size, int64_t buffer_size,
- int* num_buffers_attached, BufferedTupleStream* stream);
-
- // Helper for AppendToReadWriteStream() to verify 'out_batch' contents. The value of
- // row i of 'out_batch' is expected to be the same as the row at index
- // (i + start_index) % out_batch->num_rows() of 'in_batch'.
- void VerifyReadWriteBatch(RowBatch* in_batch, RowBatch* out_batch, int64_t start_index);
-
// Helper to writes 'row' comprised of only string slots to 'data'. The expected
// length of the data written is 'expected_len'.
void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
@@ -665,13 +649,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
ASSERT_TRUE(pinned);
// Read and verify result a few times. We should be able to reread the stream if
- // we don't use attach on read mode.
+ // we don't use delete on read mode.
int read_iters = 3;
for (int i = 0; i < read_iters; ++i) {
- bool attach_on_read = i == read_iters - 1;
+ bool delete_on_read = i == read_iters - 1;
if (i > 0 || !read_write) {
bool got_read_reservation;
- ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+ ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
ASSERT_TRUE(got_read_reservation);
}
@@ -686,13 +670,15 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
}
}
- // After attach_on_read, all buffers should have been attached to the output batches
- // on previous GetNext() calls.
- ASSERT_EQ(0, stream.BytesPinned(false));
+ // After delete_on_read, all blocks aside from the last should be deleted.
+ // Note: this should really be 0, but the BufferedTupleStream returns eos before
+ // deleting the last block, rather than after, so the last block isn't deleted
+ // until the stream is closed.
+ ASSERT_EQ(stream.BytesPinned(false), buffer_size);
stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- ASSERT_EQ(0, stream.BytesPinned(false));
+ ASSERT_EQ(stream.BytesPinned(false), 0);
}
TEST_F(SimpleTupleStreamTest, UnpinPin) {
@@ -779,205 +765,6 @@ TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
TestTransferMemory(false, false);
}
-/// Test iteration over a stream with and without attaching memory.
-void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_read) {
- // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
- // make the batch at capacity.
- int buffer_size = 4 * 1024;
- Init(100 * buffer_size);
-
- BufferedTupleStream stream(
- runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
- ASSERT_OK(stream.Init(-1, pin_stream));
- bool got_write_reservation;
- ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
- ASSERT_TRUE(got_write_reservation);
- RowBatch* in_batch = CreateIntBatch(0, 1024, false);
-
- // Construct a stream with 4 pages.
- const int total_num_pages = 4;
- while (stream.byte_size() < total_num_pages * buffer_size) {
- Status status;
- for (int i = 0; i < in_batch->num_rows(); ++i) {
- bool ret = stream.AddRow(in_batch->GetRow(i), &status);
- EXPECT_TRUE(ret);
- ASSERT_OK(status);
- }
- }
-
- RowBatch* out_batch = pool_.Add(new RowBatch(int_desc_, 100, &tracker_));
- int num_buffers_attached = 0;
- int num_flushes = 0;
- int64_t num_rows_returned = 0;
- bool got_read_reservation;
- ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
- ASSERT_TRUE(got_read_reservation);
- bool eos;
- do {
- ASSERT_EQ(0, out_batch->num_buffers());
- ASSERT_OK(stream.GetNext(out_batch, &eos));
- EXPECT_LE(out_batch->num_buffers(), 1) << "Should only attach one buffer at a time";
- if (out_batch->num_buffers() > 0) {
- EXPECT_TRUE(out_batch->AtCapacity()) << "Flush resources flag should have been set";
- }
- num_buffers_attached += out_batch->num_buffers();
- for (int i = 0; i < out_batch->num_rows(); ++i) {
- int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
- TupleRow* in_row = in_batch->GetRow(num_rows_returned % in_batch->num_rows());
- EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
- *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
- ++num_rows_returned;
- }
- num_flushes += out_batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES;
- out_batch->Reset();
- } while (!eos);
-
- if (attach_on_read) {
- EXPECT_EQ(4, num_buffers_attached) << "All buffers attached during iteration.";
- } else {
- EXPECT_EQ(0, num_buffers_attached) << "No buffers attached during iteration.";
- }
- if (attach_on_read || !pin_stream) EXPECT_EQ(4, num_flushes);
- out_batch->Reset();
- stream.Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
- if (attach_on_read) {
- EXPECT_EQ(0, out_batch->num_buffers());
- } else if (pin_stream) {
- // All buffers should be attached.
- EXPECT_EQ(4, out_batch->num_buffers());
- } else {
- // Buffer from last pinned page should be attached.
- EXPECT_EQ(1, out_batch->num_buffers());
- }
- in_batch->Reset();
- out_batch->Reset();
-}
-
-TEST_F(SimpleTupleStreamTest, TestAttachMemoryPinned) {
- TestAttachMemory(true, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryPinned) {
- TestAttachMemory(true, false);
-}
-
-TEST_F(SimpleTupleStreamTest, TestAttachMemoryUnpinned) {
- TestAttachMemory(false, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryUnpinned) {
- TestAttachMemory(false, false);
-}
-
-// Test for advancing the read/write page with resource flushing.
-void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
- bool pin_stream, bool attach_on_read) {
- // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
- // make the batch at capacity.
- const int BUFFER_SIZE = 512;
- const int BATCH_SIZE = 100;
- // For unpinned streams, we should be able to iterate with only two buffers.
- const int MAX_PINNED_PAGES = pin_stream ? 1000 : 2;
- Init(MAX_PINNED_PAGES * BUFFER_SIZE);
-
- BufferedTupleStream stream(
- runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
- ASSERT_OK(stream.Init(-1, pin_stream));
- bool got_reservation;
- ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
- ASSERT_TRUE(got_reservation);
- int num_buffers_attached = 0;
- /// Read over the page in different increments.
- for (int append_batch_size : {1, 10, 100, 1000}) {
- AppendToReadWriteStream(
- append_batch_size, BUFFER_SIZE, &num_buffers_attached, &stream);
- }
-
- if (attach_on_read) {
- EXPECT_EQ(stream.byte_size() / BUFFER_SIZE - 1, num_buffers_attached)
- << "All buffers except the current write page should have been attached";
- } else {
- EXPECT_EQ(0, num_buffers_attached);
- }
-
- RowBatch* final_out_batch = pool_.Add(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
- stream.Close(final_out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
- final_out_batch->Reset();
-}
-
-void SimpleTupleStreamTest::AppendToReadWriteStream(int64_t append_batch_size,
- int64_t buffer_size, int* num_buffers_attached, BufferedTupleStream* stream) {
- RowBatch* in_batch = CreateIntBatch(0, BATCH_SIZE, false);
-
- /// Accumulate row batches until we see a flush. The contents of the batches should
- /// remain valid until reset or delete trailing batches.
- vector<unique_ptr<RowBatch>> out_batches;
- // The start row index of each batch in 'out_batches'.
- vector<int64_t> out_batch_start_indices;
- // Iterate over at least 10 pages.
- int64_t start_byte_size = stream->byte_size();
- while (stream->byte_size() - start_byte_size < 10 * buffer_size) {
- Status status;
- for (int i = 0; i < append_batch_size; ++i) {
- bool ret = stream->AddRow(
- in_batch->GetRow(stream->num_rows() % in_batch->num_rows()), &status);
- EXPECT_TRUE(ret);
- ASSERT_OK(status);
- }
- int64_t rows_read = 0;
- bool eos;
- while (rows_read < append_batch_size) {
- out_batches.emplace_back(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
- out_batch_start_indices.push_back(stream->rows_returned());
- ASSERT_OK(stream->GetNext(out_batches.back().get(), &eos));
- // Verify the contents of all valid batches to make sure that they haven't become
- // invalid.
- LOG(INFO) << "Verifying " << out_batches.size() << " batches";
- for (int i = 0; i < out_batches.size(); ++i) {
- VerifyReadWriteBatch(in_batch, out_batches[i].get(), out_batch_start_indices[i]);
- }
- *num_buffers_attached += out_batches.back()->num_buffers();
- rows_read += out_batches.back()->num_rows();
- EXPECT_EQ(rows_read == append_batch_size, eos);
- if (out_batches.back().get()->flush_mode()
- == RowBatch::FlushMode::FLUSH_RESOURCES) {
- out_batches.clear();
- out_batch_start_indices.clear();
- }
- }
- EXPECT_EQ(append_batch_size, rows_read);
- EXPECT_EQ(true, eos);
- }
- in_batch->Reset();
-}
-
-void SimpleTupleStreamTest::VerifyReadWriteBatch(
- RowBatch* in_batch, RowBatch* out_batch, int64_t start_index) {
- int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
- int64_t row_index = start_index;
- for (int i = 0; i < out_batch->num_rows(); ++i) {
- TupleRow* in_row = in_batch->GetRow(row_index++ % in_batch->num_rows());
- EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
- *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
- }
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedAttach) {
- TestFlushResourcesReadWrite(true, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedNoAttach) {
- TestFlushResourcesReadWrite(true, false);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedAttach) {
- TestFlushResourcesReadWrite(false, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedNoAttach) {
- TestFlushResourcesReadWrite(false, false);
-}
-
// Test that tuple stream functions if it references strings outside stream. The
// aggregation node relies on this since it updates tuples in-place.
TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
@@ -1018,11 +805,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
DCHECK_EQ(rows_added, stream.num_rows());
- for (int attach_on_read = 0; attach_on_read <= 1; ++attach_on_read) {
+ for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
// Keep stream in memory and test we can read ok.
vector<StringValue> results;
bool got_read_reservation;
- ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+ ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
ASSERT_TRUE(got_read_reservation);
ReadValues(&stream, string_desc_, &results);
VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
@@ -1147,8 +934,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
vector<uint8_t> tuple_mem(tuple_desc->byte_size());
Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
write_row->SetTuple(0, write_tuple);
- StringValue* write_str =
- write_tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+ StringValue* write_str = reinterpret_cast<StringValue*>(
+ write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
// Make the string large enough to fill a page.
const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
vector<char> data(string_len);
@@ -1174,7 +961,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
EXPECT_EQ(1, read_batch.num_rows());
EXPECT_TRUE(eos);
Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
- StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+ StringValue* str = reinterpret_cast<StringValue*>(
+ tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
EXPECT_EQ(string_len, str->len);
for (int j = 0; j < string_len; ++j) {
EXPECT_EQ(i, str->ptr[j]) << j;
@@ -1200,7 +988,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
EXPECT_EQ(1, read_batch.num_rows());
EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
- StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+ StringValue* str = reinterpret_cast<StringValue*>(
+ tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
EXPECT_EQ(string_len, str->len);
for (int j = 0; j < string_len; ++j) {
ASSERT_EQ(i, str->ptr[j]) << j;
@@ -1328,10 +1117,10 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
}
for (int i = 0; i < 3; ++i) {
- bool attach_on_read = i == 2;
+ bool delete_on_read = i == 2;
vector<StringValue> results;
bool got_read_reservation;
- ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+ ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
ASSERT_TRUE(got_read_reservation);
ReadValues(&stream, string_desc_, &results);
VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 71175d1..9326507 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,20 +47,36 @@ using namespace impala;
using namespace strings;
using BufferHandle = BufferPool::BufferHandle;
-using FlushMode = RowBatch::FlushMode;
BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
: state_(state),
desc_(row_desc),
+ node_id_(-1),
buffer_pool_(state->exec_env()->buffer_pool()),
buffer_pool_client_(buffer_pool_client),
+ num_pages_(0),
+ total_byte_size_(0),
+ has_read_iterator_(false),
read_page_reservation_(buffer_pool_client_),
+ read_page_rows_returned_(-1),
+ read_ptr_(nullptr),
+ read_end_ptr_(nullptr),
+ write_ptr_(nullptr),
+ write_end_ptr_(nullptr),
+ rows_returned_(0),
+ has_write_iterator_(false),
+ write_page_(nullptr),
write_page_reservation_(buffer_pool_client_),
+ bytes_pinned_(0),
+ num_rows_(0),
default_page_len_(default_page_len),
max_page_len_(max_page_len),
- has_nullable_tuple_(row_desc->IsAnyTupleNullable()) {
+ has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
+ delete_on_read_(false),
+ closed_(false),
+ pinned_(true) {
DCHECK_GE(max_page_len, default_page_len);
DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
@@ -94,6 +110,10 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
}
}
+BufferedTupleStream::~BufferedTupleStream() {
+ DCHECK(closed_);
+}
+
void BufferedTupleStream::CheckConsistencyFull() const {
CheckConsistencyFast();
// The below checks require iterating over all the pages in the stream.
@@ -119,13 +139,11 @@ void BufferedTupleStream::CheckConsistencyFast() const {
DCHECK(has_read_iterator() || read_page_ == pages_.end());
if (read_page_ != pages_.end()) {
CheckPageConsistency(&*read_page_);
- if (!read_page_->attached_to_output_batch) {
- DCHECK(read_page_->is_pinned());
- DCHECK(read_page_->retrieved_buffer);
- // Can't check read buffer without affecting behaviour, because a read may be in
- // flight and this would required blocking on that write.
- DCHECK_GE(read_end_ptr_, read_ptr_);
- }
+ DCHECK(read_page_->is_pinned());
+ DCHECK(read_page_->retrieved_buffer);
+ // Can't check read buffer without affecting behaviour, because a read may be in
+ // flight and this would required blocking on that write.
+ DCHECK_GE(read_end_ptr_, read_ptr_);
}
if (NeedReadReservation()) {
DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
@@ -141,12 +159,6 @@ void BufferedTupleStream::CheckConsistencyFast() const {
}
void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
- if (page->attached_to_output_batch) {
- /// Read page was just attached to output batch.
- DCHECK(is_read_page(page)) << page->DebugString();
- DCHECK(!page->handle.is_open());
- return;
- }
DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
// Only one large row per page.
if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
@@ -158,7 +170,7 @@ string BufferedTupleStream::DebugString() const {
stringstream ss;
ss << "BufferedTupleStream num_rows=" << num_rows_
<< " rows_returned=" << rows_returned_ << " pinned=" << pinned_
- << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
+ << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
<< " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
<< " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
<< " read_page=";
@@ -189,23 +201,8 @@ string BufferedTupleStream::DebugString() const {
return ss.str();
}
-void BufferedTupleStream::Page::AttachBufferToBatch(
- BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
- DCHECK(is_pinned());
- DCHECK(retrieved_buffer);
- parent->bytes_pinned_ -= len();
- // ExtractBuffer() cannot fail because the buffer is already in memory.
- BufferPool::BufferHandle buffer;
- Status status =
- parent->buffer_pool_->ExtractBuffer(parent->buffer_pool_client_, &handle, &buffer);
- DCHECK(status.ok());
- batch->AddBuffer(parent->buffer_pool_client_, move(buffer), flush);
- attached_to_output_batch = true;
-}
-
string BufferedTupleStream::Page::DebugString() const {
- return Substitute("$0 num_rows=$1 retrived_buffer=$2 attached_to_output_batch=$3",
- handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
+ return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
}
Status BufferedTupleStream::Init(int node_id, bool pinned) {
@@ -217,7 +214,7 @@ Status BufferedTupleStream::Init(int node_id, bool pinned) {
Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
// This must be the first iterator created.
DCHECK(pages_.empty());
- DCHECK(!attach_on_read_);
+ DCHECK(!delete_on_read_);
DCHECK(!has_write_iterator());
DCHECK(!has_read_iterator());
CHECK_CONSISTENCY_FULL();
@@ -232,10 +229,10 @@ Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
}
Status BufferedTupleStream::PrepareForReadWrite(
- bool attach_on_read, bool* got_reservation) {
+ bool delete_on_read, bool* got_reservation) {
// This must be the first iterator created.
DCHECK(pages_.empty());
- DCHECK(!attach_on_read_);
+ DCHECK(!delete_on_read_);
DCHECK(!has_write_iterator());
DCHECK(!has_read_iterator());
CHECK_CONSISTENCY_FULL();
@@ -246,17 +243,20 @@ Status BufferedTupleStream::PrepareForReadWrite(
// Save reservation for both the read and write iterators.
buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
- RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
+ RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
return Status::OK();
}
-void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
+void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
for (Page& page : pages_) {
- if (page.attached_to_output_batch) continue; // Already returned.
if (batch != nullptr && page.retrieved_buffer) {
// Subtle: We only need to attach buffers from pages that we may have returned
- // references to.
- page.AttachBufferToBatch(this, batch, flush);
+ // references to. ExtractBuffer() cannot fail for these pages because the data
+ // is guaranteed to already be in memory.
+ BufferPool::BufferHandle buffer;
+ Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+ DCHECK(status.ok());
+ batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
} else {
buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
}
@@ -271,9 +271,7 @@ void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
int64_t BufferedTupleStream::CalcBytesPinned() const {
int64_t result = 0;
- for (const Page& page : pages_) {
- if (!page.attached_to_output_batch) result += page.pin_count() * page.len();
- }
+ for (const Page& page : pages_) result += page.pin_count() * page.len();
return result;
}
@@ -284,7 +282,6 @@ Status BufferedTupleStream::PinPage(Page* page) {
}
int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const {
- DCHECK(!page->attached_to_output_batch);
return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
}
@@ -486,11 +483,12 @@ Status BufferedTupleStream::NextReadPage() {
&& !NeedReadReservation(pinned_, num_pages_, true, true)) {
buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
}
- } else if (attach_on_read_) {
+ } else if (delete_on_read_) {
DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
<< DebugString();
DCHECK_NE(&*read_page_, write_page_);
- DCHECK(read_page_->attached_to_output_batch);
+ bytes_pinned_ -= pages_.front().len();
+ buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
pages_.pop_front();
--num_pages_;
read_page_ = pages_.begin();
@@ -559,13 +557,12 @@ void BufferedTupleStream::InvalidateReadIterator() {
if (read_page_reservation_.GetReservation() > 0) {
buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
}
- // It is safe to re-read an attach-on-read stream if no rows were read and no pages
+ // It is safe to re-read a delete-on-read stream if no rows were read and no pages
// were therefore deleted.
- DCHECK(attach_on_read_ == false || rows_returned_ == 0);
- if (rows_returned_ == 0) attach_on_read_ = false;
+ if (rows_returned_ == 0) delete_on_read_ = false;
}
-Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
+Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) {
CHECK_CONSISTENCY_FULL();
InvalidateWriteIterator();
InvalidateReadIterator();
@@ -573,12 +570,12 @@ Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reserv
*got_reservation = pinned_ || pages_.empty()
|| buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
if (!*got_reservation) return Status::OK();
- return PrepareForReadInternal(attach_on_read);
+ return PrepareForReadInternal(delete_on_read);
}
-Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
DCHECK(!closed_);
- DCHECK(!attach_on_read_);
+ DCHECK(!delete_on_read_);
DCHECK(!has_read_iterator());
has_read_iterator_ = true;
@@ -602,7 +599,7 @@ Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
}
read_page_rows_returned_ = 0;
rows_returned_ = 0;
- attach_on_read_ = attach_on_read;
+ delete_on_read_ = delete_on_read;
CHECK_CONSISTENCY_FULL();
return Status::OK();
}
@@ -711,15 +708,6 @@ Status BufferedTupleStream::GetNextInternal(
if (UNLIKELY(read_page_ == pages_.end()
|| read_page_rows_returned_ == read_page_->num_rows)) {
- if (read_page_ != pages_.end() && attach_on_read_
- && !read_page_->attached_to_output_batch) {
- DCHECK(has_write_iterator());
- // We're in a read-write stream in the case where we're at the end of the read page
- // but the buffer was not attached on the last GetNext() call because the write
- // iterator had not yet advanced.
- read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
- return Status::OK();
- }
// Get the next page in the stream (or the first page if read_page_ was not yet
// initialized.) We need to do this at the beginning of the GetNext() call to ensure
// the buffer management semantics. NextReadPage() may unpin or delete the buffer
@@ -741,7 +729,7 @@ Status BufferedTupleStream::GetNextInternal(
// null tuple indicator.
if (FILL_FLAT_ROWS) {
DCHECK(flat_rows != nullptr);
- DCHECK(!attach_on_read_);
+ DCHECK(!delete_on_read_);
DCHECK_EQ(batch->num_rows(), 0);
flat_rows->clear();
flat_rows->reserve(rows_to_fill);
@@ -780,28 +768,11 @@ Status BufferedTupleStream::GetNextInternal(
rows_returned_ += rows_to_fill;
read_page_rows_returned_ += rows_to_fill;
*eos = (rows_returned_ == num_rows_);
- if (read_page_rows_returned_ == read_page_->num_rows) {
- // No more data in this page. NextReadPage() may need to reuse the reservation
- // currently used for 'read_page_' so we may need to flush resources. When
- // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
- // be unpinned later but we're returning a reference to the memory so we need to
- // signal to the caller that the resources are going away. Note that if there is a
- // read-write page it is not safe to attach the buffer yet because more rows may be
- // appended to the page.
- if (attach_on_read_) {
- if (!has_read_write_page()) {
- // Safe to attach because we already called GetBuffer() in NextReadPage().
- // TODO: always flushing for pinned streams is overkill since we may not need
- // to reuse the reservation immediately. Changing this may require modifying
- // callers of this class.
- read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
- }
- } else if (!pinned_) {
- // Flush resources so that we can safely unpin the page on the next GetNext() call.
- // Note that if this is a read/write page we might not actually do the advance on
- // the next call to GetNext(). In that case the flush is still safe to do.
- batch->MarkFlushResources();
- }
+ if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
+ // No more data in this page. The batch must be immediately returned up the operator
+ // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
+ // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
+ batch->MarkNeedsDeepCopy();
}
if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
DCHECK_LE(read_ptr_, read_end_ptr_);
@@ -1057,7 +1028,7 @@ void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const
DCHECK(row != nullptr);
DCHECK(!closed_);
DCHECK(is_pinned());
- DCHECK(!attach_on_read_);
+ DCHECK(!delete_on_read_);
uint8_t* data = flat_row;
return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
UnflattenTupleRow<false>(&data, row);
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 4090ea8..565b5fa 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -93,9 +93,12 @@ class TupleRow;
/// buffer is needed to keep the row being processed in-memory, but only default-sized
/// buffers are needed for the other streams being written.
///
-/// The tuple stream also supports a 'attach_on_read' mode, enabled by passing a flag
-/// to PrepareForRead() which attaches the stream's pages to the output batch as it
-/// does a final destructive read pass over the stream.
+/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which deletes the stream's pages as it does a final read
+/// pass over the stream.
+///
+/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
+/// buffers to RowBatches.
///
/// Page layout:
/// Rows are stored back to back starting at the first byte of each page's buffer, with
@@ -159,11 +162,11 @@ class TupleRow;
/// Read:
/// 1. Unpinned: Only a single read page is pinned at a time. This means that only
/// enough reservation to pin a single page is needed to read the stream, regardless
-/// of the stream's size. Each page is attached or unpinned (if attach on read is true
+/// of the stream's size. Each page is deleted or unpinned (if delete on read is true
/// or false respectively) before advancing to the next page.
/// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or
-/// unpinned when reading from the stream. If attach on read is true, pages are
-/// attached after being read. If the stream was previously unpinned, the page's data
+/// unpinned when reading from the stream. If delete on read is true, pages are
+/// deleted after being read. If the stream was previously unpinned, the page's data
/// may not yet be in memory - reading from the stream can block on I/O or fail with
/// an I/O error.
/// Write:
@@ -176,17 +179,12 @@ class TupleRow;
/// or free up other memory before retrying.
///
/// Memory lifetime of rows read from stream:
-/// There are several cases.
-/// 1. If the stream is pinned and attach on read is false, it is valid to access any
-/// tuples returned via GetNext() until the stream is unpinned.
-/// 2. If the stream is in attach on read mode, all buffers referenced by returned rows
-/// are attached to the batches by that GetNext() call or a subsequent one. The
-/// caller is responsible for managing the lifetime of those buffers.
-/// 3. If the stream is unpinned and not in attach on read mode, then the batch returned
-/// from GetNext() may have the FLUSH_RESOURCES flag set, which means that any tuple
-/// memory returned so far from the stream may be freed on the next call to GetNext().
-/// It is *not* safe to return references to rows returned in this mode outside of
-/// the ExecNode.
+/// If the stream is pinned and delete on read is false, it is valid to access any tuples
+/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
+/// delete on read is true, then the batch returned from GetNext() may have the
+/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
+/// stream may be freed on the next call to GetNext().
+/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
///
/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
/// The BufferedTupleStream supports allocation of uninitialized rows with
@@ -199,7 +197,8 @@ class TupleRow;
/// will not be modified until the stream is read via GetNext().
/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
///
-/// TODO: prefetching for pages could speed up iteration over unpinned streams.
+/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
+/// page will need to be pinned soon.
class BufferedTupleStream {
public:
/// A pointer to the start of a flattened TupleRow in the stream.
@@ -214,7 +213,7 @@ class BufferedTupleStream {
int64_t max_page_len,
const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
- ~BufferedTupleStream() { DCHECK(closed_); }
+ virtual ~BufferedTupleStream();
/// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
/// once before any of the other APIs.
@@ -234,23 +233,23 @@ class BufferedTupleStream {
/// Prepares the stream for interleaved reads and writes by saving enough reservation
/// for default-sized read and write pages. Called after Init() and before the first
/// AddRow() or AddRowCustomBegin() call.
- /// 'attach_on_read': Pages are attached to the output batch after they are read.
+ /// 'delete_on_read': Pages are deleted after they are read.
/// 'got_reservation': set to true if there was enough reservation to initialize the
/// read and write pages and false if there was not enough reservation and no other
/// error was encountered. Undefined if an error status is returned.
Status PrepareForReadWrite(
- bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+ bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
/// Prepares the stream for reading, invalidating the write iterator (if there is one).
/// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
/// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
/// over the stream, unless rows were read from the stream after PrepareForRead() or
- /// PrepareForReadWrite() was called with attach_on_read = true.
- /// 'attach_on_read': Pages are attached to the output batch after they are read.
+ /// PrepareForReadWrite() was called with delete_on_read = true.
+ /// 'delete_on_read': Pages are deleted after they are read.
/// 'got_reservation': set to true if there was enough reservation to initialize the
/// first read page and false if there was not enough reservation and no other
/// error was encountered. Undefined if an error status is returned.
- Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+ Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
/// Adds a single row to the stream. There are three possible outcomes:
/// a) The append succeeds. True is returned.
@@ -304,10 +303,7 @@ class BufferedTupleStream {
enum UnpinMode {
/// All pages in the stream are unpinned and the read/write positions in the stream
/// are reset. No more rows can be written to the stream after this. The stream can
- /// be re-read from the beginning by calling PrepareForRead(). It in invalid to call
- /// UnpinStream(UNPIN_ALL) if the stream is in 'attach_on_read' mode and >= 1 row has
- /// been read from the stream, because this would leave the stream in limbo where it
- /// still has unpinned pages but it cannot be read or written to.
+ /// be re-read from the beginning by calling PrepareForRead().
UNPIN_ALL,
/// All pages are unpinned aside from the current read and write pages (if any),
/// which is left in the same state. The unpinned stream can continue being read
@@ -319,21 +315,14 @@ class BufferedTupleStream {
void UnpinStream(UnpinMode mode);
/// Get the next batch of output rows, which are backed by the stream's memory.
+ /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
+ /// flag may be set on 'batch' to signal that memory will be freed on the next
+ /// call to GetNext() and that the caller should copy out any data it needs from
+ /// rows in 'batch' or in previous batches returned from GetNext().
///
- /// If the stream is in 'attach_on_read' mode then buffers are attached to 'batch'
- /// when the last row referencing the buffer is returned. The FLUSH_RESOURCES flag
- /// is always set when attaching such a buffer.
- /// TODO: always flushing for pinned streams is overkill since we may not need
- /// to reuse the reservation immediately. Changing this may require modifying
- /// callers of this class.
- ///
- /// If the stream is unpinned and not in 'attach_on_read' mode, the FLUSH_RESOURCES
- /// flag may be set on the batch to signal that memory will be freed on the next call
- /// to GetNext() and that the caller should copy out any data it needs from rows in
- /// 'batch' or in previous batches returned from GetNext().
- ///
- /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
+ /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
/// rows will remain valid until the stream is unpinned, destroyed, etc.
+ /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
/// Same as above, but populate 'flat_rows' with a pointer to the flat version of
@@ -381,6 +370,8 @@ class BufferedTupleStream {
/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
struct Page {
+ Page() : num_rows(0), retrieved_buffer(true) {}
+
inline int len() const { return handle.len(); }
inline bool is_pinned() const { return handle.is_pinned(); }
inline int pin_count() const { return handle.pin_count(); }
@@ -389,27 +380,17 @@ class BufferedTupleStream {
retrieved_buffer = true;
return Status::OK();
}
-
- /// Attach the buffer from this page to 'batch'. Only valid to call if the page is
- /// pinned and 'retrieved_buffer' is true. Decrements parent->bytes_pinned_.
- void AttachBufferToBatch(
- BufferedTupleStream* parent, RowBatch* batch, RowBatch::FlushMode flush);
-
std::string DebugString() const;
BufferPool::PageHandle handle;
/// Number of rows written to the page.
- int num_rows = 0;
+ int num_rows;
/// Whether we called GetBuffer() on the page since it was last pinned. This means
/// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
/// returned rows referencing the page's buffer.
- bool retrieved_buffer = true;
-
- /// If the page was just attached to the output batch on the last GetNext() call while
- /// in attach_on_read mode. If true, then 'handle' is closed.
- bool attached_to_output_batch = false;
+ bool retrieved_buffer;
};
/// Runtime state instance used to check for cancellation. Not owned.
@@ -419,7 +400,7 @@ class BufferedTupleStream {
const RowDescriptor* desc_;
/// Plan node ID, used for error reporting.
- int node_id_ = -1;
+ int node_id_;
/// The size of the fixed length portion for each tuple in the row.
std::vector<int> fixed_tuple_sizes_;
@@ -439,18 +420,18 @@ class BufferedTupleStream {
/// List of pages in the stream.
/// Empty iff one of two cases applies:
/// * before the first row has been added with AddRow() or AddRowCustom().
- /// * after the stream has been destructively read in 'attach_on_read' mode
+ /// * after the stream has been destructively read in 'delete_on_read' mode
std::list<Page> pages_;
// IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages.
// TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561.
- int64_t num_pages_ = 0;
+ int64_t num_pages_;
- /// Total size of pages_, including any pages already deleted in 'attach_on_read'
+ /// Total size of pages_, including any pages already deleted in 'delete_on_read'
/// mode.
- int64_t total_byte_size_ = 0;
+ int64_t total_byte_size_;
/// True if there is currently an active read iterator for the stream.
- bool has_read_iterator_ = false;
+ bool has_read_iterator_;
/// The current page being read. When no read iterator is active, equal to list.end().
/// When a read iterator is active, either points to the current read page, or equals
@@ -466,31 +447,31 @@ class BufferedTupleStream {
BufferPool::SubReservation read_page_reservation_;
/// Number of rows returned from the current read_page_.
- uint32_t read_page_rows_returned_ = -1;
+ uint32_t read_page_rows_returned_;
/// Pointer into read_page_ to the byte after the last row read.
- uint8_t* read_ptr_ = nullptr;
+ uint8_t* read_ptr_;
/// Pointer to one byte past the end of read_page_. Used to detect overruns.
- const uint8_t* read_end_ptr_ = nullptr;
+ const uint8_t* read_end_ptr_;
/// Pointer into write_page_ to the byte after the last row written.
- uint8_t* write_ptr_ = nullptr;
+ uint8_t* write_ptr_;
/// Pointer to one byte past the end of write_page_. Cached to speed up computation
- const uint8_t* write_end_ptr_ = nullptr;
+ const uint8_t* write_end_ptr_;
/// Number of rows returned to the caller from GetNext() since the last
/// PrepareForRead() call.
- int64_t rows_returned_ = 0;
+ int64_t rows_returned_;
/// True if there is currently an active write iterator into the stream.
- bool has_write_iterator_ = false;
+ bool has_write_iterator_;
/// The current page for writing. NULL if there is no write iterator or no current
/// write page. Always pinned. Size is 'default_page_len_', except temporarily while
/// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
- Page* write_page_ = nullptr;
+ Page* write_page_;
/// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
/// there is a write iterator, no page currently pinned for writing and the possibility
@@ -503,11 +484,11 @@ class BufferedTupleStream {
/// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
/// to compute it.
- int64_t bytes_pinned_ = 0;
+ int64_t bytes_pinned_;
/// Number of rows stored in the stream. Includes rows that were already deleted during
- /// a destructive 'attach_on_read' pass over the stream.
- int64_t num_rows_ = 0;
+ /// a destructive 'delete_on_read' pass over the stream.
+ int64_t num_rows_;
/// The default length in bytes of pages used to store the stream's rows. All rows that
/// fit in a default-sized page are stored in default-sized page.
@@ -522,14 +503,14 @@ class BufferedTupleStream {
const bool has_nullable_tuple_;
/// If true, pages are deleted after they are read during this read pass. Once rows
- /// have been read from a stream with 'attach_on_read_' true, this is always true.
- bool attach_on_read_ = false;
+ /// have been read from a stream with 'delete_on_read_' true, this is always true.
+ bool delete_on_read_;
- bool closed_ = false; // Used for debugging.
+ bool closed_; // Used for debugging.
/// If true, this stream has been explicitly pinned by the caller and all pages are
/// kept pinned until the caller calls UnpinStream().
- bool pinned_ = true;
+ bool pinned_;
bool is_read_page(const Page* page) const {
return read_page_ != pages_.end() && &*read_page_ == page;
@@ -604,7 +585,7 @@ class BufferedTupleStream {
/// Same as PrepareForRead(), except the iterators are not invalidated and
/// the caller is assumed to have checked there is sufficient unused reservation.
- Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
+ Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
/// Pins the next read page. This blocks reading from disk if necessary to bring the
/// page's data into memory. Updates read_page_, read_ptr_, and
@@ -612,9 +593,7 @@ class BufferedTupleStream {
Status NextReadPage() WARN_UNUSED_RESULT;
/// Invalidate the read iterator, and release any resources associated with the active
- /// iterator. Invalid to call if 'attach_on_read_' is true and >= 1 rows have been read,
- /// because that would leave the stream in limbo where it still has pages but it is
- /// invalid to read or write from in future.
+ /// iterator.
void InvalidateReadIterator();
/// Returns the total additional bytes that this row will consume in write_page_ if
@@ -639,8 +618,7 @@ class BufferedTupleStream {
void UnpinPageIfNeeded(Page* page, bool stream_pinned);
/// Return the expected pin count for 'page' in the current stream based on the current
- /// read and write pages and whether the stream is pinned. Not valid to call if
- /// the page was just deleted, i.e. page->attached_to_output_batch == false.
+ /// read and write pages and whether the stream is pinned.
int ExpectedPinCount(bool stream_pinned, const Page* page) const;
/// Return true if the stream in its current state needs to have a reservation for
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 90d8c4d..67adb9b 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -418,7 +418,6 @@ class RowBatch {
friend class RowBatchSerializeBaseline;
friend class RowBatchSerializeBenchmark;
friend class RowBatchSerializeTest;
- friend class SimpleTupleStreamTest;
/// Creates an empty row batch based on the serialized row batch header. Called from
/// FromProtobuf() above before desrialization of a protobuf row batch.
http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 91517f1..68f313d 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -247,10 +247,6 @@ class Tuple {
return static_cast<bool*>(GetSlot(offset));
}
- int32_t* GetIntSlot(int offset) {
- return static_cast<int32_t*>(GetSlot(offset));
- }
-
int64_t* GetBigIntSlot(int offset) {
return static_cast<int64_t*>(GetSlot(offset));
}