You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/10/25 01:18:02 UTC

[5/5] incubator-impala git commit: IMPALA-4236: Codegen CopyRows() for select nodes

IMPALA-4236: Codegen CopyRows() for select nodes

Testing:
Added a test case to verify that CopyRows() in the select node is
successfully codegened.
Improved test coverage for the select node with a limit.

Performance:
Queries used (num_nodes set to 1):
500 Predicates: select * from (select * from tpch_parquet.lineitem
limit 6001215) t1 where l_partkey > 10 and l_extendedprice > 10000 and
l_linenumber > 1 and l_comment > 'foo0' and ... and l_comment > 'foo500'
order by l_orderkey limit 10;

1 Predicate: select * from (select * from tpch_parquet.lineitem
limit 6001215) t1 where l_partkey > 10 order by l_orderkey limit 10;

+--------------+-----------------------------------------------------+
|              |      500 Predicates      |       1 Predicate        |
|              +------------+-------------+------------+-------------+
|              |   After    |   Before    |   After    |   Before    |
+--------------+------------+-------------+------------+-------------+
| Select Node  | 12s385ms   | 1m1s        | 234ms      | 797ms       |
| Codegen time | 2s619ms    | 1s962ms     | 200ms      | 181ms       |
+--------------+------------+-------------+------------+-------------+

Change-Id: Ie0d496d004418468e16b6f564f90f45ebbf87c1e
Reviewed-on: http://gerrit.cloudera.org:8080/8196
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ca55b592
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ca55b592
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ca55b592

Branch: refs/heads/master
Commit: ca55b5926ecfaf764f3f46add4391588e18b90f0
Parents: 98f8b19
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Sep 6 14:33:40 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Oct 25 01:17:28 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |   3 +-
 be/src/codegen/impala-ir.cc                     |   1 +
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/select-node-ir.cc                   |  41 ++++++++
 be/src/exec/select-node.cc                      | 102 +++++++++----------
 be/src/exec/select-node.h                       |  22 ++--
 .../queries/QueryTest/inline-view-limit.test    |  12 +++
 tests/query_test/test_codegen.py                |  11 ++
 8 files changed, 131 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 5e1ce43..5236f7b 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -206,7 +206,8 @@ ir_functions = [
   ["UNION_MATERIALIZE_BATCH",
   "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"],
   ["BLOOM_FILTER_INSERT_NO_AVX2", "_ZN6impala11BloomFilter12InsertNoAvx2Ej"],
-  ["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"]
+  ["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"],
+  ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"]
 ]
 
 enums_preamble = '\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 24e0ce7..bc3cfcb 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -33,6 +33,7 @@
 #include "exec/partitioned-aggregation-node-ir.cc"
 #include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"
+#include "exec/select-node-ir.cc"
 #include "exec/topn-node-ir.cc"
 #include "exec/union-node-ir.cc"
 #include "exprs/aggregate-functions-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index b5f7d3e..aab1383 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -85,6 +85,7 @@ add_library(Exec
   scan-node.cc
   scanner-context.cc
   select-node.cc
+  select-node-ir.cc
   singular-row-src-node.cc
   sort-node.cc
   subplan-node.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/exec/select-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node-ir.cc b/be/src/exec/select-node-ir.cc
