You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/22 17:58:48 UTC
[3/4] impala git commit: IMPALA-6461 : Micro-optimizations to
DataStreamSender::Send
IMPALA-6461 : Micro-optimizations to DataStreamSender::Send
While analyzing performance of partition exchange operator,
I noticed that there is a data dependency and a function call per row in the hot path.
Optimizations in this change are:
1) Remove the data dependency between computing the hash and the channel
2) Inline DataStreamSender::Channel::AddRow
3) Cache partition_exprs_.size() in a local variable to save a couple of instructions
This translates to improving CPI for DataStreamSender::Send by 10%
Change-Id: I642a9dad531a29d4838a3537ab0e04320a69960d
Reviewed-on: http://gerrit.cloudera.org:8080/9221
Reviewed-by: Mostafa Mokhtar <mm...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bf426527
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bf426527
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bf426527
Branch: refs/heads/2.x
Commit: bf42652748f2dad2fd343574ba205eb88089549c
Parents: 9d7b210
Author: mmokhtar <mm...@cloudera.com>
Authored: Mon Feb 5 17:05:02 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 22 02:52:03 2018 +0000
----------------------------------------------------------------------
be/src/runtime/data-stream-sender.cc | 77 ++++++++++++++++++--------
be/src/runtime/krpc-data-stream-sender.cc | 77 ++++++++++++++++++--------
be/src/runtime/row-batch.h | 4 ++
3 files changed, 110 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index c572467..30bf5b6 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -87,7 +87,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
// Copies a single row into this channel's output buffer and flushes buffer
// if it reaches capacity.
// Returns error status if any of the preceding rpcs failed, OK otherwise.
- Status AddRow(TupleRow* row) WARN_UNUSED_RESULT;
+ Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
// Asynchronously sends a row batch.
// Returns the status of the most recently finished TransmitData
@@ -243,7 +243,7 @@ void DataStreamSender::Channel::WaitForRpc() {
}
}
-Status DataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
if (batch_->AtCapacity()) {
// batch_ is full, let's send it; but first wait for an ongoing
// transmission to finish before modifying thrift_batch_
@@ -443,16 +443,29 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
} else if (partition_type_ == TPartitionType::KUDU) {
DCHECK_EQ(partition_expr_evals_.size(), 1);
int num_channels = channels_.size();
- for (int i = 0; i < batch->num_rows(); ++i) {
- TupleRow* row = batch->GetRow(i);
- int32_t partition =
- *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
- if (partition < 0) {
- // This row doesn't coorespond to a partition, e.g. it's outside the given ranges.
- partition = next_unknown_partition_;
- ++next_unknown_partition_;
+ const int num_rows = batch->num_rows();
+ const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+ int channel_ids[hash_batch_size];
+
+ for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+ const int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ int32_t partition =
+ *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+ if (partition < 0) {
+ // This row doesn't correspond to a partition,
+ // e.g. it's outside the given ranges.
+ partition = next_unknown_partition_;
+ ++next_unknown_partition_;
+ }
+ channel_ids[i] = partition % num_channels;
+ }
+
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
}
- RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
}
} else {
DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
@@ -460,20 +473,36 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
// TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
// once we have codegen here.
int num_channels = channels_.size();
- for (int i = 0; i < batch->num_rows(); ++i) {
- TupleRow* row = batch->GetRow(i);
- uint64_t hash_val = EXCHANGE_HASH_SEED;
- for (int j = 0; j < partition_exprs_.size(); ++j) {
- ScalarExprEvaluator* eval = partition_expr_evals_[j];
- void* partition_val = eval->GetValue(row);
- // We can't use the crc hash function here because it does not result in
- // uncorrelated hashes with different seeds. Instead we use FastHash.
- // TODO: fix crc hash/GetHashValue()
- DCHECK(&(eval->root()) == partition_exprs_[j]);
- hash_val = RawValue::GetHashValueFastHash(
- partition_val, partition_exprs_[j]->type(), hash_val);
+ const int num_partition_exprs = partition_exprs_.size();
+ const int num_rows = batch->num_rows();
+ const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+ int channel_ids[hash_batch_size];
+
+ // Break the loop into two parts to break the data dependency between computing
+ // the hash and calling AddRow().
+ // To keep stack allocation small, hash RowBatch::HASH_BATCH_SIZE rows at a time.
+ for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+ int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ uint64_t hash_val = EXCHANGE_HASH_SEED;
+ for (int j = 0; j < num_partition_exprs; ++j) {
+ ScalarExprEvaluator* eval = partition_expr_evals_[j];
+ void* partition_val = eval->GetValue(row);
+ // We can't use the crc hash function here because it does not result in
+ // uncorrelated hashes with different seeds. Instead we use FastHash.
+ // TODO: fix crc hash/GetHashValue()
+ DCHECK(&(eval->root()) == partition_exprs_[j]);
+ hash_val = RawValue::GetHashValueFastHash(
+ partition_val, partition_exprs_[j]->type(), hash_val);
+ }
+ channel_ids[i] = hash_val % num_channels;
+ }
+
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
}
- RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
}
}
COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4866e4e..6c0ad01 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -129,7 +129,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
// it reaches capacity. This call may block if the row batch's capacity is reached
// and the preceding RPC is still in progress. Returns error status if serialization
// failed or if the preceding RPC failed. Return OK otherwise.
- Status AddRow(TupleRow* row);
+ Status ALWAYS_INLINE AddRow(TupleRow* row);
// Shutdowns the channel and frees the row batch allocation. Any in-flight RPC will
// be cancelled. It's expected that clients normally call FlushAndSendEos() before
@@ -474,7 +474,7 @@ Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
return Status::OK();
}
-Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
if (batch_->AtCapacity()) {
// batch_ is full, let's send it.
RETURN_IF_ERROR(SendCurrentBatch());
@@ -660,16 +660,29 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
} else if (partition_type_ == TPartitionType::KUDU) {
DCHECK_EQ(partition_expr_evals_.size(), 1);
int num_channels = channels_.size();
- for (int i = 0; i < batch->num_rows(); ++i) {
- TupleRow* row = batch->GetRow(i);
- int32_t partition =
- *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
- if (partition < 0) {
- // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
- partition = next_unknown_partition_;
- ++next_unknown_partition_;
+ const int num_rows = batch->num_rows();
+ const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+ int channel_ids[hash_batch_size];
+
+ for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+ int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ int32_t partition =
+ *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+ if (partition < 0) {
+ // This row doesn't correspond to a partition,
+ // e.g. it's outside the given ranges.
+ partition = next_unknown_partition_;
+ ++next_unknown_partition_;
+ }
+ channel_ids[i] = partition % num_channels;
+ }
+
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
}
- RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
}
} else {
DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
@@ -677,20 +690,36 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
// TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
// once we have codegen here.
int num_channels = channels_.size();
- for (int i = 0; i < batch->num_rows(); ++i) {
- TupleRow* row = batch->GetRow(i);
- uint64_t hash_val = EXCHANGE_HASH_SEED;
- for (int j = 0; j < partition_exprs_.size(); ++j) {
- ScalarExprEvaluator* eval = partition_expr_evals_[j];
- void* partition_val = eval->GetValue(row);
- // We can't use the crc hash function here because it does not result in
- // uncorrelated hashes with different seeds. Instead we use FastHash.
- // TODO: fix crc hash/GetHashValue()
- DCHECK(&(eval->root()) == partition_exprs_[j]);
- hash_val = RawValue::GetHashValueFastHash(
- partition_val, partition_exprs_[j]->type(), hash_val);
+ const int num_partition_exprs = partition_exprs_.size();
+ const int num_rows = batch->num_rows();
+ const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+ int channel_ids[hash_batch_size];
+
+ // Break the loop into two parts to break the data dependency between computing
+ // the hash and calling AddRow().
+ // To keep stack allocation small, hash RowBatch::HASH_BATCH_SIZE rows at a time.
+ for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+ int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ uint64_t hash_val = EXCHANGE_HASH_SEED;
+ for (int j = 0; j < num_partition_exprs; ++j) {
+ ScalarExprEvaluator* eval = partition_expr_evals_[j];
+ void* partition_val = eval->GetValue(row);
+ // We can't use the crc hash function here because it does not result in
+ // uncorrelated hashes with different seeds. Instead we use FastHash.
+ // TODO: fix crc hash/GetHashValue()
+ DCHECK(&(eval->root()) == partition_exprs_[j]);
+ hash_val = RawValue::GetHashValueFastHash(
+ partition_val, partition_exprs_[j]->type(), hash_val);
+ }
+ channel_ids[i] = hash_val % num_channels;
+ }
+
+ for (int i = 0; i < batch_window_size; ++i) {
+ TupleRow* row = batch->GetRow(i + batch_start);
+ RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
}
- RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
}
}
COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aad5ebe..3bde4d1 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -385,6 +385,10 @@ class RowBatch {
// in order to leave room for variable-length data.
static const int FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
+ // Batch size to compute hash, keep it small to avoid large stack allocations.
+ // 16 provided the same speedup compared to operating over a full batch.
+ static const int HASH_BATCH_SIZE = 16;
+
/// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
/// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
/// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.