You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/06 00:22:06 UTC

[1/2] incubator-impala git commit: IMPALA-5210: Count rows and collection items in parquet scanner separately

Repository: incubator-impala
Updated Branches:
  refs/heads/master 796db0fce -> 9ce691af2


IMPALA-5210: Count rows and collection items in parquet scanner separately

This patch adds collection_items_read_counter to the scan node, makes
rows_read_counter count top-level rows only, and updates both counters
less frequently.
When scanning nested columns, the current code counts both top-level rows
and nested rows in rows_read_counter, which is inconsistent with
rows_returned_counter. Furthermore, rows_read_counter is updated eagerly
whenever a batch of collection items is read. As a result, the scanner
spends around 10% of its time updating the counter in the following simple
query:
>select count(*) from
> customer c,
> c.c_orders o,
> o.o_lineitems l
>where
> c_mktsegment = 'BUILDING'
> and o_orderdate < '1995-03-15'
> and l_shipdate > '1995-03-15' and o_orderkey = 10;

This patch moves the counting of collection items into
collection_items_read_counter. Both counters are now updated once per row
batch read. For the query described above, scanning time decreases by
10.4%.

Change-Id: I7f6efddaea18507482940f5bdab7326b6482b067
Reviewed-on: http://gerrit.cloudera.org:8080/7776
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/915d30f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/915d30f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/915d30f4

Branch: refs/heads/master
Commit: 915d30f4bb6334e9d816e080722484537732fe3d
Parents: 796db0f
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Aug 24 10:47:28 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 00:09:40 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 22 ++++++++++++----------
 be/src/exec/hdfs-parquet-scanner.h  |  9 ++++++++-
 be/src/exec/scan-node.cc            |  3 +++
 be/src/exec/scan-node.h             | 11 ++++++++++-
 4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ecf14f8..03b9c70 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -171,6 +171,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     num_row_groups_counter_(NULL),
     num_scanners_with_no_reads_counter_(NULL),
     num_dict_filtered_row_groups_counter_(NULL),
