Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/21 17:38:22 UTC

[2/2] incubator-impala git commit: IMPALA-4863/IMPALA-5311: Correctly account the file type and compression codec

IMPALA-4863/IMPALA-5311: Correctly account the file type and compression codec

If a scan range is skipped at runtime, the scan node never reads the
range and so never determines the underlying compression codec used
to compress its files. In that case we default the compression
codec to NONE, which can be misleading. This change marks such files
as skipped in the scan node profile.

e.g. - File Formats: TEXT/NONE:364 TEXT/NONE(Skipped):1460
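
To make the accounting change easier to follow, here is a minimal, compilable C++ sketch of the idea (not Impala's actual code): a counts map keyed by (file format, skipped flag, compression codec), an analogue of RangeComplete() that bumps the counters, and an analogue of the "File Formats" profile string. The FileFormat/Compression enums and helper names are simplified stand-ins for the THdfsFileFormat/THdfsCompression Thrift types used in the patch below.

#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <tuple>
#include <vector>

// Simplified stand-ins for the generated Thrift enums (THdfsFileFormat,
// THdfsCompression) used by the real code.
enum class FileFormat { TEXT, PARQUET };
enum class Compression { NONE, SNAPPY };

std::string ToString(FileFormat f) { return f == FileFormat::PARQUET ? "PARQUET" : "TEXT"; }
std::string ToString(Compression c) { return c == Compression::SNAPPY ? "SNAPPY" : "NONE"; }

// Counts keyed by (file format, skipped-at-runtime flag, compression codec),
// mirroring the new FileTypeCountsMap in hdfs-scan-node-base.h.
typedef std::map<std::tuple<FileFormat, bool, Compression>, int> FileTypeCountsMap;

// Analogue of RangeComplete(): bump one counter per codec seen in the range.
void RangeComplete(FileTypeCountsMap* counts, FileFormat format,
    const std::vector<Compression>& codecs, bool skipped = false) {
  for (Compression codec : codecs) {
    ++(*counts)[std::make_tuple(format, skipped, codec)];
  }
}

// Analogue of the "File Formats" info string built in StopAndFinalizeCounters().
std::string RenderFileFormats(const FileTypeCountsMap& counts) {
  std::stringstream ss;
  for (const auto& entry : counts) {
    FileFormat format = std::get<0>(entry.first);
    bool skipped = std::get<1>(entry.first);
    Compression codec = std::get<2>(entry.first);
    if (skipped && format == FileFormat::PARQUET) {
      // A skipped Parquet range was never read, so its codec is unknown.
      ss << ToString(format) << "/Unknown(Skipped):" << entry.second << " ";
    } else if (skipped) {
      ss << ToString(format) << "/" << ToString(codec) << "(Skipped):" << entry.second << " ";
    } else {
      ss << ToString(format) << "/" << ToString(codec) << ":" << entry.second << " ";
    }
  }
  return ss.str();
}

int main() {
  FileTypeCountsMap counts;
  RangeComplete(&counts, FileFormat::TEXT, {Compression::NONE});           // range actually scanned
  RangeComplete(&counts, FileFormat::TEXT, {Compression::NONE}, true);     // pruned by a runtime filter
  RangeComplete(&counts, FileFormat::PARQUET, {Compression::NONE}, true);  // metadata-only count(*) read
  std::cout << "File Formats: " << RenderFileFormats(counts) << "\n";
  // Prints: File Formats: TEXT/NONE:1 TEXT/NONE(Skipped):1 PARQUET/Unknown(Skipped):1
  return 0;
}
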

Change-Id: I797916505f62e568f4159e07099481b8ff571da2
Reviewed-on: http://gerrit.cloudera.org:8080/7245
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f87da848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f87da848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f87da848

Branch: refs/heads/master
Commit: f87da848f5f204ae0dc84ffd9de64007e197c4d9
Parents: fa93a47
Author: aphadke <ap...@cloudera.com>
Authored: Mon Jun 19 16:34:57 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Sep 21 17:38:08 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  9 ++++--
 be/src/exec/hdfs-scan-node-base.cc              | 28 ++++++++++++++----
 be/src/exec/hdfs-scan-node-base.h               | 12 ++++++--
 be/src/exec/hdfs-scan-node.cc                   |  4 +--
 be/src/exec/hdfs-scan-node.h                    |  2 +-
 .../queries/QueryTest/hdfs_scanner_profile.test | 30 ++++++++++++++++++--
 tests/query_test/test_scanners.py               |  6 ++--
 7 files changed, 71 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 4cd4340..57f1e24 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -315,9 +315,12 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   assemble_rows_timer_.ReleaseCounter();
 
   // If this was a metadata only read (i.e. count(*)), there are no columns.
-  if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE);
-  scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
-
+  if (compression_types.empty()) {
+    compression_types.push_back(THdfsCompression::NONE);
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types, true);
+  } else {
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
+  }
   if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
 
   ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5169a8..e74efcd 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -553,7 +553,7 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
           filter_ctxs)) {
     for (int j = 0; j < file->splits.size(); ++j) {
       // Mark range as complete to ensure progress.
-      RangeComplete(format, file->file_compression);
+      RangeComplete(format, file->file_compression, true);
     }
     return false;
   }
@@ -775,18 +775,18 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
 }
 
 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
-    const THdfsCompression::type& compression_type) {
+    const THdfsCompression::type& compression_type, bool skipped) {
   vector<THdfsCompression::type> types;
   types.push_back(compression_type);
-  RangeComplete(file_type, types);
+  RangeComplete(file_type, types, skipped);
 }
 
 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
-    const vector<THdfsCompression::type>& compression_types) {
+    const vector<THdfsCompression::type>& compression_types, bool skipped) {
   scan_ranges_complete_counter()->Add(1);
   progress_.Update(1);
   for (int i = 0; i < compression_types.size(); ++i) {
-    ++file_type_counts_[make_pair(file_type, compression_types[i])];
+    ++file_type_counts_[std::make_tuple(file_type, skipped, compression_types[i])];
   }
 }
 
@@ -871,7 +871,23 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
     {
       for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
           it != file_type_counts_.end(); ++it) {
-        ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
+
+        THdfsFileFormat::type file_format = std::get<0>(it->first);
+        bool skipped = std::get<1>(it->first);
+        THdfsCompression::type compression_type = std::get<2>(it->first);
+
+        if (skipped) {
+          if (file_format == THdfsFileFormat::PARQUET) {
+            // If a scan range stored as parquet is skipped, its compression type
+            // cannot be figured out without reading the data.
+            ss << file_format << "/" << "Unknown" << "(Skipped):" << it->second << " ";
+          } else {
+            ss << file_format << "/" << compression_type << "(Skipped):"
+               << it->second << " ";
+          }
+        } else {
+          ss << file_format << "/" << compression_type << ":" << it->second << " ";
+        }
       }
     }
     runtime_profile_->AddInfoString("File Formats", ss.str());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e33de5a..7e9d322 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -23,6 +23,7 @@
 #include <memory>
 #include <unordered_set>
 #include <vector>
+#include <tuple>
 
 #include <boost/unordered_map.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -252,11 +253,15 @@ class HdfsScanNodeBase : public ScanNode {
   /// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
   /// to racing with shutting down the queue.
   void RangeComplete(const THdfsFileFormat::type& file_type,
-      const THdfsCompression::type& compression_type);
+      const THdfsCompression::type& compression_type, bool skipped = false);
+
   /// Same as above except for when multiple compression codecs were used
   /// in the file. The metrics are incremented for each compression_type.
+  /// 'skipped' is set to true in the following cases:
+  /// 1. when a scan range is filtered at runtime
+  /// 2. when a scan range is a metadata-only read (e.g. count(*) on Parquet files)
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
-      const std::vector<THdfsCompression::type>& compression_type);
+      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
 
   /// Utility function to compute the order in which to materialize slots to allow for
   /// computing conjuncts as slots get materialized (on partial tuples).
@@ -492,7 +497,8 @@ class HdfsScanNodeBase : public ScanNode {
   /// Mapping of file formats (file type, compression type) to the number of
   /// splits of that type and the lock protecting it.
   typedef std::map<
-      std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
+     std::tuple<THdfsFileFormat::type, bool, THdfsCompression::type>,
+     int> FileTypeCountsMap;
   FileTypeCountsMap file_type_counts_;
 
   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 528e290..64eece3 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -242,9 +242,9 @@ void HdfsScanNode::Close(RuntimeState* state) {
 }
 
 void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const std::vector<THdfsCompression::type>& compression_type) {
+    const std::vector<THdfsCompression::type>& compression_type, bool skipped) {
   lock_guard<SpinLock> l(file_type_counts_);
-  HdfsScanNodeBase::RangeComplete(file_type, compression_type);
+  HdfsScanNodeBase::RangeComplete(file_type, compression_type, skipped);
 }
 
 void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 18a74ad..782f530 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -93,7 +93,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down
   /// the RowBatch queue.
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
-      const std::vector<THdfsCompression::type>& compression_type);
+      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
 
   /// Transfers all memory from 'pool' to 'scan_node_pool_'.
   virtual void TransferToScanNodePool(MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
index ea459e4..0fe5fb3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
@@ -1,8 +1,34 @@
-====
----- QUERY
 # This query will do a full table scan to count the num of rows
 # read during a scan
 select * from alltypesagg
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 11.00K .
 ====
+---- QUERY
+# This query verifies that a scan range is marked as skipped
+# in the profile when its compression codec cannot be inferred
+# without reading the data.
+select count(*) from tpcds_parquet.store_sales
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/Unknown\(Skipped\):.*
+====
+---- QUERY
+# This query verifies that when a parquet scan range is runtime
+# filtered, it is marked as skipped and the compression codec is
+# marked as unknown.
+set runtime_filter_wait_time_ms=500000;
+select count(*) from tpcds_parquet.store_sales
+join tpcds_parquet.date_dim on
+ss_sold_date_sk = d_date_sk where d_qoy=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/NONE:.* PARQUET/Unknown\(Skipped\).*
+====
+---- QUERY
+# This query verifies that when a text scan range is runtime
+# filtered, it is marked as skipped.
+set runtime_filter_wait_time_ms=100000;
+select count(*) from tpcds.store_sales join tpcds.date_dim on
+ss_sold_date_sk = d_date_sk where d_qoy=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: TEXT/NONE:.* TEXT/NONE\(Skipped\):.*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index d355081..f4f2fd6 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -73,10 +73,10 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     self.run_test_case('QueryTest/scanners', new_vector)
 
   def test_hdfs_scanner_profile(self, vector):
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['num_nodes'] = 1
-    if new_vector.get_value('table_format').file_format in ('kudu', 'hbase'):
+    if vector.get_value('table_format').file_format in ('kudu', 'hbase'):
       pytest.skip()
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['num_nodes'] = 0
     self.run_test_case('QueryTest/hdfs_scanner_profile', new_vector)
 
 # Test all the scanners with a simple limit clause. The limit clause triggers