You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/11/29 08:09:27 UTC
[impala] branch master updated: IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d3c3ae41c IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied
d3c3ae41c is described below
commit d3c3ae41c4aeb2dec9f55dacb3dfc357d16713a3
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Nov 23 16:06:28 2022 +0100
IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied
If an Iceberg V2 table is partitioned and contains delete files,
then a query that involves runtime filters on the partition columns
returns an empty result set.
E.g.:
select count(*)
from store_sales, date_dim
where d_date_sk = ss_sold_date_sk and d_moy=2 and d_year=1998;
In the above query store_sales is partitioned by ss_sold_date_sk which
will be filtered by runtime filters created by the JOIN. If store_sales
has delete files, then the above query returns an empty result set.
The problem is that we are invoking PartitionPassesFilters() on these
Iceberg tables. It is usually a no-op for Iceberg tables, as the
template tuple is NULL. But when we have virtual columns, a template
tuple is created in HdfsScanPlanNode::InitTemplateTuple. For
Iceberg tables this template tuple is incomplete, i.e. it doesn't
have the partition values set. This means the filters evaluate to
false and the files are getting filtered out, hence the query
produces an empty result set.
With this patch we don't invoke PartitionPassesFilters() on Iceberg
tables, only the Iceberg-specific IcebergPartitionPassesFilters()
gets invoked. Also added DCHECKs to ensure this.
Testing:
* e2e tests added
Change-Id: I43f3e0a4df7c1ba6d8ea61410b570d8cf7b31ad3
Reviewed-on: http://gerrit.cloudera.org:8080/19274
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/hdfs-scan-node-base.cc | 18 +++++++------
be/src/exec/orc/hdfs-orc-scanner.cc | 12 +++++----
be/src/exec/parquet/hdfs-parquet-scanner.cc | 12 +++++----
.../iceberg-v2-read-position-deletes-orc.test | 28 ++++++++++++++++++++
.../iceberg-v2-read-position-deletes.test | 30 ++++++++++++++++++++++
5 files changed, 82 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index f6f1f008d..a36c68ab5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -739,14 +739,13 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(RuntimeState* state, HdfsFileD
if (filter_ctxs_.size() == 0) return true;
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
- if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
- filter_ctxs)) {
- return false;
- } else if (hdfs_table_->IsIcebergTable() && !IcebergPartitionPassesFilters(
- metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state)) {
- return false;
+ if (hdfs_table_->IsIcebergTable()) {
+ return IcebergPartitionPassesFilters(
+ metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state);
+ } else {
+ return PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
+ filter_ctxs);
}
- return true;
}
void HdfsScanNodeBase::SkipScanRange(io::ScanRange* scan_range) {
@@ -781,7 +780,8 @@ Status HdfsScanNodeBase::StartNextScanRange(const std::vector<FilterContext>& fi
if (filter_ctxs.size() > 0) {
int64_t partition_id =
static_cast<ScanRangeMetadata*>((*scan_range)->meta_data())->partition_id;
- if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
+ if (!hdfs_table()->IsIcebergTable() &&
+ !PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
SkipScanRange(*scan_range);
*scan_range = nullptr;
}
@@ -967,6 +967,7 @@ void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
const string& stats_name, const vector<FilterContext>& filter_ctxs) {
+ DCHECK(!hdfs_table()->IsIcebergTable());
if (filter_ctxs.empty()) return true;
if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return false;
DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
@@ -992,6 +993,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
bool HdfsScanNodeBase::IcebergPartitionPassesFilters(int64_t partition_id,
const string& stats_name, const vector<FilterContext>& filter_ctxs,
HdfsFileDesc* file, RuntimeState* state) {
+ DCHECK(hdfs_table()->IsIcebergTable());
file_metadata_utils_.SetFile(state, file);
// Create the template tuple based on file metadata
std::map<const SlotId, const SlotDescriptor*> slot_descs_written;
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index 12185af5b..f3050ff97 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -840,11 +840,13 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
// Apply any runtime filters to static tuples containing the partition keys for this
// partition. If any filter fails, we return immediately and stop processing this
// scan range.
- if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
- FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
- eos_ = true;
- DCHECK(parse_status_.ok());
- return Status::OK();
+ if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+ if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+ FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+ eos_ = true;
+ DCHECK(parse_status_.ok());
+ return Status::OK();
+ }
}
assemble_rows_timer_.Start();
Status status = AssembleRows(row_batch);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index df70ceb2e..13242a7e9 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -470,11 +470,13 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
// Apply any runtime filters to static tuples containing the partition keys for this
// partition. If any filter fails, we return immediately and stop processing this
// scan range.
- if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
- FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
- eos_ = true;
- DCHECK(parse_status_.ok());
- return Status::OK();
+ if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+ if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+ FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+ eos_ = true;
+ DCHECK(parse_status_.ok());
+ return Status::OK();
+ }
}
assemble_rows_timer_.Start();
Status status;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
index 6953273a9..c2931c802 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
@@ -327,3 +327,31 @@ select * from v where ii > 1003;
---- TYPES
BIGINT, STRING
====
+---- QUERY
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes_orc a,
+ functional_parquet.iceberg_partitioned_orc_external b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING
+====
+---- QUERY
+select a.input__file__name, a.*
+from iceberg_partitioned_orc_external a,
+ iceberg_partitioned_orc_external b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',2,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',5,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',7,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',8,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',14,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',16,'Lisa','download'
+---- TYPES
+STRING, INT, STRING, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index cde6eafd5..7944be043 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -530,3 +530,33 @@ select * from v where ii > 1003;
---- TYPES
BIGINT, STRING
====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes a,
+ functional_parquet.iceberg_partitioned b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING, TIMESTAMP
+====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select a.input__file__name, a.*
+from iceberg_partitioned a,
+ iceberg_partitioned b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',2,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',5,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',7,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',8,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',14,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',16,'Lisa','download',2020-01-01 11:00:00
+---- TYPES
+STRING, INT, STRING, STRING, TIMESTAMP
+====