Posted to commits@impala.apache.org by st...@apache.org on 2022/11/29 08:09:27 UTC

[impala] branch master updated: IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c3ae41c IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied
d3c3ae41c is described below

commit d3c3ae41c4aeb2dec9f55dacb3dfc357d16713a3
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Nov 23 16:06:28 2022 +0100

    IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied
    
    If an Iceberg V2 table is partitioned and contains delete files,
    then queries that involve runtime filters on the partition columns
    return an empty result set.
    
    E.g.:
    
      select count(*)
      from store_sales, date_dim
      where d_date_sk = ss_sold_date_sk and d_moy=2 and d_year=1998;
    
    In the above query store_sales is partitioned by ss_sold_date_sk,
    which is filtered by the runtime filters created by the JOIN. If
    store_sales has delete files then the above query returns an empty
    result set.
    
    The problem is that we are invoking PartitionPassesFilters() on these
    Iceberg tables. It is usually a no-op for Iceberg tables, as the
    template tuple is NULL. But when we have virtual columns, a template
    tuple has been created in HdfsScanPlanNode::InitTemplateTuple. For
    Iceberg tables this template tuple is incomplete, i.e. it doesn't
    have the partition values set. This means the filters evaluate to
    false and the files get filtered out, hence the query produces an
    empty result set.
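    
    In sketch form (a hypothetical, simplified model; the names below are
    illustrative and not Impala's real API): the runtime filter built by
    the join holds the ss_sold_date_sk values that survive the date_dim
    predicates, and probing it with a partition slot that was never
    populated rejects every file.
    
      #include <cstdint>
      #include <optional>
      #include <unordered_set>
      
      // Illustrative stand-in for a runtime filter produced by the join
      // build side.
      struct RuntimeFilterSketch {
        std::unordered_set<int64_t> matching_keys;
        // The probe passes only if the slot actually holds a matching value.
        bool Eval(std::optional<int64_t> partition_value) const {
          return partition_value.has_value() &&
              matching_keys.count(*partition_value) > 0;
        }
      };
      
      // For Iceberg tables the template tuple is built from file metadata
      // and the partition slot is never set, so the probe never matches,
      // the filter returns false, and every file is skipped.
      bool FilePassesFilterSketch(const RuntimeFilterSketch& filter,
          std::optional<int64_t> slot_from_template_tuple) {
        return filter.Eval(slot_from_template_tuple);
      }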
    
    With this patch we don't invoke PartitionPassesFilters() on Iceberg
    tables; only the Iceberg-specific IcebergPartitionPassesFilters()
    gets invoked, as sketched below. Also added DCHECKs to ensure this.
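    
    The new dispatch in FilePassesFilterPredicates(), condensed from the
    hdfs-scan-node-base.cc hunk in the diff below:
    
      if (hdfs_table_->IsIcebergTable()) {
        return IcebergPartitionPassesFilters(
            metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state);
      }
      return PartitionPassesFilters(
          metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs);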
    
    Testing:
     * e2e tests added
    
    Change-Id: I43f3e0a4df7c1ba6d8ea61410b570d8cf7b31ad3
    Reviewed-on: http://gerrit.cloudera.org:8080/19274
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc                 | 18 +++++++------
 be/src/exec/orc/hdfs-orc-scanner.cc                | 12 +++++----
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 12 +++++----
 .../iceberg-v2-read-position-deletes-orc.test      | 28 ++++++++++++++++++++
 .../iceberg-v2-read-position-deletes.test          | 30 ++++++++++++++++++++++
 5 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index f6f1f008d..a36c68ab5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -739,14 +739,13 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(RuntimeState* state, HdfsFileD
   if (filter_ctxs_.size() == 0) return true;
   ScanRangeMetadata* metadata =
       static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
-  if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
-      filter_ctxs)) {
-    return false;
-  } else if (hdfs_table_->IsIcebergTable() && !IcebergPartitionPassesFilters(
-      metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state)) {
-    return false;
+  if (hdfs_table_->IsIcebergTable()) {
+    return IcebergPartitionPassesFilters(
+        metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state);
+  } else {
+    return PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
+        filter_ctxs);
   }
-  return true;
 }
 
 void HdfsScanNodeBase::SkipScanRange(io::ScanRange* scan_range) {
@@ -781,7 +780,8 @@ Status HdfsScanNodeBase::StartNextScanRange(const std::vector<FilterContext>& fi
     if (filter_ctxs.size() > 0) {
       int64_t partition_id =
           static_cast<ScanRangeMetadata*>((*scan_range)->meta_data())->partition_id;
-      if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
+      if (!hdfs_table()->IsIcebergTable() &&
+          !PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
         SkipScanRange(*scan_range);
         *scan_range = nullptr;
       }
@@ -967,6 +967,7 @@ void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
 
 bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs) {
+  DCHECK(!hdfs_table()->IsIcebergTable());
   if (filter_ctxs.empty()) return true;
   if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return false;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
@@ -992,6 +993,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
 bool HdfsScanNodeBase::IcebergPartitionPassesFilters(int64_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs,
     HdfsFileDesc* file, RuntimeState* state) {
+  DCHECK(hdfs_table()->IsIcebergTable());
   file_metadata_utils_.SetFile(state, file);
   // Create the template tuple based on file metadata
   std::map<const SlotId, const SlotDescriptor*> slot_descs_written;
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index 12185af5b..f3050ff97 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -840,11 +840,13 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // Apply any runtime filters to static tuples containing the partition keys for this
   // partition. If any filter fails, we return immediately and stop processing this
   // scan range.
-  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
-      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    eos_ = true;
-    DCHECK(parse_status_.ok());
-    return Status::OK();
+  if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+    if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+        FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
   }
   assemble_rows_timer_.Start();
   Status status = AssembleRows(row_batch);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index df70ceb2e..13242a7e9 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -470,11 +470,13 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   // Apply any runtime filters to static tuples containing the partition keys for this
   // partition. If any filter fails, we return immediately and stop processing this
   // scan range.
-  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
-      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    eos_ = true;
-    DCHECK(parse_status_.ok());
-    return Status::OK();
+  if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+    if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+        FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
   }
   assemble_rows_timer_.Start();
   Status status;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
index 6953273a9..c2931c802 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
@@ -327,3 +327,31 @@ select * from v where ii > 1003;
 ---- TYPES
 BIGINT, STRING
 ====
+---- QUERY
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes_orc a,
+     functional_parquet.iceberg_partitioned_orc_external b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING
+====
+---- QUERY
+select a.input__file__name, a.*
+from iceberg_partitioned_orc_external a,
+     iceberg_partitioned_orc_external b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',2,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',5,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',7,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',8,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',14,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',16,'Lisa','download'
+---- TYPES
+STRING, INT, STRING, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index cde6eafd5..7944be043 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -530,3 +530,33 @@ select * from v where ii > 1003;
 ---- TYPES
 BIGINT, STRING
 ====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes a,
+     functional_parquet.iceberg_partitioned b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING, TIMESTAMP
+====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select a.input__file__name, a.*
+from iceberg_partitioned a,
+     iceberg_partitioned b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',2,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',5,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',7,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',8,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',14,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',16,'Lisa','download',2020-01-01 11:00:00
+---- TYPES
+STRING, INT, STRING, STRING, TIMESTAMP
+====