You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:11 UTC
[24/33] incubator-impala git commit: IMPALA-4285/IMPALA-4286: Fixes
for Parquet scanner with MT_DOP > 0.
IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.
IMPALA-4285: The problem was that there was a reference to
HdfsScanner::batch_ hidden inside WriteEmptyTuples(). The batch_
reference is NULL when the scanner is run with MT_DOP > 1.
IMPALA-4286: When there are no scan ranges HdfsScanNodeBase::Open()
exits early without initializing the reader context. This led to
a DCHECK in IoMgr::GetNextRange() called from HdfsScanNodeMt.
The fix is to remove that unnecessary short-circuit in Open().
I combined these two bugfixes because the new basic test covers
both cases.
Testing: Added a new test_mt_dop.py test. A private code/hdfs
run passed.
Change-Id: I79c0f6fd2aeb4bc6fa5f87219a485194fef2db1b
Reviewed-on: http://gerrit.cloudera.org:8080/4767
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff6b450a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff6b450a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff6b450a
Branch: refs/heads/hadoop-next
Commit: ff6b450ad380ce840e18875a89d9cf98058277a3
Parents: 51268c0
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Oct 19 23:27:14 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Oct 22 10:24:24 2016 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-avro-scanner.cc | 2 +-
be/src/exec/hdfs-parquet-scanner.cc | 2 +-
be/src/exec/hdfs-rcfile-scanner.cc | 2 +-
be/src/exec/hdfs-scan-node-base.cc | 2 -
be/src/exec/hdfs-scanner.cc | 64 ++------------------
be/src/exec/hdfs-scanner.h | 14 ++---
be/src/exec/hdfs-sequence-scanner.cc | 4 +-
be/src/exec/hdfs-text-scanner.cc | 2 +-
.../queries/QueryTest/mt-dop.test | 9 +++
tests/query_test/test_mt_dop.py | 47 ++++++++++++++
10 files changed, 73 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 88d6d3a..91a9d03 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -538,7 +538,7 @@ Status HdfsAvroScanner::ProcessRange() {
int num_to_commit;
if (scan_node_->materialized_slots().empty()) {
// No slots to materialize (e.g. count(*)), no need to decode data
- num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
+ num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
} else {
if (codegend_decode_avro_data_ != NULL) {
num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e91a7ec..542f4cc 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -339,7 +339,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
int max_tuples = min(row_batch->capacity(), rows_remaining);
TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
- int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+ int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
Status status = CommitRows(row_batch, num_to_commit);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index f43b2aa..012a424 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -485,7 +485,7 @@ Status HdfsRCFileScanner::ProcessRange() {
// If there are no materialized slots (e.g. count(*) or just partition cols)
// we can shortcircuit the parse loop
row_pos_ += max_tuples;
- int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+ int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
RETURN_IF_ERROR(CommitRows(num_to_commit));
continue;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index cf6708c..957338d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -351,8 +351,6 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
Status HdfsScanNodeBase::Open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Open(state));
- if (file_descs_.empty()) return Status::OK();
-
// Open collection conjuncts
for (const auto& entry: conjuncts_map_) {
// conjuncts_ are already opened in ExecNode::Open()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0b6e8c5..3885522 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -211,66 +211,14 @@ Status HdfsScanner::CommitRows(int num_rows) {
return Status::OK();
}
-// In this code path, no slots were materialized from the input files. The only
-// slots are from partition keys. This lets us simplify writing out the batches.
-// 1. template_tuple_ is the complete tuple.
-// 2. Eval conjuncts against the tuple.
-// 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
- DCHECK_GT(num_tuples, 0);
-
- if (template_tuple_ == NULL) {
- // No slots from partitions keys or slots. This is count(*). Just add the
- // number of rows to the batch.
- row_batch->AddRows(num_tuples);
- row_batch->CommitRows(num_tuples);
- } else {
- // Make a row and evaluate the row
- int row_idx = row_batch->AddRow();
-
- TupleRow* current_row = row_batch->GetRow(row_idx);
- current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
- if (!EvalConjuncts(current_row)) return 0;
- // Add first tuple
- row_batch->CommitLastRow();
- --num_tuples;
-
- DCHECK_LE(num_tuples, row_batch->capacity() - row_batch->num_rows());
-
- for (int n = 0; n < num_tuples; ++n) {
- DCHECK(!row_batch->AtCapacity());
- TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
- current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
- row_batch->CommitLastRow();
- }
- }
- return num_tuples;
-}
-
-// In this code path, no slots were materialized from the input files. The only
-// slots are from partition keys. This lets us simplify writing out the batches.
-// 1. template_tuple_ is the complete tuple.
-// 2. Eval conjuncts against the tuple.
-// 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
- TupleRow* row, int num_tuples) {
+int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
DCHECK_GE(num_tuples, 0);
- if (num_tuples == 0) return 0;
-
- if (template_tuple_ == NULL) {
- // Must be conjuncts on constant exprs.
- if (!EvalConjuncts(row)) return 0;
- return num_tuples;
- } else {
- row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
- if (!EvalConjuncts(row)) return 0;
- row = next_row(row);
+ DCHECK_EQ(scan_node_->tuple_idx(), 0);
+ DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0);
+ if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
- for (int n = 1; n < num_tuples; ++n) {
- row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
- row = next_row(row);
- }
- }
+ Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
+ for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_;
return num_tuples;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 4a4d366..71efd5a 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -347,14 +347,9 @@ class HdfsScanner {
scanner_conjunct_ctxs_->size(), row);
}
- /// Utility method to write out tuples when there are no materialized
- /// fields (e.g. select count(*) or only partition keys).
- /// num_tuples - Total number of tuples to write out.
- /// Returns the number of tuples added to the row batch.
- int WriteEmptyTuples(RowBatch* row_batch, int num_tuples);
-
- /// Write empty tuples and commit them to the context object
- int WriteEmptyTuples(ScannerContext* context, TupleRow* tuple_row, int num_tuples);
+ /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
+ /// 'tuple_row' only has a single tuple. Returns the number of tuples set.
+ int WriteTemplateTuples(TupleRow* row, int num_tuples);
/// Processes batches of fields and writes them out to tuple_row_mem.
/// - 'pool' mempool to allocate from for auxiliary tuple memory
@@ -455,9 +450,10 @@ class HdfsScanner {
return reinterpret_cast<Tuple*>(mem + tuple_byte_size);
}
+ /// Assumes the row only has a single tuple.
inline TupleRow* next_row(TupleRow* r) const {
uint8_t* mem = reinterpret_cast<uint8_t*>(r);
- return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
+ return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
}
/// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index fd552be..33be362 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -214,7 +214,7 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
if (scan_node_->materialized_slots().empty()) {
// Handle case where there are no slots to materialize (e.g. count(*))
- num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process);
+ num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
RETURN_IF_ERROR(CommitRows(num_to_process));
return Status::OK();
@@ -334,7 +334,7 @@ Status HdfsSequenceScanner::ProcessRange() {
RETURN_IF_ERROR(parse_status_);
}
} else {
- add_row = WriteEmptyTuples(context_, tuple_row_mem, 1);
+ add_row = WriteTemplateTuples(tuple_row_mem, 1);
}
COUNTER_ADD(scan_node_->rows_read_counter(), 1);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index cc63408..0b048f4 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -400,7 +400,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
// If we are doing count(*) then we return tuples only containing partition keys
boundary_row_.Clear();
- num_tuples_materialized = WriteEmptyTuples(context_, tuple_row_mem, *num_tuples);
+ num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples);
}
// Save contents that are split across buffers if we are going to return this column
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
new file mode 100644
index 0000000..ac453ca
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+# IMPALA-4285: Test scan with no materialized slots.
+select count(*) from alltypes
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
new file mode 100644
index 0000000..1cd6d31
--- /dev/null
+++ b/tests/query_test/test_mt_dop.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests queries with the MT_DOP query option.
+
+import pytest
+
+from copy import deepcopy
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import TestDimension
+from tests.common.test_vector import TestVector
+
+MT_DOP_VALUES = [1, 2, 8]
+
+class TestMtDop(ImpalaTestSuite):
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestMtDop, cls).add_test_dimensions()
+ cls.TestMatrix.add_dimension(TestDimension('mt_dop', *MT_DOP_VALUES))
+ # IMPALA-4332: The MT scheduler does not work for Kudu or HBase tables.
+ cls.TestMatrix.add_constraint(\
+ lambda v: v.get_value('table_format').file_format != 'hbase')
+ cls.TestMatrix.add_constraint(\
+ lambda v: v.get_value('table_format').file_format != 'kudu')
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ def test_mt_dop(self, vector):
+ new_vector = deepcopy(vector)
+ new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+ self.run_test_case('QueryTest/mt-dop', new_vector)