You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/08 01:26:57 UTC

[3/3] impala git commit: IMPALA-8026: Fix #rows accounting for NLJ

IMPALA-8026: Fix #rows accounting for NLJ

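Use the same, much simpler approach as all other ExecNodes: instead of
incrementing rows_returned_counter_ at every return point, set it once
from num_rows_returned_ at the end of GetNext().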

Testing:
Manually tested by adding logging to print the counter's values.
Confirmed that, with the fix, the counter has the correct values
(instead of values exceeding num_rows_returned_).

Change-Id: I43e2e1d8f85478ff36c8cddc69bac3bdccd2c824
Reviewed-on: http://gerrit.cloudera.org:8080/12164
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/78da2b13
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/78da2b13
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/78da2b13

Branch: refs/heads/master
Commit: 78da2b135143ac1faecd9ed1a62e1ce926555de3
Parents: e4cff7d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jan 4 08:48:32 2019 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jan 8 01:22:29 2019 +0000

----------------------------------------------------------------------
 be/src/exec/nested-loop-join-node.cc | 28 ++++++++--------------------
 1 file changed, 8 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/78da2b13/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index af97096..5eb5a13 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -246,8 +246,11 @@ Status NestedLoopJoinNode::GetNext(
 
 end:
   if (ReachedLimit()) {
-    output_batch->set_num_rows(
-        output_batch->num_rows() - (num_rows_returned_ - limit_));
+    int64_t extra_rows = num_rows_returned_ - limit_;
+    DCHECK_GE(extra_rows, 0);
+    DCHECK_LE(extra_rows, output_batch->num_rows());
+    output_batch->set_num_rows(output_batch->num_rows() - extra_rows);
+    num_rows_returned_ = limit_;
     eos_ = true;
   }
   if (eos_) {
@@ -255,6 +258,7 @@ end:
     probe_batch_->TransferResourceOwnership(output_batch);
     build_batches_->TransferResourceOwnership(output_batch);
   }
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
@@ -320,7 +324,6 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
       ++num_rows_returned_;
       if (ReachedLimit()) {
         eos_ = true;
-        COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
         return Status::OK();
       }
       // Stop scanning the build rows for the current probe row. If we reach
@@ -330,7 +333,6 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
     RETURN_IF_ERROR(NextProbeRow(state, output_batch));
     if (output_batch->AtCapacity()) break;
   }
-  COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
   return Status::OK();
 }
 
@@ -405,7 +407,6 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
       if ((current_build_row_idx_ & (N - 1)) == 0) {
         if (ReachedLimit()) {
           eos_ = true;
-          COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
           return Status::OK();
         }
         RETURN_IF_CANCELLED(state);
@@ -435,15 +436,11 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
       VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
-      if (output_batch->AtCapacity()) {
-        COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
-        return Status::OK();
-      }
+      if (output_batch->AtCapacity()) return Status::OK();
     }
     RETURN_IF_ERROR(NextProbeRow(state, output_batch));
     if (output_batch->AtCapacity()) break;
   }
-  COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
   return Status::OK();
 }
 
@@ -523,7 +520,6 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
     VLOG_ROW << "match row:" << PrintRow(output_row, *row_desc());
     output_batch->CommitLastRow();
     ++num_rows_returned_;
-    COUNTER_ADD(rows_returned_counter_, 1);
     if (ReachedLimit()) eos_ = true;
   }
   return Status::OK();
@@ -549,7 +545,6 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
     if ((current_build_row_idx_ & (N - 1)) == 0) {
       if (ReachedLimit()) {
         eos_ = true;
-        COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
         return Status::OK();
       }
       RETURN_IF_CANCELLED(state);
@@ -578,14 +573,10 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
       VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
-      if (output_batch->AtCapacity()) {
-        COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
-        return Status::OK();
-      }
+      if (output_batch->AtCapacity()) return Status::OK();
     }
   }
   eos_ = true;
-  COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
   return Status::OK();
 }
 
@@ -613,7 +604,6 @@ Status NestedLoopJoinNode::FindBuildMatches(
       if (ReachedLimit()) {
         eos_ = true;
         *return_output_batch = true;
-        COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
         return Status::OK();
       }
       RETURN_IF_CANCELLED(state);
@@ -632,11 +622,9 @@ Status NestedLoopJoinNode::FindBuildMatches(
     ++num_rows_returned_;
     if (output_batch->AtCapacity()) {
       *return_output_batch = true;
-      COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
       return Status::OK();
     }
   }
-  COUNTER_ADD(rows_returned_counter_, output_batch->num_rows());
   return Status::OK();
 }