You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/13 00:21:45 UTC
[2/3] incubator-impala git commit: IMPALA-5773: Correctly account for
memory used in data stream receiver queue
IMPALA-5773: Correctly account for memory used in data stream receiver queue
DataStreamRecvrs keep one or more queues of batches received to provide
some buffering. All queues of a receiver share a single, fixed byte-size
capacity. The estimate of how much a newly received RowBatch adds to that
buffer was computed from the size of the serialized (and possibly
compressed) TRowBatch it is deserialized from. That is the wrong value:
the batch is decompressed during deserialization, so the memory it
actually occupies in the queue can be much larger.
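* Replace RowBatch::GetBatchSize(const TRowBatch&) with
  RowBatch::GetSerializedSize() and RowBatch::GetDeserializedSize().
* Fix the estimate to use the uncompressed (deserialized) size.
* Widen the receiver buffer-size limit, the count of buffered bytes and
  the --exchg_node_buffer_size_bytes flag from 32-bit to 64-bit integers.
* Add a DataStreamReceiver child profile to the exchange node's profile
  and attach the receiver's MemTracker and counters to it, so that the
  peak memory used by the receiver can be monitored easily.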
Confirmed that the following query:
select count(distinct concat(cast(l_comment as char(120)),
cast(l_comment as char(120)),
cast(l_comment as char(120)),
cast(l_comment as char(120)),
cast(l_comment as char(120)),
cast(l_comment as char(120))) from lineitem;
succeeds with a mem-limit of 800MB. Before this patch it failed on a
one-node cluster because the data stream receiver buffered more batches
than the memory limit allowed.
Change-Id: I9e90f9596ee984438e3373af05e84d361702ca6a
Reviewed-on: http://gerrit.cloudera.org:8080/7646
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f2f52a8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f2f52a8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f2f52a8e
Branch: refs/heads/master
Commit: f2f52a8e1ce9560329566ee71945b3901a1ef958
Parents: 200ce49
Author: Henry Robinson <he...@cloudera.com>
Authored: Tue Aug 8 12:21:05 2017 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Sat Aug 12 01:49:50 2017 +0000
----------------------------------------------------------------------
.../benchmarks/row-batch-serialize-benchmark.cc | 4 +--
be/src/exec/exchange-node.cc | 4 +--
be/src/runtime/data-stream-mgr-base.h | 4 +--
be/src/runtime/data-stream-mgr.cc | 6 ++---
be/src/runtime/data-stream-mgr.h | 4 +--
be/src/runtime/data-stream-recvr.cc | 27 ++++++++++----------
be/src/runtime/data-stream-recvr.h | 8 +++---
be/src/runtime/data-stream-sender.cc | 12 ++++-----
be/src/runtime/krpc-data-stream-mgr.cc | 11 ++++----
be/src/runtime/krpc-data-stream-mgr.h | 4 +--
be/src/runtime/row-batch.cc | 11 ++++++--
be/src/runtime/row-batch.h | 7 +++--
12 files changed, 54 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 350252d..ee531e6 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -140,9 +140,7 @@ class RowBatchSerializeBaseline {
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
- // The size output_batch would be if we didn't compress tuple_data (will be equal to
- // actual batch size if tuple_data isn't compressed)
- return batch->GetBatchSize(*output_batch) - output_batch->tuple_data.size() + size;
+ return RowBatch::GetDeserializedSize(*output_batch);
}
// Copy of baseline version without dedup logic
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 9b5f548..a0c002a 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -38,8 +38,8 @@ DECLARE_int32(stress_datastream_recvr_delay_ms);
using namespace impala;
-DEFINE_int32(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
- "(Advanced) Maximum size of per-query receive-side buffer");
+DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
+ "(Advanced) Maximum size of per-query receive-side buffer");
ExchangeNode::ExchangeNode(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index 53798b1..7886cfa 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -44,8 +44,8 @@ class DataStreamMgrBase {
/// Create a receiver for a specific fragment_instance_id/node_id destination;
virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
- bool is_merging) = 0;
+ PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+ RuntimeProfile* profile, bool is_merging) = 0;
/// Notifies the recvr associated with the fragment/node id that the specified
/// sender has closed.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 82bc01f..a161ad3 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -77,8 +77,8 @@ inline uint32_t DataStreamMgr::GetHashValue(
shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
- bool is_merging) {
+ PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+ RuntimeProfile* profile, bool is_merging) {
DCHECK(profile != NULL);
VLOG_FILE << "creating receiver for fragment="
<< fragment_instance_id << ", node=" << dest_node_id;
@@ -172,7 +172,7 @@ Status DataStreamMgr::AddData(const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, const TRowBatch& thrift_batch, int sender_id) {
VLOG_ROW << "AddData(): fragment_instance_id=" << fragment_instance_id
<< " node=" << dest_node_id
- << " size=" << RowBatch::GetBatchSize(thrift_batch);
+ << " size=" << RowBatch::GetDeserializedSize(thrift_batch);
bool already_unregistered;
shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
&already_unregistered);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 5dae908..eff468e 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -77,8 +77,8 @@ class DataStreamMgr : public DataStreamMgrBase {
/// caller.
std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
- bool is_merging) override;
+ PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+ RuntimeProfile* profile, bool is_merging) override;
/// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
/// if the recvr has not been cancelled. sender_id identifies the sender instance
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 3828a3e..0c6d98e 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -151,8 +151,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
unique_lock<mutex> l(lock_);
if (is_cancelled_) return;
- int batch_size = RowBatch::GetBatchSize(thrift_batch);
- COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
+ COUNTER_ADD(recvr_->bytes_received_counter_, RowBatch::GetSerializedSize(thrift_batch));
DCHECK_GT(num_remaining_senders_, 0);
// if there's something in the queue and this batch will push us over the
@@ -162,6 +161,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
// received from a specific queue based on data order, and the pipeline will stall
// if the merger is waiting for data from an empty queue that cannot be filled because
// the limit has been reached.
+ int64_t batch_size = RowBatch::GetDeserializedSize(thrift_batch);
while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
@@ -285,7 +285,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
RuntimeProfile* profile)
: mgr_(stream_mgr),
fragment_instance_id_(fragment_instance_id),
@@ -295,7 +295,6 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
is_merging_(is_merging),
num_buffered_bytes_(0),
profile_(profile) {
- mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
sender_queues_.reserve(num_queues);
@@ -306,17 +305,19 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
sender_queues_.push_back(queue);
}
+ RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver");
+ mem_tracker_.reset(
+ new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker));
+
// Initialize the counters
- bytes_received_counter_ =
- ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
+ bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", TUnit::BYTES);
bytes_received_time_series_counter_ =
- ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
- deserialize_row_batch_timer_ =
- ADD_TIMER(profile_, "DeserializeRowBatchTimer");
- buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
- buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
- data_arrival_timer_ = profile_->inactive_timer();
- first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
+ ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", bytes_received_counter_);
+ deserialize_row_batch_timer_ = ADD_TIMER(child_profile, "DeserializeRowBatchTimer");
+ buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer");
+ buffer_full_total_timer_ = ADD_TIMER(child_profile, "SendersBlockedTotalTimer(*)");
+ data_arrival_timer_ = child_profile->inactive_timer();
+ first_batch_wait_total_timer_ = ADD_TIMER(child_profile, "FirstBatchArrivalWaitTime");
}
Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 468f58a..fad588d 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -104,8 +104,8 @@ class DataStreamRecvr : public DataStreamRecvrBase {
DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
- RuntimeProfile* profile);
+ PlanNodeId dest_node_id, int num_senders, bool is_merging,
+ int64_t total_buffer_limit, RuntimeProfile* profile);
/// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
/// full. Called from DataStreamMgr.
@@ -120,7 +120,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
/// Return true if the addition of a new batch of size 'batch_size' would exceed the
/// total buffer limit.
- bool ExceedsLimit(int batch_size) {
+ bool ExceedsLimit(int64_t batch_size) {
return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
}
@@ -144,7 +144,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
bool is_merging_;
/// total number of bytes held across all sender queues.
- AtomicInt32 num_buffered_bytes_;
+ AtomicInt64 num_buffered_bytes_;
/// Memtracker for batches in the sender queue(s).
boost::scoped_ptr<MemTracker> mem_tracker_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 16cf87e..752e3ce 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -218,7 +218,7 @@ void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
if (res.status.status_code != TErrorCode::OK) {
rpc_status_ = res.status;
} else {
- num_data_bytes_sent_ += RowBatch::GetBatchSize(*batch);
+ num_data_bytes_sent_ += RowBatch::GetSerializedSize(*batch);
VLOG_ROW << "incremented #data_bytes_sent="
<< num_data_bytes_sent_;
}
@@ -507,17 +507,15 @@ void DataStreamSender::Close(RuntimeState* state) {
closed_ = true;
}
-Status DataStreamSender::SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers) {
+Status DataStreamSender::SerializeBatch(
+ RowBatch* src, TRowBatch* dest, int num_receivers) {
VLOG_ROW << "serializing " << src->num_rows() << " rows";
{
SCOPED_TIMER(profile_->total_time_counter());
SCOPED_TIMER(serialize_batch_timer_);
RETURN_IF_ERROR(src->Serialize(dest));
- int bytes = RowBatch::GetBatchSize(*dest);
- int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
- // The size output_batch would be if we didn't compress tuple_data (will be equal to
- // actual batch size if tuple_data isn't compressed)
-
+ int64_t bytes = RowBatch::GetSerializedSize(*dest);
+ int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 0515897..a3ed417 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -36,13 +36,12 @@ namespace impala {
AbortUnsupportedFeature();
}
-KrpcDataStreamMgr::~KrpcDataStreamMgr() {
-}
+KrpcDataStreamMgr::~KrpcDataStreamMgr(){}
-[[noreturn]] std::shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(RuntimeState* state,
- const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
- bool is_merging) {
+ [[noreturn]] std::shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+ RuntimeState* state, const RowDescriptor* row_desc,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+ int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
AbortUnsupportedFeature();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index adaebda..ef9bb45 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -38,8 +38,8 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
[[noreturn]] std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
- bool is_merging) override;
+ PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+ RuntimeProfile* profile, bool is_merging) override;
[[noreturn]] Status CloseSender(const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int sender_id) override;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 942ac05..beed671 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -340,8 +340,15 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
Reset();
}
-int RowBatch::GetBatchSize(const TRowBatch& batch) {
- int result = batch.tuple_data.size();
+int64_t RowBatch::GetDeserializedSize(const TRowBatch& batch) {
+ int64_t result = batch.uncompressed_size;
+ result += batch.row_tuples.size() * sizeof(TTupleId);
+ result += batch.tuple_offsets.size() * sizeof(int32_t);
+ return result;
+}
+
+int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) {
+ int64_t result = batch.tuple_data.size();
result += batch.row_tuples.size() * sizeof(TTupleId);
result += batch.tuple_offsets.size() * sizeof(int32_t);
return result;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 35a8f14..49dd066 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -300,8 +300,11 @@ class RowBatch {
/// it is ignored. This function does not Reset().
Status Serialize(TRowBatch* output_batch);
- /// Utility function: returns total size of batch.
- static int GetBatchSize(const TRowBatch& batch);
+ /// Utility function: returns total byte size of a batch in either serialized or
+ /// deserialized form. If a row batch is compressed, its serialized size can be much
+ /// less than the deserialized size.
+ static int64_t GetSerializedSize(const TRowBatch& batch);
+ static int64_t GetDeserializedSize(const TRowBatch& batch);
int ALWAYS_INLINE num_rows() const { return num_rows_; }
int ALWAYS_INLINE capacity() const { return capacity_; }