You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:11 UTC

[24/33] incubator-impala git commit: IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.

IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.

IMPALA-4258: The problem was that there was a reference to
HdfsScanner::batch_ hidden inside WriteEmptyTuples(). The batch_
reference is NULL when the scanner is run with MT_DOP > 1.

IMPALA-4286: When there are no scan ranges HdfsScanNodeBase::Open()
exits early without initializing the reader context. This lead to
a DCHECK in IoMgr::GetNextRange() called from HdfsScanNodeMt.
The fix is to remove that unnecessary short-circuit Open().

I combined these two bugfixes because the new basic test covers
both cases.

Testing: Added a new test_mt_dop.py test. A private code/hdfs
run passed.

Change-Id: I79c0f6fd2aeb4bc6fa5f87219a485194fef2db1b
Reviewed-on: http://gerrit.cloudera.org:8080/4767
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff6b450a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff6b450a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff6b450a

Branch: refs/heads/hadoop-next
Commit: ff6b450ad380ce840e18875a89d9cf98058277a3
Parents: 51268c0
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Oct 19 23:27:14 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Oct 22 10:24:24 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner.cc                |  2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  2 +-
 be/src/exec/hdfs-rcfile-scanner.cc              |  2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  2 -
 be/src/exec/hdfs-scanner.cc                     | 64 ++------------------
 be/src/exec/hdfs-scanner.h                      | 14 ++---
 be/src/exec/hdfs-sequence-scanner.cc            |  4 +-
 be/src/exec/hdfs-text-scanner.cc                |  2 +-
 .../queries/QueryTest/mt-dop.test               |  9 +++
 tests/query_test/test_mt_dop.py                 | 47 ++++++++++++++
 10 files changed, 73 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 88d6d3a..91a9d03 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -538,7 +538,7 @@ Status HdfsAvroScanner::ProcessRange() {
       int num_to_commit;
       if (scan_node_->materialized_slots().empty()) {
         // No slots to materialize (e.g. count(*)), no need to decode data
-        num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
+        num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
       } else {
         if (codegend_decode_avro_data_ != NULL) {
           num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e91a7ec..542f4cc 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -339,7 +339,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
     int max_tuples = min(row_batch->capacity(), rows_remaining);
     TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
     Status status = CommitRows(row_batch, num_to_commit);
     assemble_rows_timer_.Stop();
     RETURN_IF_ERROR(status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index f43b2aa..012a424 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -485,7 +485,7 @@ Status HdfsRCFileScanner::ProcessRange() {
         // If there are no materialized slots (e.g. count(*) or just partition cols)
         // we can shortcircuit the parse loop
         row_pos_ += max_tuples;
-        int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+        int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
         COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
         RETURN_IF_ERROR(CommitRows(num_to_commit));
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index cf6708c..957338d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -351,8 +351,6 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
 Status HdfsScanNodeBase::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
 
-  if (file_descs_.empty()) return Status::OK();
-
   // Open collection conjuncts
   for (const auto& entry: conjuncts_map_) {
     // conjuncts_ are already opened in ExecNode::Open()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0b6e8c5..3885522 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -211,66 +211,14 @@ Status HdfsScanner::CommitRows(int num_rows) {
   return Status::OK();
 }
 
-// In this code path, no slots were materialized from the input files.  The only
-// slots are from partition keys.  This lets us simplify writing out the batches.
-//   1. template_tuple_ is the complete tuple.
-//   2. Eval conjuncts against the tuple.
-//   3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
-  DCHECK_GT(num_tuples, 0);
-
-  if (template_tuple_ == NULL) {
-    // No slots from partitions keys or slots.  This is count(*).  Just add the
-    // number of rows to the batch.
-    row_batch->AddRows(num_tuples);
-    row_batch->CommitRows(num_tuples);
-  } else {
-    // Make a row and evaluate the row
-    int row_idx = row_batch->AddRow();
-
-    TupleRow* current_row = row_batch->GetRow(row_idx);
-    current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(current_row)) return 0;
-    // Add first tuple
-    row_batch->CommitLastRow();
-    --num_tuples;
-
-    DCHECK_LE(num_tuples, row_batch->capacity() - row_batch->num_rows());
-
-    for (int n = 0; n < num_tuples; ++n) {
-      DCHECK(!row_batch->AtCapacity());
-      TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-      current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row_batch->CommitLastRow();
-    }
-  }
-  return num_tuples;
-}
-
-// In this code path, no slots were materialized from the input files.  The only
-// slots are from partition keys.  This lets us simplify writing out the batches.
-//   1. template_tuple_ is the complete tuple.
-//   2. Eval conjuncts against the tuple.
-//   3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
-    TupleRow* row, int num_tuples) {
+int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
-  if (num_tuples == 0) return 0;
-
-  if (template_tuple_ == NULL) {
-    // Must be conjuncts on constant exprs.
-    if (!EvalConjuncts(row)) return 0;
-    return num_tuples;
-  } else {
-    row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(row)) return 0;
-    row = next_row(row);
+  DCHECK_EQ(scan_node_->tuple_idx(), 0);
+  DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0);
+  if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
 
-    for (int n = 1; n < num_tuples; ++n) {
-      row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row = next_row(row);
-    }
-  }
+  Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
+  for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_;
   return num_tuples;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 4a4d366..71efd5a 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -347,14 +347,9 @@ class HdfsScanner {
                                    scanner_conjunct_ctxs_->size(), row);
   }
 
