You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/06 00:22:06 UTC
[1/2] incubator-impala git commit: IMPALA-5210: Count rows and
collection items in parquet scanner separately
Repository: incubator-impala
Updated Branches:
refs/heads/master 796db0fce -> 9ce691af2
IMPALA-5210: Count rows and collection items in parquet scanner separately
This patch adds collection_items_read_counter to the scan node, makes
rows_read_counter count top-level rows only, and updates these counters
less frequently.
When scanning nested columns, the current code counts both top-level rows
and nested rows in rows_read_counter, which is inconsistent with
rows_returned_counter. Furthermore, rows_read_counter is updated eagerly
whenever a batch of collection items is read. As a result, it spends
around 10% of the time updating the counter with the following simple query:
>select count(*) from
> customer c,
> c.c_orders o,
> o.o_lineitems l
>where
> c_mktsegment = 'BUILDING'
> and o_orderdate < '1995-03-15'
> and l_shipdate > '1995-03-15' and o_orderkey = 10;
This patch moves collection items counting into
collection_items_read_counter. Both counters are updated for every row
batch read. In the query described above, scanning time is decreased by
10.4%.
Change-Id: I7f6efddaea18507482940f5bdab7326b6482b067
Reviewed-on: http://gerrit.cloudera.org:8080/7776
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/915d30f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/915d30f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/915d30f4
Branch: refs/heads/master
Commit: 915d30f4bb6334e9d816e080722484537732fe3d
Parents: 796db0f
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Aug 24 10:47:28 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 00:09:40 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 22 ++++++++++++----------
be/src/exec/hdfs-parquet-scanner.h | 9 ++++++++-
be/src/exec/scan-node.cc | 3 +++
be/src/exec/scan-node.h | 11 ++++++++++-
4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ecf14f8..03b9c70 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -171,6 +171,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
num_row_groups_counter_(NULL),
num_scanners_with_no_reads_counter_(NULL),
num_dict_filtered_row_groups_counter_(NULL),
+ coll_items_read_counter_(0),
codegend_process_scratch_batch_fn_(NULL) {
assemble_rows_timer_.Stop();
}
@@ -958,6 +959,7 @@ Status HdfsParquetScanner::AssembleRows(
DCHECK_EQ(*skip_row_group, false);
DCHECK(scratch_batch_ != NULL);
+ int64_t num_rows_read = 0;
while (!column_readers[0]->RowGroupAtEnd()) {
// Start a new scratch batch.
RETURN_IF_ERROR(scratch_batch_->Reset(state_));
@@ -965,10 +967,9 @@ Status HdfsParquetScanner::AssembleRows(
// Materialize the top-level slots into the scratch batch column-by-column.
int last_num_tuples = -1;
- int num_col_readers = column_readers.size();
- bool continue_execution = true;
- for (int c = 0; c < num_col_readers; ++c) {
+ for (int c = 0; c < column_readers.size(); ++c) {
ParquetColumnReader* col_reader = column_readers[c];
+ bool continue_execution;
if (col_reader->max_rep_level() > 0) {
continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
@@ -997,14 +998,16 @@ Status HdfsParquetScanner::AssembleRows(
}
last_num_tuples = scratch_batch_->num_tuples;
}
- row_group_rows_read_ += scratch_batch_->num_tuples;
- COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
-
+ num_rows_read += scratch_batch_->num_tuples;
int num_row_to_commit = TransferScratchTuples(row_batch);
RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
- if (row_batch->AtCapacity()) return Status::OK();
+ if (row_batch->AtCapacity()) break;
}
-
+ row_group_rows_read_ += num_rows_read;
+ COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+ // Merge Scanner-local counter into HdfsScanNode counter and reset.
+ COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+ coll_items_read_counter_ = 0;
return Status::OK();
}
@@ -1265,11 +1268,10 @@ bool HdfsParquetScanner::AssembleCollection(
}
rows_read += row_idx;
- COUNTER_ADD(scan_node_->rows_read_counter(), row_idx);
coll_value_builder->CommitTuples(num_to_commit);
continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
}
-
+ coll_items_read_counter_ += rows_read;
if (end_of_collection) {
// All column readers should report the start of the same collection.
for (int c = 1; c < column_readers.size(); ++c) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index b31d321..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -474,6 +474,12 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of row groups skipped due to dictionary filter
RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
+ /// Number of collection items read in current row batch. It is a scanner-local counter
+ /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
+ /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
+ /// AssembleRows() and then is reset to 0.
+ int64_t coll_items_read_counter_;
+
typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
/// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -545,7 +551,8 @@ class HdfsParquetScanner : public HdfsScanner {
WARN_UNUSED_RESULT;
/// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
- /// allocated from 'coll_value_builder'.
+ /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
+ /// number of items in this collection and descendant collections.
///
/// 'new_collection_rep_level' indicates when the end of the collection has been
/// reached, namely when current_rep_level <= new_collection_rep_level.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 938ce6e..0df0c3f 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -28,6 +28,7 @@ namespace impala {
// Changing these names have compatibility concerns.
const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
+const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
@@ -58,6 +59,8 @@ Status ScanNode::Prepare(RuntimeState* state) {
BYTES_READ_COUNTER, bytes_read_counter_);
rows_read_counter_ =
ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
+ collection_items_read_counter_ =
+ ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
total_throughput_counter_ = runtime_profile()->AddRateCounter(
TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 87b9c71..0f73c2b 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -97,6 +97,9 @@ class ScanNode : public ExecNode {
RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
+ RuntimeProfile::Counter* collection_items_read_counter() const {
+ return collection_items_read_counter_;
+ }
RuntimeProfile::Counter* read_timer() const { return read_timer_; }
RuntimeProfile::Counter* total_throughput_counter() const {
return total_throughput_counter_;
@@ -123,6 +126,7 @@ class ScanNode : public ExecNode {
/// names of ScanNode common counters
static const std::string BYTES_READ_COUNTER;
static const std::string ROWS_READ_COUNTER;
+ static const std::string COLLECTION_ITEMS_READ_COUNTER;
static const std::string TOTAL_HDFS_READ_TIMER;
static const std::string TOTAL_HBASE_READ_TIMER;
static const std::string TOTAL_THROUGHPUT_COUNTER;
@@ -143,8 +147,13 @@ class ScanNode : public ExecNode {
RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
/// Time series of the bytes_read_counter_
RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_;
- /// # rows/tuples read from the scanner (including those discarded by EvalConjucts())
+ /// # top-level rows/tuples read from the scanner
+ /// (including those discarded by EvalConjucts())
RuntimeProfile::Counter* rows_read_counter_;
+ /// # items the scanner read into CollectionValues. For example, for schema
+ /// array<struct<B: INT, array<C: INT>> and tuple
+ /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
+ RuntimeProfile::Counter* collection_items_read_counter_;
RuntimeProfile::Counter* read_timer_; // total read time
/// Wall based aggregate read throughput [bytes/sec]
RuntimeProfile::Counter* total_throughput_counter_;
[2/2] incubator-impala git commit: IMPALA-5885: free runtime filter
allocations in Parquet
Posted by ta...@apache.org.
IMPALA-5885: free runtime filter allocations in Parquet
This fixes the parquet scanner to free local allocations in
runtime filter contexts for every batch.
Testing:
Added a regression test that runs out of memory before this fix.
Ran core and ASAN builds.
Change-Id: Iecdda6af12d5ca578f7d2cb393e9cb9f49438f09
Reviewed-on: http://gerrit.cloudera.org:8080/7931
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9ce691af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9ce691af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9ce691af
Branch: refs/heads/master
Commit: 9ce691af22d37d9e6914b6cc82dc241b9530e200
Parents: 915d30f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 31 15:44:28 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 00:10:45 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 11 +++++++++--
be/src/exec/hdfs-parquet-scanner.h | 5 +++++
be/src/exec/hdfs-scan-node.cc | 7 +------
.../queries/QueryTest/runtime_row_filters.test | 18 ++++++++++++++++++
4 files changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 03b9c70..f20818a 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1029,11 +1029,18 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
// Store UDF error in thread local storage or make UDF return status so it can merge
// with parse_status_.
RETURN_IF_ERROR(state_->GetQueryStatus());
- // Free local expr allocations for this thread
+ // Free local expr allocations made when evaluating conjuncts for this batch.
+ FreeLocalAllocationsForConjuncts();
+ return Status::OK();
+}
+
+void HdfsParquetScanner::FreeLocalAllocationsForConjuncts() {
for (const auto& kv: conjunct_evals_map_) {
ScalarExprEvaluator::FreeLocalAllocations(kv.second);
}
- return Status::OK();
+ for (const FilterContext* filter_ctx : filter_ctxs_) {
+ filter_ctx->expr_eval->FreeLocalAllocations();
+ }
}
int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 5f67036..754737d 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -650,6 +650,11 @@ class HdfsParquetScanner : public HdfsScanner {
/// no values that pass the relevant conjuncts, then the row group can be skipped.
Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
bool* skip_row_group) WARN_UNUSED_RESULT;
+
+ /// Free local allocations made when evaluating conjuncts over each row. Does not free
+ /// local allocations made when evaluated conjuncts for row groups, pages, etc. Those
+ /// should be freed separately after they are evaluated.
+ void FreeLocalAllocationsForConjuncts();
};
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 557d346..2dc8d7b 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -459,12 +459,7 @@ void HdfsScanNode::ScannerThread() {
exit:
runtime_state_->resource_pool()->ReleaseThreadToken(false);
- if (filter_status.ok()) {
- for (auto& ctx: filter_ctxs) {
- ctx.expr_eval->FreeLocalAllocations();
- ctx.expr_eval->Close(runtime_state_);
- }
- }
+ for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
filter_mem_pool.FreeAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 06414ee..01e1055 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -301,3 +301,21 @@ from alltypes a join [SHUFFLE] alltypessmall c
---- RESULTS
108
====
+
+
+---- QUERY
+###################################################
+# Test case 14: filter with expression that uses local allocations.
+# IMPALA-5885: the parquet scanner should free local allocations from upper()/lower().
+# mem_limit is calibrated so that the query fails if allocations are not freed.
+###################################################
+
+SET RUNTIME_FILTER_WAIT_TIME_MS=100000;
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET MEM_LIMIT=200MB;
+select straight_join count(*)
+from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2
+ on lower(upper(lower(upper(lower(l1.l_comment))))) = concat(l2.l_comment, 'foo')
+---- RESULTS
+0
+====