new file mode 100644
index 0000000..35cea13
--- /dev/null
+++ b/be/src/exec/select-node-ir.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/select-node.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+void SelectNode::CopyRows(RowBatch* output_batch) {
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  int num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
+
+  FOREACH_ROW(child_row_batch_.get(), child_row_idx_, batch_iter) {
+    // Add a new row to output_batch
+    int dst_row_idx = output_batch->AddRow();
+    TupleRow* dst_row = output_batch->GetRow(dst_row_idx);
+    TupleRow* src_row = batch_iter.Get();
+    ++child_row_idx_;
+    if (EvalConjuncts(conjunct_evals, num_conjuncts, src_row)) {
+      output_batch->CopyRow(src_row, dst_row);
+      output_batch->CommitLastRow();
+      ++num_rows_returned_;
+      if (ReachedLimit() || output_batch->AtCapacity()) return;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/exec/select-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index 86aac1d..b82b3e3 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "exec/select-node.h"
+#include "codegen/llvm-codegen.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/row-batch.h"
@@ -26,6 +27,8 @@
 
 #include "common/names.h"
 
+using llvm::Function;
+
 namespace impala {
 
 SelectNode::SelectNode(
@@ -33,7 +36,8 @@ SelectNode::SelectNode(
     : ExecNode(pool, tnode, descs),
       child_row_batch_(NULL),
       child_row_idx_(0),
-      child_eos_(false) {
+      child_eos_(false),
+      codegend_copy_rows_fn_(nullptr) {
 }
 
 Status SelectNode::Prepare(RuntimeState* state) {
@@ -42,6 +46,35 @@ Status SelectNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
+void SelectNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+  SCOPED_TIMER(state->codegen()->codegen_timer());
+  Status codegen_status = CodegenCopyRows(state);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+}
+
+Status SelectNode::CodegenCopyRows(RuntimeState* state) {
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  Function* copy_rows_fn = codegen->GetFunction(IRFunction::SELECT_NODE_COPY_ROWS, true);
+  DCHECK(copy_rows_fn != nullptr);
+
+  Function* eval_conjuncts_fn;
+  RETURN_IF_ERROR(
+      ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn));
+
+  int replaced = codegen->ReplaceCallSites(copy_rows_fn, eval_conjuncts_fn,
+      "EvalConjuncts");
+  DCHECK_EQ(replaced, 1);
+  copy_rows_fn = codegen->FinalizeFunction(copy_rows_fn);
+  if (copy_rows_fn == nullptr) return Status("Failed to finalize CopyRows().");
+  codegen->AddFunctionToJit(copy_rows_fn,
+      reinterpret_cast<void**>(&codegend_copy_rows_fn_));
+  return Status::OK();
+}
+
 Status SelectNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
@@ -54,69 +87,32 @@ Status SelectNode::Open(RuntimeState* state) {
 Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
-
-  if (ReachedLimit() || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_)) {
-    // we're already done or we exhausted the last child batch and there won't be any
-    // new ones
-    *eos = true;
-    child_row_batch_->TransferResourceOwnership(row_batch);
-    return Status::OK();
-  }
-  *eos = false;
-
   // start (or continue) consuming row batches from child
-  while (true) {
+  do {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
-    if (child_row_idx_ == child_row_batch_->num_rows()) {
-      child_row_idx_ = 0;
-      // fetch next batch
-      child_row_batch_->TransferResourceOwnership(row_batch);
-      child_row_batch_->Reset();
-      if (row_batch->AtCapacity()) return Status::OK();
+    if (child_row_batch_->num_rows() == 0) {
+      // Fetch rows from child if either child row batch has been
+      // consumed completely or it is empty.
       RETURN_IF_ERROR(child(0)->GetNext(state, child_row_batch_.get(), &child_eos_));
     }
-
-    if (CopyRows(row_batch)) {
-      *eos = ReachedLimit()
-          || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_);
-      if (*eos) child_row_batch_->TransferResourceOwnership(row_batch);
-      return Status::OK();
+    if (codegend_copy_rows_fn_ != nullptr) {
+      codegend_copy_rows_fn_(this, row_batch);
+    } else {
+      CopyRows(row_batch);
     }
-    if (child_eos_) {
-      // finished w/ last child row batch, and child eos is true
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    *eos = ReachedLimit()
+        || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_);
+    if (*eos || child_row_idx_ == child_row_batch_->num_rows()) {
+      child_row_idx_ = 0;
       child_row_batch_->TransferResourceOwnership(row_batch);
-      *eos = true;
-      return Status::OK();
+      child_row_batch_->Reset();
     }
-  }
+  } while (!*eos && !row_batch->AtCapacity());
   return Status::OK();
 }
 
-bool SelectNode::CopyRows(RowBatch* output_batch) {
-  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
-  int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  while (child_row_idx_ < child_row_batch_->num_rows()) {
-    // Add a new row to output_batch
-    int dst_row_idx = output_batch->AddRow();
-    TupleRow* dst_row = output_batch->GetRow(dst_row_idx);
-    TupleRow* src_row = child_row_batch_->GetRow(child_row_idx_);
-    // Make sure to increment row idx before returning.
-    ++child_row_idx_;
-
-    if (EvalConjuncts(conjunct_evals, num_conjuncts, src_row)) {
-      output_batch->CopyRow(src_row, dst_row);
-      output_batch->CommitLastRow();
-      ++num_rows_returned_;
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-      if (ReachedLimit() || output_batch->AtCapacity()) return true;
-    }
-  }
-  return output_batch->AtCapacity();
-}
-
 Status SelectNode::Reset(RuntimeState* state) {
   child_row_batch_->Reset();
   child_row_idx_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/be/src/exec/select-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.h b/be/src/exec/select-node.h
index 3f8749a..ce85cfb 100644
--- a/be/src/exec/select-node.h
+++ b/be/src/exec/select-node.h
@@ -36,11 +36,12 @@ class SelectNode : public ExecNode {
  public:
   SelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
 
-  virtual Status Prepare(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
 
  private:
   /////////////////////////////////////////
@@ -58,10 +59,15 @@ class SelectNode : public ExecNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
+  typedef void (*CopyRowsFn)(SelectNode*, RowBatch*);
+  CopyRowsFn codegend_copy_rows_fn_;
+
   /// Copy rows from child_row_batch_ for which conjuncts_ evaluate to true to
-  /// output_batch, up to limit_.
-  /// Return true if limit was hit or output_batch should be returned, otherwise false.
-  bool CopyRows(RowBatch* output_batch);
+  /// output_batch, up to limit_ or till the output row batch reaches capacity.
+  void CopyRows(RowBatch* output_batch);
+
+  /// Codegen CopyRows(). Used for mostly codegen'ing the conjuncts evaluation logic.
+  Status CodegenCopyRows(RuntimeState* state);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/testdata/workloads/functional-query/queries/QueryTest/inline-view-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/inline-view-limit.test b/testdata/workloads/functional-query/queries/QueryTest/inline-view-limit.test
index 38e81b0..93a7a58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/inline-view-limit.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/inline-view-limit.test
@@ -91,3 +91,15 @@ select count(*) from (
 ---- TYPES
 bigint
 ====
+---- QUERY
+# Test that limit is enforced by select node.
+select * from (select * from alltypes order by id limit 20) t1 where int_col <100 limit 5
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ca55b592/tests/query_test/test_codegen.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py
index 8a4390a..ccd85e1 100644
--- a/tests/query_test/test_codegen.py
+++ b/tests/query_test/test_codegen.py
@@ -19,6 +19,8 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_exec_option_dimension_from_dict
+from tests.common.test_result_verifier import get_node_exec_options,\
+    assert_codegen_enabled
 
 class TestCodegen(ImpalaTestSuite):
   @classmethod
@@ -39,3 +41,12 @@ class TestCodegen(ImpalaTestSuite):
   def test_disable_codegen(self, vector):
     """Test that codegen is enabled/disabled by the planner as expected."""
     self.run_test_case('QueryTest/disable-codegen', vector)
+
+  def test_select_node_codegen(self, vector):
+    """Test that select node is codegened"""
+    result = self.execute_query('select * from (select * from functional.alltypes '
+        'limit 1000000) t1 where int_col > 10 limit 10')
+    exec_options = get_node_exec_options(result.runtime_profile, 1)
+    # Make sure test fails if there are no exec options in the profile for the node
+    assert len(exec_options) > 0
+    assert_codegen_enabled(result.runtime_profile, [1])
\ No newline at end of file