-  /// Utility method to write out tuples when there are no materialized
-  /// fields (e.g. select count(*) or only partition keys).
-  ///   num_tuples - Total number of tuples to write out.
-  /// Returns the number of tuples added to the row batch.
-  int WriteEmptyTuples(RowBatch* row_batch, int num_tuples);
-
-  /// Write empty tuples and commit them to the context object
-  int WriteEmptyTuples(ScannerContext* context, TupleRow* tuple_row, int num_tuples);
+  /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
+  /// 'tuple_row' only has a single tuple. Returns the number of tuples set.
+  int WriteTemplateTuples(TupleRow* row, int num_tuples);
 
   /// Processes batches of fields and writes them out to tuple_row_mem.
   /// - 'pool' mempool to allocate from for auxiliary tuple memory
@@ -455,9 +450,10 @@ class HdfsScanner {
     return reinterpret_cast<Tuple*>(mem + tuple_byte_size);
   }
 
+  /// Assumes the row only has a single tuple.
   inline TupleRow* next_row(TupleRow* r) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(r);
-    return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
+    return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
   }
 
   /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index fd552be..33be362 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -214,7 +214,7 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
 
   if (scan_node_->materialized_slots().empty()) {
     // Handle case where there are no slots to materialize (e.g. count(*))
-    num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process);
+    num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
     COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
     RETURN_IF_ERROR(CommitRows(num_to_process));
     return Status::OK();
@@ -334,7 +334,7 @@ Status HdfsSequenceScanner::ProcessRange() {
         RETURN_IF_ERROR(parse_status_);
       }
     } else {
-      add_row = WriteEmptyTuples(context_, tuple_row_mem, 1);
+      add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }
 
     COUNTER_ADD(scan_node_->rows_read_counter(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index cc63408..0b048f4 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -400,7 +400,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
       SCOPED_TIMER(scan_node_->materialize_tuple_timer());
       // If we are doing count(*) then we return tuples only containing partition keys
       boundary_row_.Clear();
-      num_tuples_materialized = WriteEmptyTuples(context_, tuple_row_mem, *num_tuples);
+      num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples);
     }
 
     // Save contents that are split across buffers if we are going to return this column

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
new file mode 100644
index 0000000..ac453ca
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+# IMPALA-4285: Test scan with no materialized slots.
+select count(*) from alltypes
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
new file mode 100644
index 0000000..1cd6d31
--- /dev/null
+++ b/tests/query_test/test_mt_dop.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests queries with the MT_DOP query option.
+
+import pytest
+
+from copy import deepcopy
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import TestDimension
+from tests.common.test_vector import TestVector
+
+MT_DOP_VALUES = [1, 2, 8]
+
+class TestMtDop(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDop, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(TestDimension('mt_dop', *MT_DOP_VALUES))
+    # IMPALA-4332: The MT scheduler does not work for Kudu or HBase tables.
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'hbase')
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'kudu')
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_mt_dop(self, vector):
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop', new_vector)