You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/08 01:26:57 UTC
[3/3] impala git commit: IMPALA-8026: Fix #rows accounting for NLJ
IMPALA-8026: Fix #rows accounting for NLJ
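Use the same, much simpler, approach as all other ExecNodes: set rows_returned_counter_ once from num_rows_returned_ at the end of GetNext(), instead of adding to it at every early-return point.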
Testing:
Manually tested by adding logging to print the counter's values.
Confirmed that, with the fix, the counter has the correct
values (instead of values exceeding num_rows_returned_).
Change-Id: I43e2e1d8f85478ff36c8cddc69bac3bdccd2c824
Reviewed-on: http://gerrit.cloudera.org:8080/12164
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/78da2b13
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/78da2b13
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/78da2b13
Branch: refs/heads/master
Commit: 78da2b135143ac1faecd9ed1a62e1ce926555de3
Parents: e4cff7d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jan 4 08:48:32 2019 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jan 8 01:22:29 2019 +0000
----------------------------------------------------------------------
be/src/exec/nested-loop-join-node.cc | 28 ++++++++--------------------
1 file changed, 8 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/78da2b13/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index af97096..5eb5a13 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -246,8 +246,11 @@ Status NestedLoopJoinNode::GetNext(
end:
if (ReachedLimit()) {
- output_batch->set_num_rows(
- output_batch->num_rows() - (num_rows_returned_ - limit_));
+ int64_t extra_rows = num_rows_returned_ - limit_;
+ DCHECK_GE(extra_rows, 0);
+ DCHECK_LE(extra_rows, output_batch->num_rows());
+ output_batch->set_num_rows(output_batch->num_rows() - extra_rows);
+ num_rows_returned_ = limit_;
eos_ = true;
}
if (eos_) {
@@ -255,6 +258,7 @@ end:
probe_batch_->TransferResourceOwnership(output_batch);
build_batches_->TransferResourceOwnership(output_batch);
}
+ COUNTER_SET(rows_returned_counter_, num_rows_returned_);
return Status::OK();
}
@@ -320,7 +324,6 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
++num_rows_returned_;
if (ReachedLimit()) {
eos_ = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
// Stop scanning the build rows for the current probe row. If we reach
@@ -330,7 +333,6 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) break;
}
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
@@ -405,7 +407,6 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
if ((current_build_row_idx_ & (N - 1)) == 0) {
if (ReachedLimit()) {
eos_ = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
RETURN_IF_CANCELLED(state);
@@ -435,15 +436,11 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
++num_rows_returned_;
- if (output_batch->AtCapacity()) {
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
- return Status::OK();
- }
+ if (output_batch->AtCapacity()) return Status::OK();
}
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) break;
}
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
@@ -523,7 +520,6 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
VLOG_ROW << "match row:" << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
++num_rows_returned_;
- COUNTER_ADD(rows_returned_counter_, 1);
if (ReachedLimit()) eos_ = true;
}
return Status::OK();
@@ -549,7 +545,6 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
if ((current_build_row_idx_ & (N - 1)) == 0) {
if (ReachedLimit()) {
eos_ = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
RETURN_IF_CANCELLED(state);
@@ -578,14 +573,10 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
++num_rows_returned_;
- if (output_batch->AtCapacity()) {
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
- return Status::OK();
- }
+ if (output_batch->AtCapacity()) return Status::OK();
}
}
eos_ = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
@@ -613,7 +604,6 @@ Status NestedLoopJoinNode::FindBuildMatches(
if (ReachedLimit()) {
eos_ = true;
*return_output_batch = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
RETURN_IF_CANCELLED(state);
@@ -632,11 +622,9 @@ Status NestedLoopJoinNode::FindBuildMatches(
++num_rows_returned_;
if (output_batch->AtCapacity()) {
*return_output_batch = true;
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}
}
- COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
return Status::OK();
}