Posted to commits@impala.apache.org by bo...@apache.org on 2022/12/15 14:11:10 UTC

[impala] 02/02: IMPALA-11787, IMPALA-11516: Cardinality estimate for UNION in Iceberg position-delete plans can double the actual table cardinality

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 33929bfccc995c22890ef8783d5f4671ef30bcae
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Dec 13 19:10:47 2022 +0100

    IMPALA-11787, IMPALA-11516: Cardinality estimate for UNION in Iceberg position-delete plans can double the actual table cardinality
    
    The plan for Iceberg tables with position-delete files includes a UNION
    operator that takes the following inputs:
      LHS: Scan of the data files that don't have corresponding delete files
      RHS: ANTI JOIN that filters out the deleted rows of the data files that
           do have corresponding delete files, based on the contents of the
           delete files.
    
    The planner's cardinality estimate for each of these two UNION inputs can
    be as large as the full row count of the table (assuming no other
    predicates in the scan), and the planner simply sums them for the UNION.
    This can yield a cardinality estimate for the UNION that is twice the
    actual row count of the table.
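
    For illustration, a minimal, self-contained arithmetic sketch of the
    doubling (the class name and the numbers below are made up for the
    example; they are not taken from the patch):

      class UnionEstimateSketch {
        public static void main(String[] args) {
          long tableRowCount = 1_000;               // assumed table size
          long lhsScanEstimate = tableRowCount;     // data files without deletes
          long rhsAntiJoinEstimate = tableRowCount; // data files with deletes, after anti join
          long unionEstimate = lhsScanEstimate + rhsAntiJoinEstimate;
          System.out.println(unionEstimate);        // prints 2000, i.e. 2x the table
        }
      }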
    
    In this patch IcebergScanNode overrides computeCardinalities() of
    HdfsScanNode. The override follows the base implementation with a few
    modifications:
    
    * we know the record counts of the data files exactly
    * for table sampling we know the sampled file descriptors, hence their
      record counts as well
    * IDENTITY-based partition conjuncts have already filtered out the files,
      so we don't need to apply their selectivity again
    
    So we calculate the scan node's cardinality much more precisely (a
    simplified sketch follows below).

    This patch also sets the column stats for the virtual columns of the scan
    node on the left-hand side of the ANTI JOIN. However, because of
    IMPALA-11797 the ANTI JOIN's cardinality currently always equals the LHS
    cardinality; IMPALA-11619 could also resolve this.
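
    A simplified, self-contained sketch of the new scan-node estimate (the
    class name, record counts and selectivity below are made up for
    illustration; the actual implementation is
    IcebergScanNode.computeCardinalities() in the diff below):

      class IcebergScanCardinalitySketch {
        public static void main(String[] args) {
          // Per-file record counts are known exactly from the Iceberg file metadata.
          long[] perFileRecordCounts = {400, 350, 250};
          // Combined selectivity of conjuncts on non-IDENTITY-partitioned columns only;
          // IDENTITY conjuncts were already applied when Iceberg pruned the files.
          double nonIdentitySelectivity = 0.5;
          long cardinality = 0;
          for (long rc : perFileRecordCounts) cardinality += rc;    // 1000
          cardinality = Math.round(cardinality * nonIdentitySelectivity);
          System.out.println(cardinality);                          // prints 500
        }
      }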
    
    Testing:
     * planner tests updated
    
    Change-Id: Ie2927c58c4adfd0ba1e135b63454ac9b07991cbf
    Reviewed-on: http://gerrit.cloudera.org:8080/19354
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/fbs/IcebergObjects.fbs                      |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  16 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  63 ++++-
 .../apache/impala/planner/IcebergScanPlanner.java  |  80 +++++-
 .../java/org/apache/impala/util/IcebergUtil.java   |  23 +-
 .../org/apache/impala/planner/PlannerTest.java     |   3 +-
 .../org/apache/impala/planner/PlannerTestBase.java |   5 +
 .../queries/PlannerTest/iceberg-v2-tables.test     | 307 +++++++++++++++++----
 .../queries/PlannerTest/tablesample.test           |  12 +-
 9 files changed, 419 insertions(+), 91 deletions(-)

diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index a5a8c6add..db7fac7d1 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -46,6 +46,7 @@ table FbIcebergPartitionTransformValue {
 
 table FbIcebergMetadata {
   file_format : FbIcebergDataFileFormat;
+  record_count : long;
   partition_keys : [FbIcebergPartitionTransformValue];
 }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cb663e432..adbc54aae 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -328,6 +328,9 @@ public class HdfsScanNode extends ScanNode {
   // this scan node has the count(*) optimization enabled.
   protected SlotDescriptor countStarSlot_ = null;
 
+  // Sampled file descriptors if table sampling is used.
+  Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles_ = null;
+
   // Conjuncts used to trim the set of partitions passed to this node.
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
@@ -1135,7 +1138,6 @@ public class HdfsScanNode extends ScanNode {
    */
   private void computeScanRangeLocations(Analyzer analyzer)
       throws ImpalaRuntimeException {
-    Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles = null;
     if (sampleParams_ != null) {
       long percentBytes = sampleParams_.getPercentBytes();
       long randomSeed;
@@ -1147,15 +1149,15 @@ public class HdfsScanNode extends ScanNode {
       // Pass a minimum sample size of 0 because users cannot set a minimum sample size
       // for scans directly. For compute stats, a minimum sample size can be set, and
       // the sampling percent is adjusted to reflect it.
-      sampledFiles = getFilesSample(percentBytes, 0, randomSeed);
+      sampledFiles_ = getFilesSample(percentBytes, 0, randomSeed);
     }
 
     long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRangeSpecs_ = new TScanRangeSpec();
 
-    if (sampledFiles != null) {
-      numPartitionsPerFs_ = sampledFiles.keySet().stream().collect(Collectors.groupingBy(
+    if (sampledFiles_ != null) {
+      numPartitionsPerFs_ = sampledFiles_.keySet().stream().collect(Collectors.groupingBy(
           SampledPartitionMetadata::getPartitionFsType, Collectors.counting()));
     } else {
       numPartitionsPerFs_.putAll(partitions_.stream().collect(
@@ -1191,9 +1193,9 @@ public class HdfsScanNode extends ScanNode {
       // conservatively estimate 1 row per file
       simpleLimitNumRows += fileDescs.size();
 
-      if (sampledFiles != null) {
+      if (sampledFiles_ != null) {
         // If we are sampling, check whether this partition is included in the sample.
-        fileDescs = sampledFiles.get(
+        fileDescs = sampledFiles_.get(
             new SampledPartitionMetadata(partition.getId(), partition.getFsType()));
         if (fileDescs == null) continue;
       }
@@ -1482,7 +1484,7 @@ public class HdfsScanNode extends ScanNode {
    * Sets these members:
    * extrapolatedNumRows_, inputCardinality_, cardinality_
    */
-  private void computeCardinalities(Analyzer analyzer) {
+  protected void computeCardinalities(Analyzer analyzer) {
     // Choose between the extrapolated row count and the one based on stored stats.
     extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_,
             sumValues(totalBytesPerFs_));
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index d85ada46c..9325b3b29 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.TableRef;
@@ -34,9 +35,12 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -45,11 +49,18 @@ import com.google.common.collect.Lists;
  * Scan of a single iceberg table.
  */
 public class IcebergScanNode extends HdfsScanNode {
+  private final static Logger LOG = LoggerFactory.getLogger(IcebergScanNode.class);
 
   private List<FileDescriptor> fileDescs_;
+  // Conjuncts on columns not involved in IDENTITY-partitioning. Subset of 'conjuncts_',
+  // but this does not include conjuncts on IDENTITY-partitioned columns, because such
+  // conjuncts have already been pushed to Iceberg to filter out partitions/files, so
+  // they don't have further selectivity on the surviving files.
+  private List<Expr> nonIdentityConjuncts_;
 
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
-      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs)
+      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs,
+      List<Expr> nonIdentityConjuncts)
       throws ImpalaRuntimeException {
     super(id, tblRef.getDesc(), conjuncts,
         getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef,
@@ -58,6 +69,7 @@ public class IcebergScanNode extends HdfsScanNode {
     Preconditions.checkState(partitions_.size() == 1);
 
     fileDescs_ = fileDescs;
+    nonIdentityConjuncts_ = nonIdentityConjuncts;
     //TODO IMPALA-11577: optimize file format counting
     boolean hasParquet = false;
     boolean hasOrc = false;
@@ -90,10 +102,53 @@ public class IcebergScanNode extends HdfsScanNode {
   }
 
   /**
-   * In some cases we exactly know the cardinality, e.g. POSITION DELETE scan node.
+   * Computes cardinalities of the Iceberg scan node. Implemented based on
+   * HdfsScanNode.computeCardinalities with some modifications:
+   *   - we exactly know the record counts of the data files
+   *   - IDENTITY-based partition conjuncts already filtered out the files, so
+   *     we don't need their selectivity
    */
-  public void setCardinality(long cardinality) {
-    cardinality_ = cardinality;
+  @Override
+  protected void computeCardinalities(Analyzer analyzer) {
+    cardinality_ = 0;
+
+    if (sampledFiles_ != null) {
+      for (List<FileDescriptor> sampledFileDescs : sampledFiles_.values()) {
+        for (FileDescriptor fd : sampledFileDescs) {
+          cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
+        }
+      }
+    } else {
+      for (FileDescriptor fd : fileDescs_) {
+        cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
+      }
+    }
+
+    // Adjust cardinality for all collections referenced along the tuple's path.
+    for (Type t: desc_.getPath().getMatchedTypes()) {
+      if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE;
+    }
+    inputCardinality_ = cardinality_;
+
+    if (cardinality_ > 0) {
+      double selectivity = computeCombinedSelectivity(nonIdentityConjuncts_);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("cardinality_=" + Long.toString(cardinality_) +
+                  " sel=" + Double.toString(selectivity));
+      }
+      cardinality_ = applySelectivity(cardinality_, selectivity);
+    }
+
+    cardinality_ = capCardinalityAtLimit(cardinality_);
+
+    if (countStarSlot_ != null) {
+      // We are doing optimized count star. Override cardinality with total num files.
+      inputCardinality_ = fileDescs_.size();
+      cardinality_ = fileDescs_.size();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("IcebergScanNode: cardinality_=" + Long.toString(cardinality_));
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index fbbce6455..730ce3af9 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -51,6 +52,7 @@ import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.DateLiteral;
 import org.apache.impala.analysis.MultiAggregateInfo;
@@ -80,6 +82,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TColumnStats;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.ExprUtil;
 import org.apache.impala.util.IcebergUtil;
@@ -111,6 +114,9 @@ public class IcebergScanPlanner {
   private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>();
   private Set<FileDescriptor> deleteFiles_ = new HashSet<>();
 
+  // Conjuncts on columns not involved in IDENTITY-partitioning.
+  private List<Expr> nonIdentityConjuncts_ = new ArrayList<>();
+
   // Statistics about the data and delete files. Useful for memory estimates of the
   // ANTI JOIN
   private long deletesRecordCount_ = 0;
@@ -143,7 +149,7 @@ public class IcebergScanPlanner {
       // If there are no delete files we can just create a single SCAN node.
       Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
       ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
-          aggInfo_, dataFilesWithoutDeletes_);
+          aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_);
       ret.init(analyzer_);
     } else {
       // Let's create a bit more complex plan in the presence of delete files.
@@ -172,7 +178,8 @@ public class IcebergScanPlanner {
 
   private PlanNode planWithoutIceberg() throws ImpalaException {
     PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
-        aggInfo_, getIceTable().getContentFileStore().getDataFiles());
+        aggInfo_, getIceTable().getContentFileStore().getDataFiles(),
+        nonIdentityConjuncts_);
     ret.init(analyzer_);
     return ret;
   }
@@ -187,7 +194,8 @@ public class IcebergScanPlanner {
     // If there are data files without corresponding delete files to be applied, we
     // can just create a SCAN node for these and do a UNION ALL with the ANTI JOIN.
     IcebergScanNode dataScanNode = new IcebergScanNode(
-      ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_);
+        ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_,
+        nonIdentityConjuncts_);
     dataScanNode.init(analyzer_);
     List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
         entry -> new SlotRef(entry)).collect(Collectors.toList());
@@ -217,13 +225,14 @@ public class IcebergScanPlanner {
     addDataVirtualPositionSlots(tblRef_);
     addDeletePositionSlots(deleteDeltaRef);
     IcebergScanNode dataScanNode = new IcebergScanNode(
-      dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_);
+        dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
+        nonIdentityConjuncts_);
     dataScanNode.init(analyzer_);
     IcebergScanNode deleteScanNode = new IcebergScanNode(
         deleteScanNodeId, deleteDeltaRef, /*conjuncts=*/Collections.emptyList(),
-        aggInfo_, Lists.newArrayList(deleteFiles_));
+        aggInfo_, Lists.newArrayList(deleteFiles_),
+        /*nonIdentityConjuncts=*/Collections.emptyList());
     deleteScanNode.init(analyzer_);
-    deleteScanNode.setCardinality(deletesRecordCount_);
 
     // Now let's create the JOIN node
     List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
@@ -249,6 +258,14 @@ public class IcebergScanPlanner {
       SingleNodePlanner.addSlotRefToDesc(analyzer_, rawPath);
       rawPath.remove(rawPath.size() - 1);
     }
+    for (SlotDescriptor insertSlotDesc : tblRef.getDesc().getSlots()) {
+      TVirtualColumnType virtColType = insertSlotDesc.getVirtualColumnType();
+      if (virtColType == TVirtualColumnType.INPUT_FILE_NAME) {
+        insertSlotDesc.setStats(virtualInputFileNameStats());
+      } else if (virtColType == TVirtualColumnType.FILE_POSITION) {
+        insertSlotDesc.setStats(virtualFilePositionStats());
+      }
+    }
   }
 
   private void addDeletePositionSlots(TableRef tblRef)
@@ -297,6 +314,18 @@ public class IcebergScanPlanner {
     return ret;
   }
 
+  private ColumnStats virtualInputFileNameStats() {
+    ColumnStats ret = new ColumnStats(Type.STRING);
+    ret.setNumDistinctValues(dataFilesWithDeletes_.size());
+    return ret;
+  }
+
+  private ColumnStats virtualFilePositionStats() {
+    ColumnStats ret = new ColumnStats(Type.BIGINT);
+    ret.setNumDistinctValues(deletesRecordCount_ / dataFilesWithDeletes_.size());
+    return ret;
+  }
+
   private void filterFileDescriptors() throws ImpalaException {
     TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
 
@@ -434,25 +463,49 @@ public class IcebergScanPlanner {
   private void extractIcebergConjuncts() throws ImpalaException {
     boolean isPartitionColumnIncluded = false;
     Map<SlotId, SlotDescriptor> idToSlotDesc = new HashMap<>();
+    Set<Expr> identityConjuncts = new HashSet<>();
     for (SlotDescriptor slotDesc : tblRef_.getDesc().getSlots()) {
       idToSlotDesc.put(slotDesc.getId(), slotDesc);
     }
     for (Expr expr : conjuncts_) {
       if (isPartitionColumnIncluded(expr, idToSlotDesc)) {
         isPartitionColumnIncluded = true;
-        break;
+        if (isIdentityPartitionIncluded(expr, idToSlotDesc)) {
+          identityConjuncts.add(expr);
+        }
       }
     }
     if (!isPartitionColumnIncluded) {
+      // No partition conjuncts, i.e. every conjunct is non-identity conjunct.
+      nonIdentityConjuncts_ = conjuncts_;
       return;
     }
     for (Expr expr : conjuncts_) {
-      tryConvertIcebergPredicate(expr);
+      if (tryConvertIcebergPredicate(expr)) {
+        if (!identityConjuncts.contains(expr)) {
+          nonIdentityConjuncts_.add(expr);
+        }
+      } else {
+        nonIdentityConjuncts_.add(expr);
+      }
     }
   }
 
   private boolean isPartitionColumnIncluded(Expr expr,
       Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    return hasPartitionTransformType(expr, idToSlotDesc,
+        transformType -> transformType != TIcebergPartitionTransformType.VOID);
+  }
+
+  private boolean isIdentityPartitionIncluded(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    return hasPartitionTransformType(expr, idToSlotDesc,
+        transformType -> transformType == TIcebergPartitionTransformType.IDENTITY);
+  }
+
+  private boolean hasPartitionTransformType(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc,
+      Predicate<TIcebergPartitionTransformType> pred) {
     List<TupleId> tupleIds = Lists.newArrayList();
     List<SlotId> slotIds = Lists.newArrayList();
     expr.getIds(tupleIds, slotIds);
@@ -467,8 +520,11 @@ public class IcebergScanPlanner {
       if (col == null) continue;
       Preconditions.checkState(col instanceof IcebergColumn);
       IcebergColumn iceCol = (IcebergColumn)col;
-      if (IcebergUtil.isPartitionColumn(iceCol,
-          getIceTable().getDefaultPartitionSpec())) {
+      TIcebergPartitionTransformType transformType =
+          IcebergUtil.getPartitionTransformType(
+              iceCol,
+              getIceTable().getDefaultPartitionSpec());
+      if (pred.test(transformType)) {
         return true;
       }
     }
@@ -508,13 +564,15 @@ public class IcebergScanPlanner {
   /**
    * Transform impala predicate to iceberg predicate
    */
-  private void tryConvertIcebergPredicate(Expr expr)
+  private boolean tryConvertIcebergPredicate(Expr expr)
       throws ImpalaException {
     Expression predicate = convertIcebergPredicate(expr);
     if (predicate != null) {
       icebergPredicates_.add(predicate);
       LOG.debug("Push down the predicate: " + predicate + " to iceberg");
+      return true;
     }
+    return false;
   }
 
   private Expression convertIcebergPredicate(Expr expr)
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 3f9dc7314..6d08e4866 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -922,6 +922,7 @@ public class IcebergUtil {
     if (fileFormat != -1) {
       FbIcebergMetadata.addFileFormat(fbb, fileFormat);
     }
+    FbIcebergMetadata.addRecordCount(fbb, cf.recordCount());
     if (partKeysOffset != -1) {
       FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset);
     }
@@ -1015,14 +1016,26 @@ public class IcebergUtil {
     });
   }
 
-  public static boolean isPartitionColumn(IcebergColumn column,
+  /**
+   * Returns the partition transform type used for this column in the given spec.
+   * Returns TIcebergPartitionTransformType.VOID if the column is not used as a
+   * partitioning column.
+   */
+  public static TIcebergPartitionTransformType getPartitionTransformType(
+      IcebergColumn column,
       IcebergPartitionSpec spec) {
-    if (!spec.hasPartitionFields()) return false;
+    if (!spec.hasPartitionFields()) return TIcebergPartitionTransformType.VOID;
     for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
       if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
-      if (column.getFieldId() != partField.getSourceId()) continue;
-      return true;
+      if (column.getFieldId() == partField.getSourceId()) {
+        return partField.getTransformType();
+      }
     }
-    return false;
+    return TIcebergPartitionTransformType.VOID;
+  }
+
+  public static boolean isPartitionColumn(IcebergColumn column,
+      IcebergPartitionSpec spec) {
+    return getPartitionTransformType(column, spec) != TIcebergPartitionTransformType.VOID;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5f2383e48..d040a32d3 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1281,8 +1281,7 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testIcebergV2TableScans() {
     runPlannerTestFile("iceberg-v2-tables", "functional_parquet",
-        ImmutableSet.of(
-            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index c9bba8fdf..71c35a9c0 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -246,6 +246,11 @@ public class PlannerTestBase extends FrontendTestBase {
         // All partitions of insertTableId are okay.
         if (tableDesc.getId() == insertTableId) continue;
         if (!tableDesc.isSetHdfsTable()) continue;
+        // Iceberg partitions are handled differently, in Impala there's always a single
+        // HMS partition in an Iceberg table and actual partition/file pruning is
+        // handled by Iceberg. This means 'scanRangePartitions' can be empty while the
+        // descriptor table still has the single HMS partition.
+        if (tableDesc.isSetIcebergTable() && scanRangePartitions.isEmpty()) continue;
         THdfsTable hdfsTable = tableDesc.getHdfsTable();
         for (Map.Entry<Long, THdfsPartition> e :
              hdfsTable.getPartitions().entrySet()) {
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index d9db7f81f..e02ca0d7f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -51,7 +51,7 @@ PLAN-ROOT SINK
 |  row-size=8B cardinality=1
 |
 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  row-size=20B cardinality=4.73K
+|  row-size=20B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
 |     HDFS partitions=1/1 files=1 size=1.54KB
@@ -214,7 +214,7 @@ PLAN-ROOT SINK
 |  row-size=36B cardinality=1
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -222,11 +222,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=1
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -239,7 +239,7 @@ PLAN-ROOT SINK
 |  row-size=36B cardinality=1
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -251,11 +251,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 SELECT * from iceberg_v2_positional_not_all_data_files_have_delete_files
 ---- PLAN
@@ -263,10 +263,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -274,11 +274,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -286,10 +286,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -301,11 +301,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 SELECT * from iceberg_v2_positional_update_all_rows
 ---- PLAN
@@ -313,10 +313,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=12
+|  row-size=36B cardinality=6
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=6
+|  |  row-size=36B cardinality=3
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
 |  |     HDFS partitions=1/1 files=1 size=2.60KB
@@ -324,11 +324,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=36B cardinality=6
+|     row-size=36B cardinality=3
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
    HDFS partitions=1/1 files=1 size=625B
-   row-size=36B cardinality=6
+   row-size=36B cardinality=3
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -336,10 +336,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=12
+|  row-size=36B cardinality=6
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
-|  |  row-size=36B cardinality=6
+|  |  row-size=36B cardinality=3
 |  |
 |  |--05:EXCHANGE [BROADCAST]
 |  |  |
@@ -349,11 +349,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=36B cardinality=6
+|     row-size=36B cardinality=3
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
    HDFS partitions=1/1 files=1 size=625B
-   row-size=36B cardinality=6
+   row-size=36B cardinality=3
 ====
 SELECT * from iceberg_v2_partitioned_position_deletes
 ---- PLAN
@@ -452,22 +452,22 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=16B cardinality=10
+|     row-size=16B cardinality=3
 |
 05:AGGREGATE [FINALIZE]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -475,11 +475,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -487,30 +487,30 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
 |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 |--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)]
 |  |
 |  06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=16B cardinality=10
+|     row-size=16B cardinality=3
 |
 11:AGGREGATE [FINALIZE]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 10:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -522,11 +522,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 with v as (select i + 1000 as ii, upper(s) as ss from iceberg_v2_positional_not_all_data_files_have_delete_files)
 select * from v where ii > 1003;
@@ -592,7 +592,7 @@ PLAN-ROOT SINK
 11:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: i = max(i)
 |  runtime filters: RF000 <- max(i)
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--10:AGGREGATE [FINALIZE]
 |  |  output: max(i)
@@ -600,10 +600,10 @@ PLAN-ROOT SINK
 |  |
 |  09:UNION
 |  |  pass-through-operands: all
-|  |  row-size=24B cardinality=12
+|  |  row-size=24B cardinality=6
 |  |
 |  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  |  row-size=24B cardinality=6
+|  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
 |  |  |     HDFS partitions=1/1 files=1 size=2.60KB
@@ -611,18 +611,18 @@ PLAN-ROOT SINK
 |  |  |
 |  |  05:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |  |     HDFS partitions=1/1 files=1 size=625B
-|  |     row-size=24B cardinality=6
+|  |     row-size=24B cardinality=3
 |  |
 |  08:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=24B cardinality=6
+|     row-size=24B cardinality=3
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -631,12 +631,12 @@ PLAN-ROOT SINK
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
 |     runtime filters: RF000 -> i
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    runtime filters: RF000 -> i
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -645,7 +645,7 @@ PLAN-ROOT SINK
 11:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: i = max(i)
 |  runtime filters: RF000 <- max(i)
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--17:EXCHANGE [BROADCAST]
 |  |
@@ -661,10 +661,10 @@ PLAN-ROOT SINK
 |  |
 |  09:UNION
 |  |  pass-through-operands: all
-|  |  row-size=24B cardinality=12
+|  |  row-size=24B cardinality=6
 |  |
 |  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
-|  |  |  row-size=24B cardinality=6
+|  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--14:EXCHANGE [BROADCAST]
 |  |  |  |
@@ -674,18 +674,18 @@ PLAN-ROOT SINK
 |  |  |
 |  |  05:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |  |     HDFS partitions=1/1 files=1 size=625B
-|  |     row-size=24B cardinality=6
+|  |     row-size=24B cardinality=3
 |  |
 |  08:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=24B cardinality=6
+|     row-size=24B cardinality=3
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--13:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -698,10 +698,205 @@ PLAN-ROOT SINK
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
 |     runtime filters: RF000 -> i
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    runtime filters: RF000 -> i
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
+====
+select * from iceberg_v2_partitioned_position_deletes where action = 'download';
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=64B cardinality=6
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: action = 'download'
+   row-size=64B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=64B cardinality=6
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: action = 'download'
+   row-size=64B cardinality=6
+====
+select * from iceberg_v2_partitioned_position_deletes
+where action = 'download' and user = 'Lisa';
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=64B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: `user` = 'Lisa', action = 'download'
+   row-size=64B cardinality=1
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=64B cardinality=1
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: `user` = 'Lisa', action = 'download'
+   row-size=64B cardinality=1
+====
+select event_time, action from iceberg_partitioned where action = 'click' or action = 'view';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where action in ('click', 'view');
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
+   row-size=28B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
+   row-size=28B cardinality=6
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view'
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view'
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action in ('click', 'view');
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action > 'a';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
+   row-size=28B cardinality=20
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
+   row-size=28B cardinality=20
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=0 size=0B
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   row-size=28B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=0 size=0B
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   row-size=28B cardinality=0
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 56b9ae535..201f72371 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -263,7 +263,7 @@ PLAN-ROOT SINK
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=6
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=44B cardinality=2
+   tuple-ids=0 row-size=44B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sampling Iceberg tables. Count(*) is not optimized.
@@ -288,7 +288,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6
    mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1
-   tuple-ids=0 row-size=0B cardinality=2
+   tuple-ids=0 row-size=0B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sampling partitioned Iceberg tables.
@@ -331,7 +331,7 @@ PLAN-ROOT SINK
    parquet statistics predicates: action = 'click'
    parquet dictionary predicates: action = 'click'
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=44B cardinality=1
+   tuple-ids=0 row-size=44B cardinality=4
    in pipelines: 00(GETNEXT)
 ====
 # Sampling Iceberg V2 tables. Delete files are not sampled, only the data files. So we
@@ -348,13 +348,13 @@ PLAN-ROOT SINK
 04:UNION
 |  pass-through-operands: all
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
-|  tuple-ids=0 row-size=36B cardinality=4.85K
+|  tuple-ids=0 row-size=36B cardinality=4
 |  in pipelines: 03(GETNEXT), 00(GETNEXT)
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
 |  |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path
 |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=0 row-size=36B cardinality=2.42K
+|  |  tuple-ids=0 row-size=36B cardinality=3
 |  |  in pipelines: 00(GETNEXT), 01(OPEN)
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -374,7 +374,7 @@ PLAN-ROOT SINK
 |       columns missing stats: i, s
 |     extrapolated-rows=disabled max-scan-range-rows=10
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-|     tuple-ids=0 row-size=36B cardinality=1
+|     tuple-ids=0 row-size=36B cardinality=3
 |     in pipelines: 00(GETNEXT)
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]