You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/02/22 07:39:03 UTC

[impala] 03/04: IMPALA-11802: Optimize count(*) queries for Iceberg V2 position delete tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3153490545d1b3730ba17bc020909f2ae9c18d94
Author: LPL <li...@apache.org>
AuthorDate: Fri Feb 10 11:57:40 2023 +0800

    IMPALA-11802: Optimize count(*) queries for Iceberg V2 position delete tables
    
    The SCAN plan of a count star query for Iceberg V2 position delete tables
    is as follows:
    
        AGGREGATE
        COUNT(*)
            |
        UNION ALL
       /         \
      /           \
     /             \
    SCAN all    ANTI JOIN
    datafiles  /         \
    without   /           \
    deletes  SCAN         SCAN
             datafiles    deletes
    
    Since Iceberg provides the number of records in a file (record_count), we
    can use this to optimize a simple count star query for Iceberg V2
    position delete tables. First, the number of records in all DataFiles
    without corresponding DeleteFiles can be calculated from the Iceberg
    metadata files. The query is then rewritten as follows:
    
          ArithmeticExpr(ADD)
          /             \
         /               \
        /                 \
    record_count       AGGREGATE
    of all             COUNT(*)
    datafiles              |
    without            ANTI JOIN
    deletes           /         \
                     /           \
                    SCAN        SCAN
                    datafiles   deletes
    
    Testing:
     * Existing tests
     * Added e2e tests
    
    Change-Id: I8172c805121bf91d23fe063f806493afe2f03d41
    Reviewed-on: http://gerrit.cloudera.org:8080/19494
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 common/thrift/Query.thrift                         |   3 +
 .../java/org/apache/impala/analysis/Analyzer.java  |  26 ++++--
 .../main/java/org/apache/impala/analysis/Expr.java |   5 +
 .../org/apache/impala/analysis/SelectStmt.java     |  53 +++++++----
 .../org/apache/impala/catalog/FeIcebergTable.java  |  49 +++++++---
 .../apache/impala/planner/IcebergScanPlanner.java  |  15 ++-
 .../impala/rewrite/CountStarToConstRule.java       |  65 +++++++++++--
 .../queries/PlannerTest/iceberg-v2-tables.test     | 104 +++++++++++++++++++++
 .../iceberg-plain-count-star-optimization.test     |   2 +-
 .../iceberg-v2-plain-count-star-optimization.test  |  47 ++++++++++
 .../iceberg-v2-read-position-deletes-orc.test      |  63 ++++++++++++-
 .../iceberg-v2-read-position-deletes.test          |  98 ++++++++++++++++++-
 tests/query_test/test_iceberg.py                   |   6 ++
 13 files changed, 485 insertions(+), 51 deletions(-)

diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 2c433b809..49d34b40d 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -792,6 +792,9 @@ struct TQueryCtx {
 
   // True if the query is transactional for Kudu table.
   29: required bool is_kudu_transactional = false
+
+  // True if the query can be optimized for Iceberg V2 table.
+  30: required bool optimize_count_star_for_iceberg_v2 = false
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index c2e9c174a..2acdea7a9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -231,8 +231,12 @@ public class Analyzer {
   // if an exception was encountered.
   private String mvAuthExceptionMsg_ = null;
 
-  // Total records num of the Iceberg table.
-  private long totalRecordsNum_;
+  // Total records num V1 is calculated by all DataFiles of the Iceberg V1 table.
+  private long totalRecordsNumV1_;
+
+  // Total records num V2 is calculated by all DataFiles without corresponding DeleteFiles
+  // to be applied of the Iceberg V2 table.
+  private long totalRecordsNumV2_;
 
   // Required Operation type: Read, write, any(read or write).
   public enum OperationType {
@@ -992,18 +996,28 @@ public class Analyzer {
     return mvAuthExceptionMsg_;
   }
 
-  public void setTotalRecordsNum(long totalRecordsNum) {
-    totalRecordsNum_ = totalRecordsNum;
+  public void setTotalRecordsNumV1(long totalRecordsNumV1) {
+    totalRecordsNumV1_ = totalRecordsNumV1;
+  }
+
+  public long getTotalRecordsNumV1() { return totalRecordsNumV1_; }
+
+  public void setTotalRecordsNumV2(long totalRecordsNumV2) {
+    totalRecordsNumV2_ = totalRecordsNumV2;
   }
 
-  public long getTotalRecordsNum() { return totalRecordsNum_; }
+  public long getTotalRecordsNumV2() {
+    return totalRecordsNumV2_;
+  }
 
   /**
    * Check if 'count(*)' FunctionCallExpr can be rewritten as LiteralExpr. When
    * totalRecordsNum_ is 0, no optimization 'count(*)' is still very fast, so return true
    * only if totalRecordsNum_ is greater than 0.
    */
-  public boolean canRewriteCountStarToConst() { return totalRecordsNum_ > 0; }
+  public boolean canRewriteCountStarForV1() { return totalRecordsNumV1_ > 0; }
+
+  public boolean canRewriteCountStartForV2() { return totalRecordsNumV2_ > 0; }
 
   /**
    * Register conjuncts that are outer joined by a full outer join. For a given
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 44c478efb..5b2ed1463 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -418,6 +418,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   // True after analysis successfully completed. Protected by accessors isAnalyzed() and
   // analysisDone().
   private boolean isAnalyzed_ = false;
+  private boolean isRewritten_ = false;
+
 
   // True if this has already been counted towards the number of statement expressions
   private boolean isCountedForNumStmtExprs_ = false;
@@ -443,6 +445,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     isAuxExpr_ = other.isAuxExpr_;
     type_ = other.type_;
     isAnalyzed_ = other.isAnalyzed_;
+    isRewritten_ = other.isRewritten_;
     isOnClauseConjunct_ = other.isOnClauseConjunct_;
     printSqlInParens_ = other.printSqlInParens_;
     selectivity_ = other.selectivity_;
@@ -460,6 +463,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   public boolean isAnalyzed() { return isAnalyzed_; }
+  public boolean isRewritten() { return isRewritten_; }
+  public void setRewritten(boolean isRewritten) { isRewritten_ = isRewritten; }
   public ExprId getId() { return id_; }
   protected void setId(ExprId id) { id_ = id; }
   public Type getType() { return type_; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index e420462be..b3de08ac7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.stream.Stream;
 import java.util.stream.Collectors;
 
+import org.apache.iceberg.Table;
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.ArrayType;
@@ -1424,7 +1425,7 @@ public class SelectStmt extends QueryStmt {
 
 
   /**
-   * Set totalRecordsNum_ in analyzer_ for the plain count(*) queries of Iceberg tables.
+   * Set totalRecordsNumVx_ in analyzer_ for the plain count(*) queries of Iceberg tables.
    * Queries that can be rewritten need to meet the following requirements:
    *  - stmt does not have WHERE clause
    *  - stmt does not have GROUP BY clause
@@ -1433,9 +1434,8 @@ public class SelectStmt extends QueryStmt {
    *  - tableRef doesn't have sampling param
    *  - table is the Iceberg table
    *  - SelectList must contains 'count(*)' or 'count(constant)'
-   *  - SelectList can contain other agg functions, e.g. min, sum, etc
    *  - SelectList can contain constant
-   *
+   *  - only for V1: SelectList can contain other agg functions, e.g. min, sum, etc
    * e.g. 'SELECT count(*) FROM iceberg_tbl' would be rewritten as 'SELECT constant'.
    */
   public void optimizePlainCountStarQuery() throws AnalysisException {
@@ -1459,28 +1459,45 @@ public class SelectStmt extends QueryStmt {
     }
     if (!(table instanceof FeIcebergTable)) return;
 
+    analyzer_.checkStmtExprLimit();
+    Table iceTable = ((FeIcebergTable) table).getIcebergApiTable();
+    if (Utils.hasDeleteFiles(iceTable, tableRef.getTimeTravelSpec())) {
+      optimizePlainCountStarQueryV2(tableRef, (FeIcebergTable)table);
+    } else {
+      optimizePlainCountStarQueryV1(tableRef, iceTable);
+    }
+  }
+
+  private void optimizePlainCountStarQueryV2(TableRef tableRef, FeIcebergTable table)
+      throws AnalysisException {
+    for (SelectListItem selectItem : getSelectList().getItems()) {
+      Expr expr = selectItem.getExpr();
+      if (expr == null) return;
+      if (expr.isConstant()) continue;
+      if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return;
+    }
+    long num = Utils.getRecordCountV2(table, tableRef.getTimeTravelSpec());
+    if (num > 0) {
+      analyzer_.getQueryCtx().setOptimize_count_star_for_iceberg_v2(true);
+      analyzer_.setTotalRecordsNumV2(num);
+    }
+  }
+
+  private void optimizePlainCountStarQueryV1(TableRef tableRef, Table iceTable) {
     boolean hasCountStarFunc = false;
     boolean hasAggFunc = false;
-    analyzer_.checkStmtExprLimit();
-    for (SelectListItem selectItem : this.getSelectList().getItems()) {
+    for (SelectListItem selectItem : getSelectList().getItems()) {
       Expr expr = selectItem.getExpr();
-      if (expr == null) continue;
+      if (expr == null) return;
       if (expr.isConstant()) continue;
-      if (FunctionCallExpr.isCountStarFunctionCallExpr(expr)) {
-        hasCountStarFunc = true;
-      } else if (expr.isAggregate()) {
-        hasAggFunc = true;
-      } else {
-        return;
-      }
+      if (FunctionCallExpr.isCountStarFunctionCallExpr(expr)) { hasCountStarFunc = true; }
+      else if (expr.isAggregate()) { hasAggFunc = true; }
+      else return;
     }
     if (!hasCountStarFunc) return;
-
-    long num = Utils.getRecordCount(
-        ((FeIcebergTable) table).getIcebergApiTable(), tableRef.getTimeTravelSpec());
+    long num = Utils.getRecordCountV1(iceTable, tableRef.getTimeTravelSpec());
     if (num <= 0) return;
-    analyzer_.setTotalRecordsNum(num);
-
+    analyzer_.setTotalRecordsNumV1(num);
     if (hasAggFunc) return;
     // When all select items are 'count(*)' or constant, 'select count(*) from ice_tbl;'
     // would need to be rewritten as 'select const;'
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index b3a66a0d8..fbda3fd06 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -63,6 +63,7 @@ import org.apache.impala.analysis.TimeTravelSpec.Kind;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
@@ -862,22 +863,13 @@ public interface FeIcebergTable extends FeFsTable {
      * the record count cannot be retrieved from the table summary.
      * If 'travelSpec' is null then the current snapshot is being used.
      */
-    public static long getRecordCount(Table icebergTable,
-        TimeTravelSpec travelSpec) {
+    public static long getRecordCountV1(Table icebergTable, TimeTravelSpec travelSpec) {
       Map<String, String> summary = getSnapshotSummary(icebergTable, travelSpec);
       if (summary == null) return -1;
 
       String totalRecordsStr = summary.get(SnapshotSummary.TOTAL_RECORDS_PROP);
       if (Strings.isNullOrEmpty(totalRecordsStr)) return -1;
-
       try {
-        // We cannot tell the record count from the summary if there are deleted rows.
-        String totalDeleteFilesStr = summary.get(SnapshotSummary.TOTAL_DELETE_FILES_PROP);
-        if (!Strings.isNullOrEmpty(totalDeleteFilesStr)) {
-          long totalDeleteFiles = Long.parseLong(totalDeleteFilesStr);
-          if (totalDeleteFiles > 0) return -1;
-        }
-
         return Long.parseLong(totalRecordsStr);
       } catch (NumberFormatException ex) {
         LOG.warn("Failed to get {} from iceberg table summary. Table name: {}, " +
@@ -885,10 +877,45 @@ public interface FeIcebergTable extends FeFsTable {
             SnapshotSummary.TOTAL_RECORDS_PROP, icebergTable.name(),
             icebergTable.location(), totalRecordsStr, ex);
       }
-
       return -1;
     }
 
+    /**
+     * Return the record count that is calculated by all DataFiles without deletes.
+     */
+    public static long getRecordCountV2(FeIcebergTable table, TimeTravelSpec travelSpec)
+        throws AnalysisException {
+      if (travelSpec == null) {
+        return table.getContentFileStore()
+            .getDataFilesWithoutDeletes().stream()
+            .mapToLong(file -> file.getFbFileMetadata().icebergMetadata().recordCount())
+            .sum();
+      }
+      try {
+        return IcebergUtil.getIcebergFiles(table, Lists.newArrayList(), travelSpec)
+            .dataFilesWithoutDeletes.stream()
+            .mapToLong(ContentFile::recordCount)
+            .sum();
+      } catch (TableLoadingException e) {
+        throw new AnalysisException("Failed to get record count of Iceberg V2 table: "
+            + table.getFullName() ,e);
+      }
+    }
+
+    /**
+     * Return true if the Iceberg has DeleteFiles.
+     */
+    public static boolean hasDeleteFiles(Table icebergTable, TimeTravelSpec travelSpec) {
+      Map<String, String> summary = getSnapshotSummary(icebergTable, travelSpec);
+      if (summary == null) return false;
+      String totalDeleteFilesStr = summary.get(SnapshotSummary.TOTAL_DELETE_FILES_PROP);
+      if (!Strings.isNullOrEmpty(totalDeleteFilesStr)) {
+        long totalDeleteFiles = Long.parseLong(totalDeleteFilesStr);
+        return totalDeleteFiles > 0;
+      }
+      return false;
+    }
+
     /**
      * Get the snapshot summary from the Iceberg table.
      */
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 2df5bc329..dea0ebcc2 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -176,11 +176,16 @@ public class IcebergScanPlanner {
       return ret;
     }
     PlanNode joinNode = createPositionJoinNode();
-    if (dataFilesWithoutDeletes_.isEmpty()) {
-      // All data files has corresponding delete files, so we just return an ANTI JOIN
-      // between all data files and all delete files.
-      return joinNode;
-    }
+
+    // If the count star query can be optimized for Iceberg V2 table, the number of rows
+    // of all DataFiles without corresponding DeleteFiles can be calculated by Iceberg
+    // meta files, it's added using ArithmeticExpr.
+    if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return joinNode;
+
+    // All data files has corresponding delete files, so we just return an ANTI JOIN
+    // between all data files and all delete files.
+    if (dataFilesWithoutDeletes_.isEmpty()) return joinNode;
+
     // If there are data files without corresponding delete files to be applied, we
     // can just create a SCAN node for these and do a UNION ALL with the ANTI JOIN.
     IcebergScanNode dataScanNode = new IcebergScanNode(
diff --git a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
index 6709ec28d..dc1fa0ef9 100644
--- a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
+++ b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
@@ -18,25 +18,74 @@
 package org.apache.impala.rewrite;
 
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.ArithmeticExpr;
+import org.apache.impala.analysis.ArithmeticExpr.Operator;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 
 /**
  * Rewrite rule to replace plain count star function call expr to const expr.
+ * Examples:
+ * 1. Iceberg V1 Table
+ * 1.1 "SELECT COUNT(*) FROM ice_tbl" -> "SELECT `CONST`"
+ * 1.2 "SELECT COUNT(*),MIN(col_a),MAX(col_b) FROM ice_tbl" -> "SELECT `CONST`,MIN(col_a),
+ * MAX(col_b) FROM ice_tbl"
+ *
+ * 2. Iceberg V2 Table
+ *
+ *     AGGREGATE
+ *     COUNT(*)
+ *         |
+ *     UNION ALL
+ *    /        \
+ *   /          \
+ *  /            \
+ * SCAN all  ANTI JOIN
+ * datafiles  /      \
+ * without   /        \
+ * deletes  SCAN      SCAN
+ *          datafiles deletes
+ *
+ *          ||
+ *        rewrite
+ *          ||
+ *          \/
+ *
+ *    ArithmeticExpr(ADD)
+ *    /             \
+ *   /               \
+ *  /                 \
+ * record_count  AGGREGATE
+ * of all        COUNT(*)
+ * datafiles         |
+ * without       ANTI JOIN
+ * deletes      /         \
+ *             /           \
+ *             SCAN        SCAN
+ *             datafiles   deletes
  */
 public enum CountStarToConstRule implements ExprRewriteRule {
 
-    INSTANCE,
-    ;
+  INSTANCE,
+  ;
 
-    @Override
-    public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
-        if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return expr;
-        if (!analyzer.canRewriteCountStarToConst()) return expr;
-        return LiteralExpr.createFromUnescapedStr(String.valueOf(
-            analyzer.getTotalRecordsNum()), Type.BIGINT);
+  @Override
+  public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
+    if (expr.isRewritten()) return expr;
+    if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return expr;
+    if (analyzer.canRewriteCountStarForV1()) {
+      return LiteralExpr.createFromUnescapedStr(String.valueOf(
+          analyzer.getTotalRecordsNumV1()), Type.BIGINT);
+    } else if (analyzer.canRewriteCountStartForV2()) {
+      expr.setRewritten(true);
+      return new ArithmeticExpr(Operator.ADD, expr, NumericLiteral.create(
+          analyzer.getTotalRecordsNumV2()));
+    } else {
+      return expr;
     }
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index 7186114ef..74315a408 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -307,6 +307,110 @@ PLAN-ROOT SINK
    HDFS partitions=1/1 files=2 size=1.22KB
    row-size=36B cardinality=4
 ====
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 7490459762454857930;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+   constant-operands=1
+   row-size=8B cardinality=1
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+00:UNION
+   constant-operands=1
+   row-size=8B cardinality=1
+====
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=20B cardinality=3
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
+|     HDFS partitions=1/1 files=1 size=2.63KB
+|     row-size=267B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
+   HDFS partitions=1/1 files=1 size=625B
+   row-size=20B cardinality=3
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=20B cardinality=3
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
+|     HDFS partitions=1/1 files=1 size=2.63KB
+|     row-size=267B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
+   HDFS partitions=1/1 files=1 size=625B
+   row-size=20B cardinality=3
+====
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=20B cardinality=6
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
+|     HDFS partitions=1/1 files=2 size=5.33KB
+|     row-size=267B cardinality=4
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
+   HDFS partitions=1/1 files=2 size=1.22KB
+   row-size=20B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|  row-size=20B cardinality=6
+|
+|--05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
+|     HDFS partitions=1/1 files=2 size=5.33KB
+|     row-size=267B cardinality=4
+|
+04:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
+   HDFS partitions=1/1 files=2 size=1.22KB
+   row-size=20B cardinality=6
+====
 SELECT * from iceberg_v2_positional_update_all_rows
 ---- PLAN
 PLAN-ROOT SINK
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
index cad21f929..5a3a19656 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
@@ -232,4 +232,4 @@ BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
 aggregation(SUM, NumFileMetadataRead): 0
-====
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
new file mode 100644
index 000000000..c84ee868e
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
@@ -0,0 +1,47 @@
+====
+---- QUERY
+select count(*) as c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files
+union all
+(select count(*) c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files_orc)
+union all
+(select -1 as c)
+union all
+(select count(*) as c from functional_parquet.iceberg_v2_no_deletes)
+union all
+(select count(*) as c from functional_parquet.iceberg_v2_no_deletes_orc) order by c;
+---- RESULTS
+-1
+3
+3
+6
+6
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumOrcStripes): 4
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+select count(*) as c from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945
+union all
+(select count(*) as c from iceberg_v2_positional_not_all_data_files_have_delete_files_orc for system_version as of 5003445199566617082)
+union all
+(select -1 as c)
+union all
+(select count(*) as c from functional_parquet.iceberg_v2_no_deletes)
+union all
+(select count(*) as c from functional_parquet.iceberg_v2_no_deletes_orc) order by c;
+---- RESULTS
+-1
+3
+3
+9
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumOrcStripes): 2
+aggregation(SUM, NumFileMetadataRead): 0
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
index e7566df64..a2af39918 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
@@ -74,11 +74,24 @@ SHOW TABLE STATS iceberg_v2_positional_delete_all_rows_orc
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
-SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc;
+SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc for system_version as of 4807054508647143162
+---- RESULTS
+3
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc
 ---- RESULTS
 0
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 2
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files_orc
@@ -113,11 +126,46 @@ SHOW TABLE STATS iceberg_v2_positional_not_all_data_files_have_delete_files_orc
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SHOW FILES IN iceberg_v2_positional_not_all_data_files_have_delete_files_orc;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-data.*.orc','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-data.*.orc','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-data.*.orc','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-data.*.orc','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-delete.*.orc','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc/data/00000-0-delete.*.orc','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc for system_version as of 8476486151350891395
+---- RESULTS
+7
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc for system_version as of 5003445199566617082
+---- RESULTS
+9
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 2
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc
 ---- RESULTS
 6
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 4
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_partitioned_position_deletes_orc
@@ -152,11 +200,24 @@ SHOW TABLE STATS iceberg_v2_partitioned_position_deletes_orc
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc for system_version as of 5416468273053855108
+---- RESULTS
+20
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc
 ---- RESULTS
 10
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 6
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_no_deletes_orc where i = 2;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index 8d491cdb4..e9c7b985f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -74,11 +74,24 @@ SHOW TABLE STATS iceberg_v2_delete_positional
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SELECT count(*) from iceberg_v2_delete_positional for system_version as of 6816997371555012807
+---- RESULTS
+3
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_delete_positional;
 ---- RESULTS
 2
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_delete_all_rows
@@ -113,11 +126,24 @@ SHOW TABLE STATS iceberg_v2_positional_delete_all_rows
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
-SELECT count(*) from iceberg_v2_positional_delete_all_rows;
+SELECT count(*) from iceberg_v2_positional_delete_all_rows for system_version as of 8593920101374128463
+---- RESULTS
+3
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_delete_all_rows
 ---- RESULTS
 0
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files
@@ -152,11 +178,46 @@ SHOW TABLE STATS iceberg_v2_positional_not_all_data_files_have_delete_files
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SHOW FILES IN iceberg_v2_positional_not_all_data_files_have_delete_files;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-delete.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files/data/00000-0-delete.*.parquet','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 7490459762454857930
+---- RESULTS
+10
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945
+---- RESULTS
+9
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
 ---- RESULTS
 6
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_update_all_rows
@@ -191,11 +252,33 @@ SHOW TABLE STATS iceberg_v2_positional_update_all_rows
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SHOW FILES IN iceberg_v2_positional_update_all_rows;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_update_all_rows/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_update_all_rows/data/00000-0-data.*.parquet','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_update_all_rows/data/00000-0-delete.*.parquet','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT count(*) from iceberg_v2_positional_update_all_rows for system_version as of 3877007445826010687
+---- RESULTS
+3
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_positional_update_all_rows
 ---- RESULTS
 3
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_partitioned_position_deletes
@@ -230,11 +313,24 @@ SHOW TABLE STATS iceberg_v2_partitioned_position_deletes
 BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
+SELECT count(*) from iceberg_v2_partitioned_position_deletes for system_version as of 2057976186205897384
+---- RESULTS
+20
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
 SELECT count(*) from iceberg_v2_partitioned_position_deletes
 ---- RESULTS
 10
 ---- TYPES
 bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 6
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_no_deletes where i = 2;
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 0ed9a6d8f..6f1938c0d 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -985,6 +985,12 @@ class TestIcebergV2Table(IcebergTestSuite):
   # The test uses pre-written Iceberg tables where the position delete files refer to
   # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the
   # dockerised environment the namenode is accessible on a different hostname/port.
+  @SkipIfDockerizedCluster.internal_hostname
+  @SkipIf.hardcoded_uris
+  def test_plain_count_star_optimization(self, vector):
+    """Runs the 'iceberg-v2-plain-count-star-optimization' query test file."""
+    self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization', vector)
+
   @SkipIfDockerizedCluster.internal_hostname
   @SkipIf.hardcoded_uris
   def test_read_position_deletes(self, vector):