You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/08 19:02:22 UTC
[08/11] incubator-impala git commit: IMPALA-4674: Part 2.5: Rename
BufferedTupleStreamV2
IMPALA-4674: Part 2.5: Rename BufferedTupleStreamV2
This is cleanup that wasn't included in Part 2.
Testing:
Confirmed that everything (including be tests) built ok,
buffered-tuple-stream-v2-test passed and that I could
run a couple of basic queries.
Change-Id: Ib8b23d7c2d7488d9f74b08cc9adb4ed1a93e3591
Reviewed-on: http://gerrit.cloudera.org:8080/7609
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0c46147e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0c46147e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0c46147e
Branch: refs/heads/master
Commit: 0c46147e5fd93e2a9a63d145d60b656d2f6a7612
Parents: 5caadbb
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 7 09:07:00 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Aug 8 10:22:20 2017 +0000
----------------------------------------------------------------------
be/src/exec/analytic-eval-node.cc | 8 +-
be/src/exec/analytic-eval-node.h | 4 +-
be/src/exec/hash-table-test.cc | 12 +-
be/src/exec/hash-table.cc | 4 +-
be/src/exec/hash-table.h | 14 +-
be/src/exec/hash-table.inline.h | 2 +-
be/src/exec/partitioned-aggregation-node-ir.cc | 2 +-
be/src/exec/partitioned-aggregation-node.cc | 28 +-
be/src/exec/partitioned-aggregation-node.h | 12 +-
be/src/exec/partitioned-hash-join-builder-ir.cc | 10 +-
be/src/exec/partitioned-hash-join-builder.cc | 34 +-
be/src/exec/partitioned-hash-join-builder.h | 24 +-
be/src/exec/partitioned-hash-join-node-ir.cc | 4 +-
be/src/exec/partitioned-hash-join-node.cc | 34 +-
be/src/exec/partitioned-hash-join-node.h | 14 +-
be/src/exec/partitioned-hash-join-node.inline.h | 2 +-
be/src/runtime/CMakeLists.txt | 4 +-
be/src/runtime/buffered-tuple-stream-test.cc | 1462 ++++++++++++++++++
be/src/runtime/buffered-tuple-stream-v2-test.cc | 1462 ------------------
be/src/runtime/buffered-tuple-stream-v2.cc | 1084 -------------
be/src/runtime/buffered-tuple-stream-v2.h | 705 ---------
.../runtime/buffered-tuple-stream-v2.inline.h | 56 -
be/src/runtime/buffered-tuple-stream.cc | 1084 +++++++++++++
be/src/runtime/buffered-tuple-stream.h | 705 +++++++++
be/src/runtime/buffered-tuple-stream.inline.h | 56 +
25 files changed, 3413 insertions(+), 3413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index f6d96ae..af4f866 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -23,7 +23,7 @@
#include "exprs/agg-fn-evaluator.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
@@ -195,7 +195,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
RETURN_IF_ERROR(ClaimBufferReservation(state));
}
DCHECK(input_stream_ == nullptr);
- input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
+ input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
&buffer_pool_client_, resource_profile_.spillable_buffer_size,
resource_profile_.spillable_buffer_size));
RETURN_IF_ERROR(input_stream_->Init(id(), true));
@@ -363,7 +363,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
// TODO: Consider re-pinning later if the output stream is fully consumed.
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
- input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+ input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
if (!input_stream_->AddRow(row, &status)) {
// Rows should be added in unpinned mode unless an error occurs.
@@ -623,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
<< " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
SCOPED_TIMER(evaluation_timer_);
- // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been
+ // BufferedTupleStream::num_rows() returns the total number of rows that have been
// inserted into the stream (it does not decrease when we read rows), so the index of
// the next input row that will be inserted will be the current size of the stream.
int64_t stream_idx = input_stream_->num_rows();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index eab9198..671eaa4 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -19,7 +19,7 @@
#define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
#include "exec/exec-node.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
#include "runtime/tuple.h"
namespace impala {
@@ -339,7 +339,7 @@ class AnalyticEvalNode : public ExecNode {
/// buffers with tuple data are attached to an output row batch on eos or
/// ReachedLimit().
/// TODO: Consider re-pinning unpinned streams when possible.
- boost::scoped_ptr<BufferedTupleStreamV2> input_stream_;
+ boost::scoped_ptr<BufferedTupleStream> input_stream_;
/// Pool used for O(1) allocations that live until Close() or Reset().
/// Does not own data backing tuples returned in GetNext(), so it does not
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 7a6ec9d..aad9134 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -309,7 +309,7 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < 2; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
Status status;
bool inserted =
@@ -350,7 +350,7 @@ class HashTableTest : public testing::Test {
ASSERT_TRUE(success);
for (int i = 0; i < 5; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted =
hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
@@ -418,7 +418,7 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < val; ++i) {
TupleRow* row = CreateTupleRow(val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status));
ASSERT_OK(status);
@@ -481,7 +481,7 @@ class HashTableTest : public testing::Test {
for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
TupleRow* row = CreateTupleRow(build_row_val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
ASSERT_OK(status);
@@ -518,7 +518,7 @@ class HashTableTest : public testing::Test {
while (true) {
TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue;
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
bool inserted =
hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status);
ASSERT_OK(status);
@@ -569,7 +569,7 @@ class HashTableTest : public testing::Test {
// Insert using both Insert() and FindBucket() methods.
if (build_row_val % 2 == 0) {
- BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
EXPECT_TRUE(inserted);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index aacedc2..e65d9f1 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -385,14 +385,14 @@ constexpr double HashTable::MAX_FILL_FACTOR;
constexpr int64_t HashTable::DATA_PAGE_SIZE;
HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates,
- int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+ int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
int64_t initial_num_buckets) {
return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates,
num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
}
HashTable::HashTable(bool quadratic_probing, Suballocator* allocator,
- bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* stream,
+ bool stores_duplicates, int num_build_tuples, BufferedTupleStream* stream,
int64_t max_num_buckets, int64_t num_buckets)
: allocator_(allocator),
tuple_stream_(stream),
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 297e619..d764640 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -26,8 +26,8 @@
#include "codegen/impala-ir.h"
#include "common/compiler-util.h"
#include "common/logging.h"
-#include "runtime/buffered-tuple-stream-v2.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/suballocator.h"
#include "runtime/tuple-row.h"
@@ -533,7 +533,7 @@ class HashTable {
/// of two formats, depending on the number of tuples in the row.
union HtData {
// For rows with multiple tuples per row, a pointer to the flattened TupleRow.
- BufferedTupleStreamV2::FlatRowPtr flat_row;
+ BufferedTupleStream::FlatRowPtr flat_row;
// For rows with one tuple per row, a pointer to the Tuple itself.
Tuple* tuple;
};
@@ -600,7 +600,7 @@ class HashTable {
/// - initial_num_buckets: number of buckets that the hash table should be initialized
/// with.
static HashTable* Create(Suballocator* allocator, bool stores_duplicates,
- int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+ int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
int64_t initial_num_buckets);
/// Allocates the initial bucket structure. Returns a non-OK status if an error is
@@ -623,7 +623,7 @@ class HashTable {
/// is stored. The 'row' is not copied by the hash table and the caller must guarantee
/// it stays in memory. This will not grow the hash table.
bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
- BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row,
+ BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row,
Status* status) WARN_UNUSED_RESULT;
/// Prefetch the hash table bucket which the given hash value 'hash' maps to.
@@ -819,7 +819,7 @@ class HashTable {
/// - quadratic_probing: set to true when the probing algorithm is quadratic, as
/// opposed to linear.
HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
- int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+ int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
int64_t initial_num_buckets);
/// Performs the probing operation according to the probing algorithm (linear or
@@ -918,7 +918,7 @@ class HashTable {
/// Stream contains the rows referenced by the hash table. Can be NULL if the
/// row only contains a single tuple, in which case the TupleRow indirection
/// is removed by the hash table.
- BufferedTupleStreamV2* tuple_stream_;
+ BufferedTupleStream* tuple_stream_;
/// Constants on how the hash table should behave.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index ce2f784..85d7ad6 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -109,7 +109,7 @@ inline HashTable::HtData* HashTable::InsertInternal(
}
inline bool HashTable::Insert(HashTableCtx* ht_ctx,
- BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) {
+ BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) {
HtData* htdata = InsertInternal(ht_ctx, status);
// If successful insert, update the contents of the newly inserted entry with 'idx'.
if (LIKELY(htdata != NULL)) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 126a2a5..9baada1 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -21,7 +21,7 @@
#include "exprs/agg-fn-evaluator.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index fc0a4a6..16db5cc 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -31,7 +31,7 @@
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
#include "gutil/strings/substitute.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
@@ -275,7 +275,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
&buffer_pool_client_, resource_profile_.spillable_buffer_size));
if (!is_streaming_preagg_ && needs_serialize_) {
- serialize_stream_.reset(new BufferedTupleStreamV2(state, &intermediate_row_desc_,
+ serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
&buffer_pool_client_, resource_profile_.spillable_buffer_size,
resource_profile_.spillable_buffer_size));
RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
@@ -722,7 +722,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
}
}
- aggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+ aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
&parent->intermediate_row_desc_, &parent->buffer_pool_client_,
parent->resource_profile_.spillable_buffer_size,
parent->resource_profile_.spillable_buffer_size, external_varlen_slots));
@@ -740,7 +740,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
}
if (!parent->is_streaming_preagg_) {
- unaggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+ unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
parent->child(0)->row_desc(), &parent->buffer_pool_client_,
parent->resource_profile_.spillable_buffer_size,
parent->resource_profile_.spillable_buffer_size));
@@ -786,7 +786,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
// Serialize and copy the spilled partition's stream into the new stream.
Status status;
- BufferedTupleStreamV2* new_stream = parent->serialize_stream_.get();
+ BufferedTupleStream* new_stream = parent->serialize_stream_.get();
HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
while (!it.AtEnd()) {
Tuple* tuple = it.GetTuple();
@@ -811,7 +811,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
// when we need to spill again. We need to have this available before we need
// to spill to make sure it is available. This should be acquirable since we just
// freed at least one buffer from this partition's (old) aggregated_row_stream.
- parent->serialize_stream_.reset(new BufferedTupleStreamV2(parent->state_,
+ parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
&parent->intermediate_row_desc_, &parent->buffer_pool_client_,
parent->resource_profile_.spillable_buffer_size,
parent->resource_profile_.spillable_buffer_size));
@@ -866,9 +866,9 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
DCHECK(aggregated_row_stream->has_write_iterator());
DCHECK(!unaggregated_row_stream->has_write_iterator());
if (more_aggregate_rows) {
- aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+ aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
} else {
- aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
bool got_buffer;
RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
DCHECK(got_buffer)
@@ -932,7 +932,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
}
Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
- const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStreamV2* stream,
+ const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
Status* status) noexcept {
DCHECK(stream != NULL && status != NULL);
// Allocate space for the entire tuple in the stream.
@@ -1077,7 +1077,7 @@ Status PartitionedAggregationNode::AppendSpilledRow(
Partition* __restrict__ partition, TupleRow* __restrict__ row) {
DCHECK(!is_streaming_preagg_);
DCHECK(partition->is_spilled());
- BufferedTupleStreamV2* stream = AGGREGATED_ROWS ?
+ BufferedTupleStream* stream = AGGREGATED_ROWS ?
partition->aggregated_row_stream.get() :
partition->unaggregated_row_stream.get();
DCHECK(!stream->is_pinned());
@@ -1297,7 +1297,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
if (!hash_partition->is_spilled()) continue;
// The aggregated rows have been repartitioned. Free up at least a buffer's worth of
// reservation and use it to pin the unaggregated write buffer.
- hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
bool got_buffer;
RETURN_IF_ERROR(
hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -1332,7 +1332,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
}
template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStreamV2* input_stream) {
+Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) {
DCHECK(!is_streaming_preagg_);
if (input_stream->num_rows() > 0) {
while (true) {
@@ -1430,8 +1430,8 @@ void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
// Ensure all pages in the spilled partition's streams are unpinned by invalidating
// the streams' read and write iterators. We may need all the memory to process the
// next spilled partitions.
- partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
- partition->unaggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
spilled_partitions_.push_front(partition);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 4f8b622..c230630 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -25,7 +25,7 @@
#include "exec/exec-node.h"
#include "exec/hash-table.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
#include "runtime/bufferpool/suballocator.h"
#include "runtime/descriptors.h" // for TupleId
#include "runtime/mem-pool.h"
@@ -425,18 +425,18 @@ class PartitionedAggregationNode : public ExecNode {
/// For streaming preaggs, this may be NULL if sufficient memory is not available.
/// In that case hash_tbl is also NULL and all rows for the partition will be passed
/// through.
- boost::scoped_ptr<BufferedTupleStreamV2> aggregated_row_stream;
+ boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;
/// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations.
/// Always unpinned. Has a write buffer allocated when the partition is spilled and
/// unaggregated rows are being processed.
- boost::scoped_ptr<BufferedTupleStreamV2> unaggregated_row_stream;
+ boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
};
/// Stream used to store serialized spilled rows. Only used if needs_serialize_
/// is set. This stream is never pinned and only used in Partition::Spill as
/// a temporary buffer.
- boost::scoped_ptr<BufferedTupleStreamV2> serialize_stream_;
+ boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
/// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
@@ -471,7 +471,7 @@ class PartitionedAggregationNode : public ExecNode {
/// FunctionContexts, so is stored outside the stream. If stream's small buffers get
/// full, it will attempt to switch to IO-buffers.
Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
- BufferedTupleStreamV2* stream, Status* status) noexcept;
+ BufferedTupleStream* stream, Status* status) noexcept;
/// Constructs intermediate tuple, allocating memory from pool instead of the stream.
/// Returns NULL and sets status if there is not enough memory to allocate the tuple.
@@ -571,7 +571,7 @@ class PartitionedAggregationNode : public ExecNode {
/// Reads all the rows from input_stream and process them by calling ProcessBatch().
template <bool AGGREGATED_ROWS>
- Status ProcessStream(BufferedTupleStreamV2* input_stream) WARN_UNUSED_RESULT;
+ Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
/// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
void GetSingletonOutput(RowBatch* row_batch);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index df58036..e15e116 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -19,7 +19,7 @@
#include "codegen/impala-ir.h"
#include "exec/hash-table.inline.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter.h"
@@ -30,7 +30,7 @@
using namespace impala;
inline bool PhjBuilder::AppendRow(
- BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+ BufferedTupleStream* stream, TupleRow* row, Status* status) {
if (LIKELY(stream->AddRow(row, status))) return true;
if (UNLIKELY(!status->ok())) return false;
return AppendRowStreamFull(stream, row, status);
@@ -73,12 +73,12 @@ Status PhjBuilder::ProcessBuildBatch(
bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
HashTableCtx* ht_ctx, RowBatch* batch,
- const vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, Status* status) {
+ const vector<BufferedTupleStream::FlatRowPtr>& flat_rows, Status* status) {
// Compute the hash values and prefetch the hash table buckets.
const int num_rows = batch->num_rows();
HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
const int prefetch_size = expr_vals_cache->capacity();
- const BufferedTupleStreamV2::FlatRowPtr* flat_rows_data = flat_rows.data();
+ const BufferedTupleStream::FlatRowPtr* flat_rows_data = flat_rows.data();
for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
prefetch_group_row += prefetch_size) {
int cur_row = prefetch_group_row;
@@ -97,7 +97,7 @@ bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
expr_vals_cache->ResetForRead();
FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
TupleRow* row = batch_iter.Get();
- BufferedTupleStreamV2::FlatRowPtr flat_row = flat_rows_data[cur_row];
+ BufferedTupleStream::FlatRowPtr flat_row = flat_rows_data[cur_row];
if (!expr_vals_cache->IsRowNull()
&& UNLIKELY(!hash_tbl_->Insert(ht_ctx, flat_row, row, status))) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a2f7c96..2dc2d8d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -25,7 +25,7 @@
#include "exec/hash-table.inline.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
@@ -293,11 +293,11 @@ Status PhjBuilder::CreateHashPartitions(int level) {
}
bool PhjBuilder::AppendRowStreamFull(
- BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept {
+ BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
while (true) {
// We ran out of memory. Pick a partition to spill. If we ran out of unspilled
// partitions, SpillPartition() will return an error status.
- *status = SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+ *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
if (!status->ok()) return false;
if (stream->AddRow(row, status)) return true;
if (!status->ok()) return false;
@@ -307,7 +307,7 @@ bool PhjBuilder::AppendRowStreamFull(
}
// TODO: can we do better with a different spilling heuristic?
-Status PhjBuilder::SpillPartition(BufferedTupleStreamV2::UnpinMode mode) {
+Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
int64_t max_freed_mem = 0;
int partition_idx = -1;
@@ -367,7 +367,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
partition->Close(NULL);
} else if (partition->is_spilled()) {
// We don't need any build-side data for spilled partitions in memory.
- partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
}
}
@@ -386,7 +386,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
RETURN_IF_ERROR(partition->BuildHashTable(&built));
// If we did not have enough memory to build this hash table, we need to spill this
// partition (clean up the hash table, unpin build).
- if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
+ if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
}
// We may have spilled additional partitions while building hash tables, we need to
@@ -423,9 +423,9 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
while (probe_streams_to_create > 0) {
// Create stream in vector, so that it will be cleaned up after any failure.
spilled_partition_probe_streams_.emplace_back(
- make_unique<BufferedTupleStreamV2>(runtime_state_, probe_row_desc_,
+ make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
buffer_pool_client_, spillable_buffer_size_, spillable_buffer_size_));
- BufferedTupleStreamV2* probe_stream = spilled_partition_probe_streams_.back().get();
+ BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false));
// Loop until either the stream gets a buffer or all partitions are spilled (in which
@@ -435,7 +435,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
if (got_buffer) break;
- RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL));
+ RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
++probe_streams_to_create;
}
--probe_streams_to_create;
@@ -443,7 +443,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
return Status::OK();
}
-vector<unique_ptr<BufferedTupleStreamV2>> PhjBuilder::TransferProbeStreams() {
+vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
return std::move(spilled_partition_probe_streams_);
}
@@ -453,7 +453,7 @@ void PhjBuilder::CloseAndDeletePartitions() {
all_partitions_.clear();
hash_partitions_.clear();
null_aware_partition_ = NULL;
- for (unique_ptr<BufferedTupleStreamV2>& stream : spilled_partition_probe_streams_) {
+ for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
spilled_partition_probe_streams_.clear();
@@ -505,14 +505,14 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
}
Status PhjBuilder::RepartitionBuildInput(
- Partition* input_partition, int level, BufferedTupleStreamV2* input_probe_rows) {
+ Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
DCHECK_GE(level, 1);
SCOPED_TIMER(repartition_timer_);
COUNTER_ADD(num_repartitions_, 1);
RuntimeState* state = runtime_state_;
// Setup the read buffer and the new partitions.
- BufferedTupleStreamV2* build_rows = input_partition->build_rows();
+ BufferedTupleStream* build_rows = input_partition->build_rows();
DCHECK(build_rows != NULL);
bool got_read_buffer;
RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
@@ -545,7 +545,7 @@ Status PhjBuilder::RepartitionBuildInput(
bool got_buffer;
RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
if (got_buffer) break;
- RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT));
+ RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
}
RETURN_IF_ERROR(FlushFinal(state));
@@ -573,7 +573,7 @@ bool PhjBuilder::HashTableStoresNulls() const {
PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
: parent_(parent), is_spilled_(false), level_(level) {
- build_rows_ = make_unique<BufferedTupleStreamV2>(state, parent_->row_desc_,
+ build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
parent_->buffer_pool_client_, parent->spillable_buffer_size_,
parent->spillable_buffer_size_);
}
@@ -602,7 +602,7 @@ void PhjBuilder::Partition::Close(RowBatch* batch) {
}
}
-Status PhjBuilder::Partition::Spill(BufferedTupleStreamV2::UnpinMode mode) {
+Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
DCHECK(!IsClosed());
RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker()));
// Close the hash table and unpin the stream backing it to free memory.
@@ -634,7 +634,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
HashTableCtx* ctx = parent_->ht_ctx_.get();
ctx->set_level(level()); // Set the hash function for building the hash table.
RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
- vector<BufferedTupleStreamV2::FlatRowPtr> flat_rows;
+ vector<BufferedTupleStream::FlatRowPtr> flat_rows;
bool eos = false;
// Allocate the partition-local hash table. Initialize the number of buckets based on
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 912613d..277579c 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -26,7 +26,7 @@
#include "exec/data-sink.h"
#include "exec/filter-context.h"
#include "exec/hash-table.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/suballocator.h"
@@ -103,7 +103,7 @@ class PhjBuilder : public DataSink {
/// Transfer ownership of the probe streams to the caller. One stream was allocated per
/// spilled partition in FlushFinal(). The probe streams are empty but prepared for
/// writing with a write buffer allocated.
- std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams();
+ std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
/// Clears the current list of hash partitions. Called after probing of the partitions
/// is done. The partitions are not closed or destroyed, since they may be spilled or
@@ -124,7 +124,7 @@ class PhjBuilder : public DataSink {
/// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
/// has enough buffers preallocated to execute successfully.
Status RepartitionBuildInput(Partition* input_partition, int level,
- BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT;
+ BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
/// Returns the largest build row count out of the current hash partitions.
int64_t LargestPartitionRows() const;
@@ -201,10 +201,10 @@ class PhjBuilder : public DataSink {
/// Spills this partition, the partition's stream is unpinned with 'mode' and
/// its hash table is destroyed if it was built.
- Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
+ Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
- BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+ BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
int ALWAYS_INLINE level() const { return level_; }
@@ -220,7 +220,7 @@ class PhjBuilder : public DataSink {
/// failed: if 'status' is ok, inserting failed because not enough reservation
/// was available and if 'status' is an error, inserting failed because of that error.
bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
- RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows,
+ RowBatch* batch, const std::vector<BufferedTupleStream::FlatRowPtr>& flat_rows,
Status* status);
const PhjBuilder* parent_;
@@ -239,7 +239,7 @@ class PhjBuilder : public DataSink {
/// Stream of build tuples in this partition. Initially owned by this object but
/// transferred to the parent exec node (via the row batch) when the partition
/// is closed. If NULL, ownership has been transferred and the partition is closed.
- std::unique_ptr<BufferedTupleStreamV2> build_rows_;
+ std::unique_ptr<BufferedTupleStream> build_rows_;
};
/// Computes the minimum number of buffers required to execute the spilling partitioned
@@ -288,19 +288,19 @@ class PhjBuilder : public DataSink {
/// partitions. This odd return convention is used to avoid emitting unnecessary code
/// for ~Status in perf-critical code.
bool AppendRow(
- BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+ BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
/// Slow path for AppendRow() above. It is called when the stream has failed to append
/// the row. We need to find more memory by either switching to IO-buffers, in case the
/// stream still uses small buffers, or spilling a partition. Returns false and sets
/// 'status' if it was unable to append the row, even after spilling partitions.
- bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row,
+ bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
Status* status) noexcept WARN_UNUSED_RESULT;
/// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
/// to the Spill() call for the selected partition. The current policy is to spill the
/// largest partition. Returns non-ok status if we couldn't spill a partition.
- Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
+ Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
/// Tries to build hash tables for all unspilled hash partitions. Called after
/// FlushFinal() when all build rows have been partitioned and added to the appropriate
@@ -464,7 +464,7 @@ class PhjBuilder : public DataSink {
///
/// Because of this, at the end of the build phase, we always have sufficient memory
/// to execute the probe phase of the algorithm without spilling more partitions.
- std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_;
+ std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
/// END: Members that must be Reset()
/////////////////////////////////////////
@@ -479,7 +479,7 @@ class PhjBuilder : public DataSink {
ProcessBuildBatchFn process_build_batch_fn_level0_;
typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
- const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*);
+ const std::vector<BufferedTupleStream::FlatRowPtr>&, Status*);
/// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
InsertBatchFn insert_batch_fn_;
InsertBatchFn insert_batch_fn_level0_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index b890eb9..3106419 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
// The partition is not in memory, spill the probe row and move to the next row.
// Skip the current row if we manage to append to the spilled partition's BTS.
// Otherwise, we need to bail out and report the failure.
- BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
+ BufferedTupleStream* probe_rows = probe_partition->probe_rows();
if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) {
DCHECK(!status->ok());
return false;
@@ -438,7 +438,7 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
}
inline bool PartitionedHashJoinNode::AppendProbeRow(
- BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+ BufferedTupleStream* stream, TupleRow* row, Status* status) {
DCHECK(stream->has_write_iterator());
DCHECK(!stream->is_pinned());
return stream->AddRow(row, status);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2db9e00..806bdc0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -27,7 +27,7 @@
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -265,7 +265,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
- unique_ptr<BufferedTupleStreamV2> probe_rows)
+ unique_ptr<BufferedTupleStream> probe_rows)
: build_partition_(build_partition),
probe_rows_(std::move(probe_rows)) {
DCHECK(probe_rows_->has_write_iterator());
@@ -328,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
probe_batch_pos_ = -1;
return Status::OK();
}
- BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows();
+ BufferedTupleStream* probe_rows = input_partition_->probe_rows();
if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
// Continue from the current probe stream.
bool eos = false;
@@ -420,9 +420,9 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
ht_ctx_->set_level(next_partition_level);
// Spill to free memory from hash tables and pinned streams for use in new partitions.
- RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
+ RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
// Temporarily free up the probe buffer to use when repartitioning.
- input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -822,7 +822,7 @@ static Status NullAwareAntiJoinError(bool build) {
Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
RuntimeState* state = runtime_state_;
- unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>(
+ unique_ptr<BufferedTupleStream> probe_rows = make_unique<BufferedTupleStream>(
state, child(0)->row_desc(), &buffer_pool_client_,
resource_profile_.spillable_buffer_size,
resource_profile_.spillable_buffer_size);
@@ -847,7 +847,7 @@ error:
Status PartitionedHashJoinNode::InitNullProbeRows() {
RuntimeState* state = runtime_state_;
- null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(),
+ null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
&buffer_pool_client_, resource_profile_.spillable_buffer_size,
resource_profile_.spillable_buffer_size);
// TODO: we shouldn't start with this unpinned if spilling is disabled.
@@ -866,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
DCHECK_EQ(probe_batch_pos_, -1);
DCHECK_EQ(probe_batch_->num_rows(), 0);
- BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows();
- BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
+ BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
+ BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
if (build_stream->num_rows() == 0) {
// There were no build rows. Nothing to do. Just prepare to output the null
@@ -904,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
int num_join_conjuncts = other_join_conjuncts_.size();
DCHECK(probe_batch_ != NULL);
- BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
+ BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
if (probe_batch_pos_ == probe_batch_->num_rows()) {
probe_batch_pos_ = 0;
probe_batch_->TransferResourceOwnership(out_batch);
@@ -952,7 +952,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
DCHECK(probe_hash_partitions_.empty());
// Initialize the probe partitions, providing them with probe streams.
- vector<unique_ptr<BufferedTupleStreamV2>> probe_streams =
+ vector<unique_ptr<BufferedTupleStream>> probe_streams =
builder_->TransferProbeStreams();
probe_hash_partitions_.resize(PARTITION_FANOUT);
for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -989,7 +989,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
}
void PartitionedHashJoinNode::CreateProbePartition(
- int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) {
+ int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
DCHECK_GE(partition_idx, 0);
DCHECK_LT(partition_idx, probe_hash_partitions_.size());
DCHECK(probe_hash_partitions_[partition_idx] == NULL);
@@ -998,7 +998,7 @@ void PartitionedHashJoinNode::CreateProbePartition(
}
Status PartitionedHashJoinNode::EvaluateNullProbe(
- RuntimeState* state, BufferedTupleStreamV2* build) {
+ RuntimeState* state, BufferedTupleStream* build) {
if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
return Status::OK();
}
@@ -1067,9 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
// can recurse the algorithm and create new hash partitions from spilled partitions.
// TODO: we shouldn't need to unpin the build stream if we stop spilling
// while probing.
- build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
- probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
if (probe_partition->probe_rows()->num_rows() != 0
|| NeedToProcessUnmatchedBuildRows()) {
@@ -1108,7 +1108,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
// Just finished evaluating the null probe rows with all the non-spilled build
// partitions. Unpin this now to free this memory for repartitioning.
if (null_probe_rows_ != NULL) {
- null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+ null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
}
builder_->ClearHashPartitions();
@@ -1170,7 +1170,7 @@ string PartitionedHashJoinNode::NodeDebugString() const {
ss << " Probe hash partition " << i << ": ";
if (probe_partition != NULL) {
ss << "probe ptr=" << probe_partition;
- BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
+ BufferedTupleStream* probe_rows = probe_partition->probe_rows();
if (probe_rows != NULL) {
ss << " Probe Rows: " << probe_rows->num_rows()
<< " (Bytes pinned: " << probe_rows->BytesPinned(false) << ")";
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index b3f663e..6ed5269 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -162,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Creates an initialized probe partition at 'partition_idx' in
/// 'probe_hash_partitions_'.
void CreateProbePartition(
- int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows);
+ int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
/// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
/// a write buffer allocated, so this will succeed unless an error is encountered.
@@ -170,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// return convention is used to avoid emitting unnecessary code for ~Status in perf-
/// critical code.
bool AppendProbeRow(
- BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+ BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
/// Probes the hash table for rows matching the current probe row and appends
/// all the matching build rows (with probe row) to output batch. Returns true
@@ -325,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// conjuncts pass (i.e. there is a match).
/// This is used for NAAJ, when there are NULL probe rows.
Status EvaluateNullProbe(
- RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT;
+ RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
/// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
/// matched_null_probe_ should have been fully evaluated.
@@ -472,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// For NAAJ, this stream contains all probe rows that had NULL on the hash table
/// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
- std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_;
+ std::unique_ptr<BufferedTupleStream> null_probe_rows_;
/// For each row in null_probe_rows_, true if this row has matched any build row
/// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -504,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// that has been prepared for writing with an I/O-sized write buffer.
ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
PhjBuilder::Partition* build_partition,
- std::unique_ptr<BufferedTupleStreamV2> probe_rows);
+ std::unique_ptr<BufferedTupleStream> probe_rows);
~ProbePartition();
/// Prepare to read the probe rows. Allocates the first read block, so reads will
@@ -517,7 +517,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// resources if 'batch' is NULL. Idempotent.
void Close(RowBatch* batch);
- BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+ BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
PhjBuilder::Partition* build_partition() { return build_partition_; }
inline bool IsClosed() const { return probe_rows_ == NULL; }
@@ -529,7 +529,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Stream of probe tuples in this partition. Initially owned by this object but
/// transferred to the parent exec node (via the row batch) when the partition
/// is complete. If NULL, ownership was transferred and the partition is closed.
- std::unique_ptr<BufferedTupleStreamV2> probe_rows_;
+ std::unique_ptr<BufferedTupleStream> probe_rows_;
};
/// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index 3441aac..a53b40e 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -20,7 +20,7 @@
#include "exec/partitioned-hash-join-node.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 92af968..391fd01 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -24,7 +24,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
add_library(Runtime
- buffered-tuple-stream-v2.cc
+ buffered-tuple-stream.cc
client-cache.cc
coordinator.cc
coordinator-backend-state.cc
@@ -91,7 +91,7 @@ ADD_BE_TEST(thread-resource-mgr-test)
ADD_BE_TEST(mem-tracker-test)
ADD_BE_TEST(multi-precision-test)
ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-v2-test)
+ADD_BE_TEST(buffered-tuple-stream-test)
ADD_BE_TEST(hdfs-fs-cache-test)
ADD_BE_TEST(tmp-file-mgr-test)
ADD_BE_TEST(row-batch-serialize-test)