+    coll_items_read_counter_(0),
     codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -958,6 +959,7 @@ Status HdfsParquetScanner::AssembleRows(
   DCHECK_EQ(*skip_row_group, false);
   DCHECK(scratch_batch_ != NULL);
 
+  int64_t num_rows_read = 0;
   while (!column_readers[0]->RowGroupAtEnd()) {
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
@@ -965,10 +967,9 @@ Status HdfsParquetScanner::AssembleRows(
 
     // Materialize the top-level slots into the scratch batch column-by-column.
     int last_num_tuples = -1;
-    int num_col_readers = column_readers.size();
-    bool continue_execution = true;
-    for (int c = 0; c < num_col_readers; ++c) {
+    for (int c = 0; c < column_readers.size(); ++c) {
       ParquetColumnReader* col_reader = column_readers[c];
+      bool continue_execution;
       if (col_reader->max_rep_level() > 0) {
         continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
             scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
@@ -997,14 +998,16 @@ Status HdfsParquetScanner::AssembleRows(
       }
       last_num_tuples = scratch_batch_->num_tuples;
     }
-    row_group_rows_read_ += scratch_batch_->num_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
-
+    num_rows_read += scratch_batch_->num_tuples;
     int num_row_to_commit = TransferScratchTuples(row_batch);
     RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
-    if (row_batch->AtCapacity()) return Status::OK();
+    if (row_batch->AtCapacity()) break;
   }
-
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
   return Status::OK();
 }
 
@@ -1265,11 +1268,10 @@ bool HdfsParquetScanner::AssembleCollection(
     }
 
     rows_read += row_idx;
-    COUNTER_ADD(scan_node_->rows_read_counter(), row_idx);
     coll_value_builder->CommitTuples(num_to_commit);
     continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
   }
-
+  coll_items_read_counter_ += rows_read;
   if (end_of_collection) {
     // All column readers should report the start of the same collection.
     for (int c = 1; c < column_readers.size(); ++c) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index b31d321..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -474,6 +474,12 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups skipped due to dictionary filter
   RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
 
+  /// Number of collection items read in current row batch. It is a scanner-local counter
+  /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
+  /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
+  /// AssembleRows() and then is reset to 0.
+  int64_t coll_items_read_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -545,7 +551,8 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
   /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
-  /// allocated from 'coll_value_builder'.
+  /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
+  /// number of items in this collection and descendant collections.
   ///
   /// 'new_collection_rep_level' indicates when the end of the collection has been
   /// reached, namely when current_rep_level <= new_collection_rep_level.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 938ce6e..0df0c3f 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -28,6 +28,7 @@ namespace impala {
 // Changing these names have compatibility concerns.
 const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
 const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
+const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
 const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
 const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
 const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
@@ -58,6 +59,8 @@ Status ScanNode::Prepare(RuntimeState* state) {
       BYTES_READ_COUNTER, bytes_read_counter_);
   rows_read_counter_ =
       ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
+  collection_items_read_counter_ =
+      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
   total_throughput_counter_ = runtime_profile()->AddRateCounter(
       TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
   materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 87b9c71..0f73c2b 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -97,6 +97,9 @@ class ScanNode : public ExecNode {
 
   RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
   RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
+  RuntimeProfile::Counter* collection_items_read_counter() const {
+    return collection_items_read_counter_;
+  }
   RuntimeProfile::Counter* read_timer() const { return read_timer_; }
   RuntimeProfile::Counter* total_throughput_counter() const {
     return total_throughput_counter_;
@@ -123,6 +126,7 @@ class ScanNode : public ExecNode {
   /// names of ScanNode common counters
   static const std::string BYTES_READ_COUNTER;
   static const std::string ROWS_READ_COUNTER;
+  static const std::string COLLECTION_ITEMS_READ_COUNTER;
   static const std::string TOTAL_HDFS_READ_TIMER;
   static const std::string TOTAL_HBASE_READ_TIMER;
   static const std::string TOTAL_THROUGHPUT_COUNTER;
@@ -143,8 +147,13 @@ class ScanNode : public ExecNode {
   RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
   /// Time series of the bytes_read_counter_
   RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_;
-  /// # rows/tuples read from the scanner (including those discarded by EvalConjucts())
+  /// # top-level rows/tuples read from the scanner
+  /// (including those discarded by EvalConjucts())
   RuntimeProfile::Counter* rows_read_counter_;
+  /// # items the scanner read into CollectionValues. For example, for schema
+  /// array<struct<B: INT, array<C: INT>> and tuple
+  /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
+  RuntimeProfile::Counter* collection_items_read_counter_;
   RuntimeProfile::Counter* read_timer_; // total read time
   /// Wall based aggregate read throughput [bytes/sec]
   RuntimeProfile::Counter* total_throughput_counter_;


[2/2] incubator-impala git commit: IMPALA-5885: free runtime filter allocations in Parquet

Posted by ta...@apache.org.
IMPALA-5885: free runtime filter allocations in Parquet

This fixes the parquet scanner to free local allocations in
runtime filter contexts for every batch.

Testing:
Added a regression test that runs out of memory without this fix.

Ran core and ASAN builds.

Change-Id: Iecdda6af12d5ca578f7d2cb393e9cb9f49438f09
Reviewed-on: http://gerrit.cloudera.org:8080/7931
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9ce691af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9ce691af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9ce691af

Branch: refs/heads/master
Commit: 9ce691af22d37d9e6914b6cc82dc241b9530e200
Parents: 915d30f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 31 15:44:28 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 00:10:45 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc               | 11 +++++++++--
 be/src/exec/hdfs-parquet-scanner.h                |  5 +++++
 be/src/exec/hdfs-scan-node.cc                     |  7 +------
 .../queries/QueryTest/runtime_row_filters.test    | 18 ++++++++++++++++++
 4 files changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 03b9c70..f20818a 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1029,11 +1029,18 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   // Store UDF error in thread local storage or make UDF return status so it can merge
   // with parse_status_.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations for this thread
+  // Free local expr allocations made when evaluating conjuncts for this batch.
+  FreeLocalAllocationsForConjuncts();
+  return Status::OK();
+}
+
+void HdfsParquetScanner::FreeLocalAllocationsForConjuncts() {
   for (const auto& kv: conjunct_evals_map_) {
     ScalarExprEvaluator::FreeLocalAllocations(kv.second);
   }
-  return Status::OK();
+  for (const FilterContext* filter_ctx : filter_ctxs_) {
+    filter_ctx->expr_eval->FreeLocalAllocations();
+  }
 }
 
 int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 5f67036..754737d 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -650,6 +650,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// no values that pass the relevant conjuncts, then the row group can be skipped.
   Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
       bool* skip_row_group) WARN_UNUSED_RESULT;
+
+  /// Free local allocations made when evaluating conjuncts over each row. Does not free
+  /// local allocations made when evaluated conjuncts for row groups, pages, etc. Those
+  /// should be freed separately after they are evaluated.
+  void FreeLocalAllocationsForConjuncts();
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 557d346..2dc8d7b 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -459,12 +459,7 @@ void HdfsScanNode::ScannerThread() {
 
 exit:
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
-  if (filter_status.ok()) {
-    for (auto& ctx: filter_ctxs) {
-      ctx.expr_eval->FreeLocalAllocations();
-      ctx.expr_eval->Close(runtime_state_);
-    }
-  }
+  for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 06414ee..01e1055 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -301,3 +301,21 @@ from alltypes a join [SHUFFLE] alltypessmall c
 ---- RESULTS
 108
 ====
+
+
+---- QUERY
+###################################################
+# Test case 14: filter with expression that uses local allocations.
+# IMPALA-5885: the parquet scanner should free local allocations from upper()/lower().
+# mem_limit is calibrated so that the query fails if allocations are not freed.
+###################################################
+
+SET RUNTIME_FILTER_WAIT_TIME_MS=100000;
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET MEM_LIMIT=200MB;
+select straight_join count(*)
+from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2
+    on lower(upper(lower(upper(lower(l1.l_comment))))) = concat(l2.l_comment, 'foo')
+---- RESULTS
+0
+====