Posted to commits@impala.apache.org by st...@apache.org on 2022/11/03 01:19:14 UTC

[impala] branch master updated (a9786d341 -> a983a347a)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from a9786d341 IMPALA-11669: (addendum) Set TConfiguration in TMemoryBuffer
     new 301c3ceba IMPALA-11591: Avoid calling planFiles() on Iceberg tables
     new a983a347a IMPALA-11682: Add tests for minor compacted insert only ACID tables

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 common/thrift/CatalogObjects.thrift                |  10 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  96 +++++++------
 .../impala/catalog/IcebergContentFileStore.java    | 151 +++++++++++++++++++++
 .../impala/catalog/IcebergPositionDeleteTable.java |  18 +--
 .../org/apache/impala/catalog/IcebergTable.java    |  20 +--
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   5 +-
 .../impala/catalog/local/LocalIcebergTable.java    |  17 +--
 .../apache/impala/planner/IcebergScanPlanner.java  | 105 ++++++++++++--
 .../java/org/apache/impala/util/IcebergUtil.java   |   7 +-
 .../impala/catalog/FileMetadataLoaderTest.java     |  36 +++--
 .../impala/catalog/local/LocalCatalogTest.java     |  22 +--
 .../functional/functional_schema_template.sql      |  37 ++++-
 .../datasets/functional/schema_constraints.csv     |   3 +
 .../functional-query/queries/QueryTest/acid.test   |  25 ++++
 .../iceberg-compound-predicate-push-down.test      |  40 +++---
 .../QueryTest/iceberg-in-predicate-push-down.test  |  60 ++++----
 .../iceberg-is-null-predicate-push-down.test       |  54 ++++----
 .../iceberg-multiple-storage-locations-table.test  |   2 +-
 .../iceberg-partition-runtime-filter.test          |   8 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   4 +-
 .../iceberg-upper-lower-bound-metrics.test         |  80 +++++------
 tests/query_test/test_runtime_filters.py           |  16 ++-
 22 files changed, 581 insertions(+), 235 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java


[impala] 01/02: IMPALA-11591: Avoid calling planFiles() on Iceberg tables

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 301c3cebad814c73ff7f7ccfd528f7ab7832f4ab
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Sep 20 15:47:13 2022 +0200

    IMPALA-11591: Avoid calling planFiles() on Iceberg tables
    
    Iceberg's planFiles() API is very expensive as it needs to read all
    the relevant manifest files. It's especially expensive on object
    stores like S3.
    
    When there are no predicates on the table and we are not doing
    time travel, it's possible to avoid calling planFiles() and do the
    scan planning from cached metadata. When none of the predicates are
    on partition columns, there's little benefit in pushing down predicates
    to Iceberg. So with this patch we only push down predicates (and
    hence invoke planFiles()) when at least one of the predicates is
    on a partition column.
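
A minimal sketch of the rule described above, under simplifying assumptions: predicates are reduced to the name of the column they reference, and the class, method, and column names are hypothetical and do not exist in Impala. Note that needIcebergForPlanning() in this patch also falls back to planFiles() when the table has delete files; that case is left out here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of the Impala code base.
class PlanFilesDecisionSketch {
  // True if at least one predicate refers to a partition column.
  static boolean hasPartitionPredicate(List<String> predicateColumns,
      Set<String> partitionColumns) {
    for (String col : predicateColumns) {
      if (partitionColumns.contains(col)) return true;
    }
    return false;
  }

  // planFiles() is needed for time travel, or when predicates are pushed down,
  // which only happens if some predicate is on a partition column.
  static boolean needsPlanFiles(List<String> predicateColumns,
      Set<String> partitionColumns, boolean timeTravel) {
    return timeTravel || hasPartitionPredicate(predicateColumns, partitionColumns);
  }

  public static void main(String[] args) {
    Set<String> partCols = new HashSet<>(Arrays.asList("event_date"));
    List<String> none = Collections.emptyList();
    // No predicates, no time travel: plan from cached metadata.
    System.out.println(needsPlanFiles(none, partCols, false));  // false
    // Predicate only on a non-partition column: still no push down.
    System.out.println(needsPlanFiles(Arrays.asList("user_id"), partCols, false));  // false
    // At least one predicate on a partition column: push down, call planFiles().
    System.out.println(
        needsPlanFiles(Arrays.asList("user_id", "event_date"), partCols, false));  // true
    // Time travel always needs planFiles().
    System.out.println(needsPlanFiles(none, partCols, true));  // true
  }
}
```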
    
    This patch introduces a new class to store content files:
    IcebergContentFileStore. It separates data, delete, and "old" content
    files. "Old" content files are the ones that are not part of the current
    snapshot. We add such data files during time travel. Storing "old"
    content files in a separate concurrent hash map also fixes a concurrency
    bug in the current code.
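
The separation described above can be sketched roughly as follows. This is a simplified, hypothetical model: file descriptors are plain strings, and all names are made up for illustration. The commit message does not detail the concurrency bug, so the sketch assumes the issue is that several time-travel queries may add "old" descriptors to a shared, cached table object at the same time, while the data and delete maps are only populated during table loading.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration only; see IcebergContentFileStore in the diff for the real class.
class ContentFileStoreSketch {
  // Filled once at load time, read-only afterwards (assumption of this sketch).
  private final Map<String, String> dataFiles = new HashMap<>();
  private final Map<String, String> deleteFiles = new HashMap<>();
  // May be written by concurrent time-travel queries, hence a concurrent map.
  private final ConcurrentMap<String, String> oldFiles = new ConcurrentHashMap<>();

  void addDataFile(String pathHash, String desc) { dataFiles.put(pathHash, desc); }
  void addDeleteFile(String pathHash, String desc) { deleteFiles.put(pathHash, desc); }
  void addOldFile(String pathHash, String desc) { oldFiles.put(pathHash, desc); }

  // Looks up data files first, then delete files, then "old" files.
  String lookup(String pathHash) {
    String desc = dataFiles.get(pathHash);
    if (desc != null) return desc;
    desc = deleteFiles.get(pathHash);
    if (desc != null) return desc;
    return oldFiles.get(pathHash);
  }

  int oldFileCount() { return oldFiles.size(); }

  // Adds 'perThread' distinct old files from each of 'threads' threads and
  // returns how many entries ended up in the store.
  static int concurrentAddDemo(int threads, int perThread) {
    ContentFileStoreSketch store = new ContentFileStoreSketch();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      final int id = t;
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          store.addOldFile("hash-" + id + "-" + i, "fd-" + id + "-" + i);
        }
      });
      workers[t].start();
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return store.oldFileCount();
  }

  public static void main(String[] args) {
    // All keys are distinct, so no insert is lost with a concurrent map.
    System.out.println(concurrentAddDemo(4, 1000));  // 4000
  }
}
```

With a plain HashMap in place of the ConcurrentHashMap, the same demo could lose entries or corrupt the map, since HashMap is not safe for concurrent writes.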
    
    Testing:
     * executed current e2e tests
     * updated predicate push down tests
    
    Change-Id: Iadb883a28602bb68cf4f61e57cdd691605045ac5
    Reviewed-on: http://gerrit.cloudera.org:8080/19043
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |  10 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  96 +++++++------
 .../impala/catalog/IcebergContentFileStore.java    | 151 +++++++++++++++++++++
 .../impala/catalog/IcebergPositionDeleteTable.java |  18 +--
 .../org/apache/impala/catalog/IcebergTable.java    |  20 +--
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   5 +-
 .../impala/catalog/local/LocalIcebergTable.java    |  17 +--
 .../apache/impala/planner/IcebergScanPlanner.java  | 105 ++++++++++++--
 .../java/org/apache/impala/util/IcebergUtil.java   |   7 +-
 .../impala/catalog/local/LocalCatalogTest.java     |  22 +--
 .../iceberg-compound-predicate-push-down.test      |  40 +++---
 .../QueryTest/iceberg-in-predicate-push-down.test  |  60 ++++----
 .../iceberg-is-null-predicate-push-down.test       |  54 ++++----
 .../iceberg-multiple-storage-locations-table.test  |   2 +-
 .../iceberg-partition-runtime-filter.test          |   8 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   4 +-
 .../iceberg-upper-lower-bound-metrics.test         |  80 +++++------
 tests/query_test/test_runtime_filters.py           |  16 ++-
 18 files changed, 490 insertions(+), 225 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index a217d65dc..ef2666f02 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -581,13 +581,19 @@ struct TIcebergPartitionStats {
   3: required i64 file_size_in_bytes;
 }
 
+// Contains maps from 128-bit Murmur3 hash of file path to its file descriptor
+struct TIcebergContentFileStore {
+  1: optional map<string, THdfsFileDesc> path_hash_to_data_file
+  2: optional map<string, THdfsFileDesc> path_hash_to_delete_file
+}
+
 struct TIcebergTable {
   // Iceberg file system table location
   1: required string table_location
   2: required list<TIcebergPartitionSpec> partition_spec
   3: required i32 default_partition_spec_id
-  // Map from 128-bit Murmur3 hash of data file path to its file descriptor
-  4: optional map<string, THdfsFileDesc> path_hash_to_file_descriptor
+  // Iceberg data and delete files
+  4: optional TIcebergContentFileStore content_files
   // Snapshot id of the org.apache.iceberg.Table object cached in the CatalogD
   5: optional i64 catalog_snapshot_id;
   // Iceberg 'write.parquet.compression-codec' and 'write.parquet.compression-level' table
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 72d56ddba..0ed334a31 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -97,9 +97,9 @@ import org.slf4j.LoggerFactory;
 public interface FeIcebergTable extends FeFsTable {
   final static Logger LOG = LoggerFactory.getLogger(FeIcebergTable.class);
   /**
-   * FileDescriptor map
+   * Return content file store.
    */
-  Map<String, HdfsPartition.FileDescriptor> getPathHashToFileDescMap();
+  IcebergContentFileStore getContentFileStore();
 
   /**
    * Return the partition stats from iceberg table
@@ -324,12 +324,12 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     /**
-     * Get files info for the given fe iceberg table.
+     * Get file info for the given fe iceberg table.
      */
     public static TResultSet getIcebergTableFiles(FeIcebergTable table,
         TResultSet result) {
-      List<FileDescriptor> orderedFds = Lists
-          .newArrayList(table.getPathHashToFileDescMap().values());
+      List<FileDescriptor> orderedFds = Lists.newArrayList(
+          table.getContentFileStore().getAllFiles());
       Collections.sort(orderedFds);
       for (FileDescriptor fd : orderedFds) {
         TResultRowBuilder rowBuilder = new TResultRowBuilder();
@@ -413,7 +413,7 @@ public interface FeIcebergTable extends FeFsTable {
         rowBuilder.add("NOT CACHED");
       } else {
         long cachedBytes = 0L;
-        for (FileDescriptor fd: table.getPathHashToFileDescMap().values()) {
+        for (FileDescriptor fd: table.getContentFileStore().getAllFiles()) {
           int numBlocks = fd.getNumFileBlocks();
           for (int i = 0; i < numBlocks; ++i) {
             FbFileBlock block = fd.getFbFileBlock(i);
@@ -552,8 +552,7 @@ public interface FeIcebergTable extends FeFsTable {
       tIcebergTable.setDefault_partition_spec_id(
           icebergTable.getDefaultPartitionSpecId());
 
-      tIcebergTable.setPath_hash_to_file_descriptor(
-          convertPathHashToFileDescMap(icebergTable));
+      tIcebergTable.setContent_files(icebergTable.getContentFileStore().toThrift());
 
       tIcebergTable.setCatalog_snapshot_id(icebergTable.snapshotId());
       tIcebergTable.setParquet_compression_codec(
@@ -568,16 +567,6 @@ public interface FeIcebergTable extends FeFsTable {
       return tIcebergTable;
     }
 
-    public static Map<String, THdfsFileDesc> convertPathHashToFileDescMap(
-        FeIcebergTable icebergTable) {
-      Map<String, THdfsFileDesc> ret = new HashMap<>();
-      for (Map.Entry<String, HdfsPartition.FileDescriptor> entry :
-          icebergTable.getPathHashToFileDescMap().entrySet()) {
-        ret.put(entry.getKey(), entry.getValue().toThrift());
-      }
-      return ret;
-    }
-
     /**
      * Load the file descriptors from the thrift-encoded 'tFileDescMap'. Optionally
      * translate the file descriptors with the given 'networkAddresses'/'hostIndex'.
@@ -653,7 +642,7 @@ public interface FeIcebergTable extends FeFsTable {
      * Iceberg. Both the HdfsBaseDir and the DataFile path can contain the scheme in their
      * path, using org.apache.hadoop.fs.Path to normalize the paths.
      */
-    public static Map<String, HdfsPartition.FileDescriptor> loadAllPartition(
+    public static IcebergContentFileStore loadAllPartition(
         IcebergTable table) throws IOException, TableLoadingException {
       Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new HashMap<>();
       Collection<HdfsPartition> partitions =
@@ -664,33 +653,54 @@ public interface FeIcebergTable extends FeFsTable {
             hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
         }
       }
-      Map<String, HdfsPartition.FileDescriptor> fileDescMap = new HashMap<>();
+      IcebergContentFileStore fileStore = new IcebergContentFileStore();
       Pair<List<DataFile>, Set<DeleteFile>> allFiles = IcebergUtil.getIcebergFiles(
           table, new ArrayList<>(), /*timeTravelSpecl=*/null);
-      for (ContentFile contentFile : Iterables.concat(allFiles.first,
-                                                      allFiles.second)) {
-          Path path = new Path(contentFile.path().toString());
-          if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
-            String pathHash = IcebergUtil.getFilePathHash(contentFile);
-            HdfsPartition.FileDescriptor fsFd = hdfsFileDescMap.get(
-                path.toUri().getPath());
-            HdfsPartition.FileDescriptor iceFd = fsFd.cloneWithFileMetadata(
-                IcebergUtil.createIcebergMetadata(table, contentFile));
-            fileDescMap.put(pathHash, iceFd);
-          } else {
-            if (Utils.requiresDataFilesInTableLocation(table)) {
-              LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive"
-               + "file listing results.", path.toString());
-            }
-            HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
-                new Path(contentFile.path().toString()),
-                new Path(table.getIcebergTableLocation()), table);
-            HdfsPartition.FileDescriptor iceFd = fileDesc.cloneWithFileMetadata(
-                IcebergUtil.createIcebergMetadata(table, contentFile));
-            fileDescMap.put(IcebergUtil.getFilePathHash(contentFile), iceFd);
-          }
+      for (ContentFile contentFile : Iterables.concat(allFiles.first, allFiles.second)) {
+        addContentFileToFileStore(contentFile, fileStore, table, hdfsFileDescMap);
+      }
+      return fileStore;
+    }
+
+    private static void addContentFileToFileStore(ContentFile contentFile,
+        IcebergContentFileStore fileStore, IcebergTable table,
+        Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws IOException {
+      String pathHash = IcebergUtil.getFilePathHash(contentFile);
+      HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd(
+          table, hdfsFileDescMap, contentFile);
+      if (contentFile.content().equals(FileContent.DATA)) {
+        fileStore.addDataFileDescriptor(pathHash, fd);
+      } else {
+        Preconditions.checkState(
+            contentFile.content().equals(FileContent.EQUALITY_DELETES) ||
+            contentFile.content().equals(FileContent.POSITION_DELETES));
+        fileStore.addDeleteFileDescriptor(pathHash, fd);
+      }
+    }
+
+    private static FileDescriptor getOrCreateIcebergFd(IcebergTable table,
+        Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap,
+        ContentFile contentFile) throws IllegalArgumentException, IOException {
+      Path path = new Path(contentFile.path().toString());
+      HdfsPartition.FileDescriptor iceFd = null;
+      if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
+        HdfsPartition.FileDescriptor fsFd = hdfsFileDescMap.get(
+            path.toUri().getPath());
+        iceFd = fsFd.cloneWithFileMetadata(
+            IcebergUtil.createIcebergMetadata(table, contentFile));
+        return iceFd;
+      } else {
+        if (Utils.requiresDataFilesInTableLocation(table)) {
+          LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive "
+           + "file listing results.", path.toString());
+        }
+        HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
+            new Path(contentFile.path().toString()),
+            new Path(table.getIcebergTableLocation()), table);
+        iceFd = fileDesc.cloneWithFileMetadata(
+            IcebergUtil.createIcebergMetadata(table, contentFile));
+        return iceFd;
       }
-      return fileDescMap;
     }
 
     /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
new file mode 100644
index 000000000..5e84f3227
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.curator.shaded.com.google.common.base.Preconditions;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TIcebergContentFileStore;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+
+
+/**
+ * Helper class for storing Iceberg file descriptors. It stores data and delete files
+ * separately, while also storing file descriptors belonging to earlier snapshots.
+ */
+public class IcebergContentFileStore {
+  // Key is the DataFile path hash, value is FileDescriptor transformed from DataFile
+  private final Map<String, FileDescriptor> dataFileDescMap_ = new HashMap<>();
+  private final Map<String, FileDescriptor> deleteFileDescMap_ = new HashMap<>();
+
+  // List of Iceberg data files (doesn't include delete files)
+  private final List<FileDescriptor> dataFiles_ = new ArrayList<>();
+
+  // List of Iceberg delete files (equality and position delete files)
+  private final List<FileDescriptor> deleteFiles_ = new ArrayList<>();
+
+  // Caches file descriptors loaded during time-travel queries.
+  private final ConcurrentMap<String, FileDescriptor> oldFileDescMap_ =
+      new ConcurrentHashMap<>();
+
+  public IcebergContentFileStore() {}
+
+  public void addDataFileDescriptor(String pathHash, FileDescriptor desc) {
+    if (dataFileDescMap_.put(pathHash, desc) == null) {
+      dataFiles_.add(desc);
+    }
+  }
+
+  public void addDeleteFileDescriptor(String pathHash, FileDescriptor desc) {
+    if (deleteFileDescMap_.put(pathHash, desc) == null) {
+      deleteFiles_.add(desc);
+    }
+  }
+
+  public void addOldFileDescriptor(String pathHash, FileDescriptor desc) {
+    oldFileDescMap_.put(pathHash, desc);
+  }
+
+  public FileDescriptor getDataFileDescriptor(String pathHash) {
+    return dataFileDescMap_.get(pathHash);
+  }
+
+  public FileDescriptor getDeleteFileDescriptor(String pathHash) {
+    return deleteFileDescMap_.get(pathHash);
+  }
+
+  public FileDescriptor getOldFileDescriptor(String pathHash) {
+    return oldFileDescMap_.get(pathHash);
+  }
+
+  public FileDescriptor getFileDescriptor(String pathHash) {
+    FileDescriptor desc = null;
+    desc = dataFileDescMap_.get(pathHash);
+    if (desc != null) return desc;
+    desc = deleteFileDescMap_.get(pathHash);
+    if (desc != null) return desc;
+    desc = oldFileDescMap_.get(pathHash);
+    return desc;
+  }
+
+  public List<FileDescriptor> getDataFiles() { return dataFiles_; }
+
+  public List<FileDescriptor> getDeleteFiles() { return deleteFiles_; }
+
+  public Iterable<FileDescriptor> getAllFiles() {
+    return Iterables.concat(dataFiles_, deleteFiles_);
+  }
+
+  public TIcebergContentFileStore toThrift() {
+    TIcebergContentFileStore ret = new TIcebergContentFileStore();
+    ret.setPath_hash_to_data_file(convertFileMapToThrift(dataFileDescMap_));
+    ret.setPath_hash_to_delete_file(convertFileMapToThrift(deleteFileDescMap_));
+    return ret;
+  }
+
+  public static IcebergContentFileStore fromThrift(TIcebergContentFileStore tFileStore,
+      List<TNetworkAddress> networkAddresses,
+      ListMap<TNetworkAddress> hostIndex) {
+    IcebergContentFileStore ret = new IcebergContentFileStore();
+    if (tFileStore.isSetPath_hash_to_data_file()) {
+      convertFileMapFromThrift(tFileStore.getPath_hash_to_data_file(),
+          ret.dataFileDescMap_, ret.dataFiles_, networkAddresses, hostIndex);
+    }
+    if (tFileStore.isSetPath_hash_to_delete_file()) {
+      convertFileMapFromThrift(tFileStore.getPath_hash_to_delete_file(),
+          ret.deleteFileDescMap_, ret.deleteFiles_, networkAddresses, hostIndex);
+    }
+    return ret;
+  }
+
+  private static Map<String, THdfsFileDesc> convertFileMapToThrift(
+      Map<String, FileDescriptor> fileDescMap) {
+    Map<String, THdfsFileDesc> ret = new HashMap<>();
+    for (Map.Entry<String, HdfsPartition.FileDescriptor> entry : fileDescMap.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().toThrift());
+    }
+    return ret;
+  }
+
+  private static void convertFileMapFromThrift(Map<String, THdfsFileDesc> thriftMap,
+      Map<String, FileDescriptor> outMap, List<FileDescriptor> outList,
+      List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) {
+    Preconditions.checkNotNull(outMap);
+    Preconditions.checkNotNull(outList);
+    for (Map.Entry<String, THdfsFileDesc> entry : thriftMap.entrySet()) {
+      FileDescriptor fd = FileDescriptor.fromThrift(entry.getValue());
+      Preconditions.checkNotNull(fd);
+      if (networkAddresses != null) {
+        Preconditions.checkNotNull(hostIndex);
+        fd = fd.cloneWithNewHostIndex(networkAddresses, hostIndex);
+      }
+      outMap.put(entry.getKey(), fd);
+      outList.add(fd);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
index da917bc74..89f1da79d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
@@ -17,33 +17,21 @@
 
 package org.apache.impala.catalog;
 
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.iceberg.Table;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.analysis.IcebergPartitionSpec;
-import org.apache.impala.analysis.TableName;
-import org.apache.impala.thrift.CatalogObjectsConstants;
-import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumnStats;
 import org.apache.impala.thrift.TCompressionCodec;
-import org.apache.impala.thrift.THdfsPartition;
-import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionStats;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
-import org.apache.impala.thrift.TTableType;
-import org.apache.impala.util.AcidUtils;
-import org.apache.impala.util.IcebergUtil;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Iceberg position delete table is created on the fly during planning. It belongs to an
@@ -115,8 +103,8 @@ public class IcebergPositionDeleteTable extends VirtualTable implements FeIceber
   }
 
   @Override
-  public Map<String, FileDescriptor> getPathHashToFileDescMap() {
-    return baseTable_.getPathHashToFileDescMap();
+  public IcebergContentFileStore getContentFileStore() {
+    throw new NotImplementedException("This should never be called.");
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b10449d23..2f1a81174 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -163,8 +163,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
   // last item of the list is the latest.
   private int defaultPartitionSpecId_;
 
-  // Key is the DataFile path hash, value is FileDescriptor transformed from DataFile
-  private Map<String, FileDescriptor> pathHashToFileDescMap_;
+  // File descriptor store of all data and delete files.
+  private IcebergContentFileStore fileStore_;
 
   // Treat iceberg table as a non-partitioned hdfs table in backend
   private HdfsTable hdfsTable_;
@@ -305,11 +305,6 @@ public class IcebergTable extends Table implements FeIcebergTable {
     return defaultPartitionSpecId_;
   }
 
-  @Override
-  public Map<String, FileDescriptor> getPathHashToFileDescMap() {
-    return pathHashToFileDescMap_;
-  }
-
   @Override
   public long snapshotId() {
     return catalogSnapshotId_;
@@ -362,7 +357,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
         icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl);
         hdfsTable_
             .load(false, msClient, msTable_, true, true, false, null, null,null, reason);
-        pathHashToFileDescMap_ = Utils.loadAllPartition(this);
+        fileStore_ = Utils.loadAllPartition(this);
         partitionStats_ = Utils.loadPartitionStats(this);
         loadAllColumnStats(msClient);
       } catch (Exception e) {
@@ -446,8 +441,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
     // The Iceberg API table needs to be available and cached even when loaded through
     // thrift.
     icebergApiTable_ = IcebergUtil.loadTable(this);
-    pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
-        ticeberg.getPath_hash_to_file_descriptor(), null, null);
+    fileStore_ = IcebergContentFileStore.fromThrift(
+        ticeberg.getContent_files(), null, null);
     hdfsTable_.loadFromThrift(thriftTable);
     partitionStats_ = ticeberg.getPartition_stats();
   }
@@ -515,4 +510,9 @@ public class IcebergTable extends Table implements FeIcebergTable {
     }
     return resp;
   }
+
+  @Override
+  public IcebergContentFileStore getContentFileStore() {
+    return fileStore_;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index fe98ce486..8732e6fb6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -46,6 +46,7 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.IcebergStructField;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -183,8 +184,8 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
   }
 
   @Override
-  public Map<String, FileDescriptor> getPathHashToFileDescMap() {
-    return Collections.<String, FileDescriptor>emptyMap();
+  public IcebergContentFileStore getContentFileStore() {
+    return new IcebergContentFileStore();
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 0853b1396..dc1c11ef5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.thrift.TCompressionCodec;
@@ -64,7 +65,7 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   private long icebergParquetDictPageSize_;
   private List<IcebergPartitionSpec> partitionSpecs_;
   private int defaultPartitionSpecId_;
-  private Map<String, FileDescriptor> pathHashToFileDescMap_;
+  private IcebergContentFileStore fileStore_;
   private LocalFsTable localFsTable_;
 
   // The snapshot id of the current snapshot stored in the CatalogD.
@@ -112,8 +113,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     Preconditions.checkNotNull(tableInfo);
     localFsTable_ = LocalFsTable.load(db, msTable, ref);
     tableParams_ = tableParams;
-    pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
-        tableInfo.getIceberg_table().getPath_hash_to_file_descriptor(),
+    fileStore_ = IcebergContentFileStore.fromThrift(
+        tableInfo.getIceberg_table().getContent_files(),
         tableInfo.getNetwork_addresses(),
         getHostIndex());
     icebergApiTable_ = icebergApiTable;
@@ -197,11 +198,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     return Utils.getDefaultPartitionSpec(this);
   }
 
-  @Override
-  public Map<String, FileDescriptor> getPathHashToFileDescMap() {
-    return pathHashToFileDescMap_;
-  }
-
   @Override
   public org.apache.iceberg.Table getIcebergApiTable() {
     return icebergApiTable_;
@@ -286,4 +282,9 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
       return icebergCatalogLocation_;
     }
   }
+
+  @Override
+  public IcebergContentFileStore getContentFileStore() {
+    return fileStore_;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index e5989a451..087e3234c 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -22,10 +22,11 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -50,21 +51,22 @@ import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.Column;
-import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.DateLiteral;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TimeTravelSpec;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.BinaryPredicate.Operator;
 import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -87,6 +89,12 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Scanning Iceberg tables is not as simple as scanning legacy Hive tables. The
+ * complexity comes from handling delete files and time travel. We also want to push
+ * down Impala conjuncts to Iceberg to filter out partitions and data files. This
+ * class deals with such complexities.
+ */
 public class IcebergScanPlanner {
   private final static Logger LOG = LoggerFactory.getLogger(IcebergScanPlanner.class);
 
@@ -122,8 +130,12 @@ public class IcebergScanPlanner {
   }
 
   public PlanNode createIcebergScanPlan() throws ImpalaException {
-    FeIcebergTable iceTbl = getIceTable();
     analyzer_.materializeSlots(conjuncts_);
+
+    if (!needIcebergForPlanning()) {
+      return planWithoutIceberg();
+    }
+
     filterFileDescriptors();
 
     PlanNode ret;
@@ -140,6 +152,31 @@ public class IcebergScanPlanner {
     return ret;
   }
 
+  /**
+   * We can avoid calling Iceberg's expensive planFiles() API when all of the following
+   * are true:
+   *  - we don't push down predicates
+   *  - no time travel
+   *  - no delete files
+   * TODO: we should still avoid calling planFiles() if there are delete files. To do that
+   * we need to either track which delete files have corresponding data files, so we
+   * can create the UNION ALL node, or have an optimized, Iceberg-specific ANTI-JOIN
+   * operator, in which case transferring all rows through it wouldn't hurt too much.
+   */
+  private boolean needIcebergForPlanning() {
+    return
+        !icebergPredicates_.isEmpty() ||
+        tblRef_.getTimeTravelSpec() != null ||
+        !getIceTable().getContentFileStore().getDeleteFiles().isEmpty();
+  }
+
+  private PlanNode planWithoutIceberg() throws ImpalaException {
+    PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
+        aggInfo_, getIceTable().getContentFileStore().getDataFiles());
+    ret.init(analyzer_);
+    return ret;
+  }
+
   private PlanNode createComplexIcebergScanPlan() throws ImpalaException {
     PlanNode joinNode = createPositionJoinNode();
     if (dataFilesWithoutDeletes_.isEmpty()) {
@@ -338,14 +375,23 @@ public class IcebergScanPlanner {
   private Pair<FileDescriptor, Boolean> getFileDescriptor(ContentFile cf)
       throws ImpalaRuntimeException {
     boolean cachehit = true;
-    FileDescriptor fileDesc = getIceTable().getPathHashToFileDescMap().get(
-        IcebergUtil.getFilePathHash(cf));
+    String pathHash = IcebergUtil.getFilePathHash(cf);
+    IcebergContentFileStore fileStore = getIceTable().getContentFileStore();
+    FileDescriptor fileDesc = cf.content() == FileContent.DATA ?
+        fileStore.getDataFileDescriptor(pathHash) :
+        fileStore.getDeleteFileDescriptor(pathHash);
+
     if (fileDesc == null) {
       if (tblRef_.getTimeTravelSpec() == null) {
         // We should always find the data files in the cache when not doing time travel.
         throw new ImpalaRuntimeException("Cannot find file in cache: " + cf.path()
             + " with snapshot id: " + String.valueOf(getIceTable().snapshotId()));
       }
+      // We can still find the file descriptor among the old file descriptors.
+      fileDesc = fileStore.getOldFileDescriptor(pathHash);
+      if (fileDesc != null) {
+        return new Pair<>(fileDesc, true);
+      }
       cachehit = false;
       try {
         fileDesc = FeIcebergTable.Utils.getFileDescriptor(
@@ -363,8 +409,7 @@ public class IcebergScanPlanner {
       // Add file descriptor to the cache.
       fileDesc = fileDesc.cloneWithFileMetadata(
           IcebergUtil.createIcebergMetadata(getIceTable(), cf));
-      getIceTable().getPathHashToFileDescMap().put(
-          IcebergUtil.getFilePathHash(cf), fileDesc);
+      fileStore.addOldFileDescriptor(pathHash, fileDesc);
     }
     return new Pair<>(fileDesc, cachehit);
   }
@@ -373,7 +418,10 @@ public class IcebergScanPlanner {
    * Extracts predicates from conjuncts_ that can be pushed down to Iceberg.
    *
    * Since Iceberg will filter data files by metadata instead of scan data files,
-   * we pushdown all predicates to Iceberg to get the minimum data files to scan.
+   * if any of the predicates refers to an Iceberg partitioning column,
+   * we push down all predicates to Iceberg to get the minimum set of data files to scan.
+   * If none of the predicates refers to a partition column, we don't push down any
+   * predicate, which lets us avoid the expensive 'planFiles()' API call.
    * Here are three cases for predicate pushdown:
    * 1.The column is not part of any Iceberg partition expression
    * 2.The column is part of all partition keys without any transformation (i.e. IDENTITY)
@@ -384,11 +432,50 @@ public class IcebergScanPlanner {
    * please refer: https://iceberg.apache.org/spec/#scan-planning
    */
   private void extractIcebergConjuncts() throws ImpalaException {
+    boolean isPartitionColumnIncluded = false;
+    Map<SlotId, SlotDescriptor> idToSlotDesc = new HashMap<>();
+    for (SlotDescriptor slotDesc : tblRef_.getDesc().getSlots()) {
+      idToSlotDesc.put(slotDesc.getId(), slotDesc);
+    }
+    for (Expr expr : conjuncts_) {
+      if (isPartitionColumnIncluded(expr, idToSlotDesc)) {
+        isPartitionColumnIncluded = true;
+        break;
+      }
+    }
+    if (!isPartitionColumnIncluded) {
+      return;
+    }
     for (Expr expr : conjuncts_) {
       tryConvertIcebergPredicate(expr);
     }
   }
 
+  private boolean isPartitionColumnIncluded(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    List<TupleId> tupleIds = Lists.newArrayList();
+    List<SlotId> slotIds = Lists.newArrayList();
+    expr.getIds(tupleIds, slotIds);
+
+    if (tupleIds.size() > 1) return false;
+    if (!tupleIds.get(0).equals(tblRef_.getDesc().getId())) return false;
+
+    for (SlotId sId : slotIds) {
+      SlotDescriptor slotDesc = idToSlotDesc.get(sId);
+      if (slotDesc == null) continue;
+      Column col = slotDesc.getColumn();
+      if (col == null) continue;
+      Preconditions.checkState(col instanceof IcebergColumn);
+      IcebergColumn iceCol = (IcebergColumn)col;
+      if (IcebergUtil.isPartitionColumn(iceCol,
+          getIceTable().getDefaultPartitionSpec())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   /**
    * Returns Iceberg operator by BinaryPredicate operator, or null if the operation
    * is not supported by Iceberg.
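The planning shortcut in the IcebergScanPlanner.java changes above can be summarized as a three-way check. The following is only a minimal sketch of that decision, not the planner itself: PlanningInputs and PlanningDecisionSketch are hypothetical names, and plain string lists stand in for the pushed-down predicates and the delete file descriptors.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the state the planner inspects. */
final class PlanningInputs {
  final List<String> pushedDownPredicates;
  final boolean hasTimeTravelSpec;
  final List<String> deleteFiles;

  PlanningInputs(List<String> pushedDownPredicates, boolean hasTimeTravelSpec,
      List<String> deleteFiles) {
    this.pushedDownPredicates = pushedDownPredicates;
    this.hasTimeTravelSpec = hasTimeTravelSpec;
    this.deleteFiles = deleteFiles;
  }
}

final class PlanningDecisionSketch {
  /**
   * Mirrors the three conditions of needIcebergForPlanning() in the patch:
   * Iceberg planning is needed if predicates were pushed down, if the query
   * uses time travel, or if the table has delete files.
   */
  static boolean needIcebergForPlanning(PlanningInputs in) {
    return !in.pushedDownPredicates.isEmpty()
        || in.hasTimeTravelSpec
        || !in.deleteFiles.isEmpty();
  }

  public static void main(String[] args) {
    PlanningInputs plainScan = new PlanningInputs(
        Collections.<String>emptyList(), false, Collections.<String>emptyList());
    PlanningInputs withDeletes = new PlanningInputs(
        Collections.<String>emptyList(), false, Arrays.asList("delete-1.parquet"));
    System.out.println("plain scan needs Iceberg: "
        + needIcebergForPlanning(plainScan));
    System.out.println("scan with deletes needs Iceberg: "
        + needIcebergForPlanning(withDeletes));
  }
}
```

Because extractIcebergConjuncts() returns early when no conjunct references a partition column, the predicate list stays empty in that case. This would explain why many of the test queries further down gain a predicate on a column such as col_i or p, presumably a partition column, to keep exercising the push-down path.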
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index f3185c4bb..b7d8645d7 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -596,8 +596,12 @@ public class IcebergUtil {
    * Use ContentFile path to generate 128-bit Murmur3 hash as map key, cached in memory
    */
   public static String getFilePathHash(ContentFile contentFile) {
+    return getFilePathHash(contentFile.path().toString());
+  }
+
+  public static String getFilePathHash(String path) {
     Hasher hasher = Hashing.murmur3_128().newHasher();
-    hasher.putUnencodedChars(contentFile.path().toString());
+    hasher.putUnencodedChars(path);
     return hasher.hash().toString();
   }
 
@@ -1009,6 +1013,7 @@ public class IcebergUtil {
 
   public static boolean isPartitionColumn(IcebergColumn column,
       IcebergPartitionSpec spec) {
+    if (!spec.hasPartitionFields()) return false;
     for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
       if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
       if (column.getFieldId() != partField.getSourceId()) continue;
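The isPartitionColumn() hunk above shows only the start of the method, so the following is a hedged sketch of how such a check could behave, not the actual Impala code. Transform and PartitionField are hypothetical stand-ins for TIcebergPartitionTransformType and IcebergPartitionField, an empty list models a spec without partition fields, and the "return true on match" step is an assumption about the part of the method that the diff does not show.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PartitionColumnSketch {
  /** Hypothetical subset of transform types; only VOID is treated specially. */
  enum Transform { IDENTITY, BUCKET, VOID }

  /** Hypothetical partition field: the source column's field id plus a transform. */
  static final class PartitionField {
    final int sourceId;
    final Transform transform;

    PartitionField(int sourceId, Transform transform) {
      this.sourceId = sourceId;
      this.transform = transform;
    }
  }

  static boolean isPartitionColumn(int columnFieldId, List<PartitionField> specFields) {
    // Models the early return added by the patch for specs without partition fields.
    if (specFields.isEmpty()) return false;
    for (PartitionField field : specFields) {
      // A VOID transform keeps the field in the spec but does not partition by it.
      if (field.transform == Transform.VOID) continue;
      if (field.sourceId != columnFieldId) continue;
      // Assumption: a matching non-VOID field makes the column a partition column.
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    List<PartitionField> spec = Arrays.asList(
        new PartitionField(1, Transform.IDENTITY),
        new PartitionField(2, Transform.VOID));
    System.out.println("field 1: " + isPartitionColumn(1, spec));
    System.out.println("field 2: " + isPartitionColumn(2, spec));
    System.out.println("unpartitioned: "
        + isPartitionColumn(1, Collections.<PartitionField>emptyList()));
  }
}
```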
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 63f79ea79..2c577447d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -20,7 +20,6 @@ package org.apache.impala.catalog.local;
 import static org.junit.Assert.*;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
@@ -36,9 +35,9 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.fb.FbFileBlock;
@@ -52,6 +51,7 @@ import org.apache.impala.thrift.TMetadataOpcode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.PatternMatcher;
@@ -278,18 +278,20 @@ public class LocalCatalogTest {
   public void testLoadIcebergFileDescriptors() throws Exception {
     LocalIcebergTable t = (LocalIcebergTable)catalog_.getTable(
         "functional_parquet", "iceberg_partitioned");
-    Map<String, FileDescriptor> localTblFdMap = t.getPathHashToFileDescMap();
+    IcebergContentFileStore fileStore = t.getContentFileStore();
     TPartialTableInfo tblInfo = provider_.loadIcebergTable(t.ref_);
     ListMap<TNetworkAddress> catalogdHostIndexes = new ListMap<>();
     catalogdHostIndexes.populate(tblInfo.getNetwork_addresses());
-    Map<String, FileDescriptor> catalogFdMap =
-        FeIcebergTable.Utils.loadFileDescMapFromThrift(
-            tblInfo.getIceberg_table().getPath_hash_to_file_descriptor(),
+    IcebergContentFileStore catalogFileStore = IcebergContentFileStore.fromThrift(
+            tblInfo.getIceberg_table().getContent_files(),
             null, null);
-    for (Map.Entry<String, FileDescriptor> entry : localTblFdMap.entrySet()) {
-      String path = entry.getKey();
-      FileDescriptor localFd = entry.getValue();
-      FileDescriptor catalogFd = catalogFdMap.get(path);
+    for (FileDescriptor localFd : fileStore.getDataFiles()) {
+      String path = localFd.getAbsolutePath(t.getLocation());
+      // For this test table the manifest files contain data paths without an FS scheme,
+      // so they are loaded into the content file store without it.
+      path = path.substring(path.indexOf("/test-warehouse"));
+      String pathHash = IcebergUtil.getFilePathHash(path);
+      FileDescriptor catalogFd = catalogFileStore.getDataFileDescriptor(pathHash);
       assertEquals(localFd.getNumFileBlocks(), 1);
       FbFileBlock localBlock = localFd.getFbFileBlock(0);
       FbFileBlock catalogBlock = catalogFd.getFbFileBlock(0);
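The updated test above hashes the path only after cutting everything before "/test-warehouse", because the lookup key is derived from the exact path string stored in the manifest. As a rough illustration, the sketch below shows that substring approach next to an alternative based on java.net.URI. PathNormalizationSketch and its method names are hypothetical, and the URI variant is a swapped-in technique, not what the test does.

```java
import java.net.URI;

final class PathNormalizationSketch {
  /** Same idea as the test: keep everything from the first "/test-warehouse" on. */
  static String fromWarehouseRoot(String path) {
    int idx = path.indexOf("/test-warehouse");
    // Unlike the test, fall back to the input when the marker is missing,
    // instead of letting substring(-1) throw.
    return idx < 0 ? path : path.substring(idx);
  }

  /** Alternative: let java.net.URI drop the scheme and authority. */
  static String stripSchemeAndAuthority(String path) {
    return URI.create(path).getPath();
  }

  public static void main(String[] args) {
    String full = "hdfs://localhost:20500/test-warehouse/tbl/data/f.parquet";
    System.out.println(fromWarehouseRoot(full));
    System.out.println(stripSchemeAndAuthority(full));
  }
}
```

The URI variant does not depend on a fixed directory name, but URI.create() rejects strings with characters that are illegal in a URI, such as spaces, so it is not a drop-in replacement for arbitrary file paths.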
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
index 433d51601..585d22d57 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
@@ -72,7 +72,7 @@ where
   and (
     col_ts in ('1500-01-01 00:00:00')
     or col_db = 2.71823
-  );
+  ) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -91,7 +91,7 @@ where
     col_db = 2.71823
     or col_ts in ('1500-01-01 00:00:00')
   )
-  and col_str in ('1700-01-01 00:00:00');
+  and col_str in ('1700-01-01 00:00:00') and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -113,7 +113,7 @@ where
   and (
     col_ts is not null
     or col_db = 2.71823
-  );
+  ) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -134,7 +134,7 @@ where
   and (
     col_db = 2.71823
     or col_ts in ('1500-01-01 00:00:00')
-  );
+  ) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -152,7 +152,7 @@ where
   and (
     col_ts in ('1500-01-01 00:00:00')
     or cast(col_db as double) = 2.71823
-  );
+  ) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -169,7 +169,7 @@ where
   and (
     cast(col_db as bigint) = 2.71823
     or col_ts in ('1500-01-01 00:00:00')
-  );
+  ) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -183,8 +183,8 @@ select
 from
   ice_compound_pred_pd
 where
-  col_bi = 12345678902
-  or col_db in (2.71823);
+  (col_bi = 12345678902
+  or col_db in (2.71823)) and col_i >= 0;
 ---- RESULTS
 6
 ---- RUNTIME_PROFILE
@@ -198,11 +198,11 @@ select
 from
   ice_compound_pred_pd
 where
-  col_bi in (12345678902)
+  (col_bi in (12345678902)
   or (
     col_ts in ('1500-01-01 00:00:00')
     and col_db = 2.71823
-  );
+  )) and col_i >= 0;
 ---- RESULTS
 6
 ---- RUNTIME_PROFILE
@@ -216,11 +216,11 @@ select
 from
   ice_compound_pred_pd
 where
-  (
+  ((
     col_db = 2.71823
     and col_ts in ('1500-01-01 00:00:00')
   )
-  or col_bi in (12345678902);
+  or col_bi in (12345678902)) and col_i >= 0;
 ---- RESULTS
 6
 ---- RUNTIME_PROFILE
@@ -237,6 +237,7 @@ where
   (
     col_db = 1.71823 + 1
     and col_ts in ('1500-01-01 00:00:00')
+    and col_i >= 0
   )
   or (
     col_bi in (12345678902)
@@ -257,6 +258,7 @@ where
   (
     col_db = 2.71823
     and cast(col_ts as string) in ('1500-01-01 00:00:00')
+    and col_i >= 0
   )
   or (
     col_bi in (12345678902)
@@ -281,6 +283,7 @@ where
   or (
     col_bi in (12345678902)
     and col_str is not null
+    and col_i >= 0
   );
 ---- RESULTS
 0
@@ -295,7 +298,7 @@ select
 from
   ice_compound_pred_pd
 where
-  not col_bi is not null;
+  not col_bi is not null and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -310,7 +313,8 @@ from
   ice_compound_pred_pd
 where
   not col_bi is not null
-  or col_bi = 12345678908;
+  or col_bi = 12345678908
+  or col_i = -1;
 ---- RESULTS
 6
 ---- RUNTIME_PROFILE
@@ -324,7 +328,7 @@ select
 from
   ice_compound_pred_pd
 where
-  not (col_bi != 12345678901);
+  not (col_bi != 12345678901) or col_i = -1;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -341,6 +345,7 @@ where
   not (
     col_bi != 123
     or col_db = 2.71822
+    or col_i = -1
   );
 ---- RESULTS
 0
@@ -357,6 +362,7 @@ where
   not (
     col_bi + 1 != 123
     or col_db = 2.71822
+    or col_i = -1
   );
 ---- RESULTS
 0
@@ -373,7 +379,8 @@ from
 where
   col_bi is null
   or col_bi between 12345678901
-  and 12345678902;
+  and 12345678902
+  and col_i >= 0;
 ---- RESULTS
 9
 ---- RUNTIME_PROFILE
@@ -446,6 +453,7 @@ where
   or not(
     col_bi_2 < 22
     or col_bi_2 > 22
+    or col_i = -1
   )
   or col_bi_3 is null
   or col_bi_4 = 44
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
index 319165fb2..14bcf645e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -183,7 +183,7 @@ select
 from
   ice_pred_pd1
 where
-  col_bi in (12345678900, 12345678903, 12345678906);
+  col_bi in (12345678900, 12345678903, 12345678906) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -196,7 +196,7 @@ select
 from
   ice_pred_pd1
 where
-  col_bi in (12345678900);
+  col_bi in (12345678900) and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -209,7 +209,7 @@ select
 from
   ice_pred_pd1
 where
-  col_bi in (12345678909);
+  col_bi in (12345678909) and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -223,7 +223,7 @@ select
 from
   ice_pred_pd1
 where
-  col_f in (3.140, 3.143, 3.146);
+  col_f in (3.140, 3.143, 3.146) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -236,7 +236,7 @@ select
 from
   ice_pred_pd1
 where
-  col_f in (3.140);
+  col_f in (3.140) and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -249,7 +249,7 @@ select
 from
   ice_pred_pd1
 where
-  col_f in (3.149);
+  col_f in (3.149) and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -263,7 +263,7 @@ select
 from
   ice_pred_pd1
 where
-  col_db in (2.71820, 2.71823, 2.71826);
+  col_db in (2.71820, 2.71823, 2.71826) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -276,7 +276,7 @@ select
 from
   ice_pred_pd1
 where
-  col_db in (2.71820);
+  col_db in (2.71820) and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -289,7 +289,7 @@ select
 from
   ice_pred_pd1
 where
-  col_db in (2.71829);
+  col_db in (2.71829) and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -303,7 +303,7 @@ select
 from
   ice_pred_pd1
 where
-  col_str in ('Leonhard_A', 'Leonhard_D', 'Leonhard_G');
+  col_str in ('Leonhard_A', 'Leonhard_D', 'Leonhard_G') and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -316,7 +316,7 @@ select
 from
   ice_pred_pd1
 where
-  col_str in ('Leonhard_A');
+  col_str in ('Leonhard_A') and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -329,7 +329,7 @@ select
 from
   ice_pred_pd1
 where
-  col_str in ('Leonhard_J');
+  col_str in ('Leonhard_J') and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -343,7 +343,7 @@ select
 from
   ice_pred_pd1
 where
-  col_ts in ('1400-01-01 00:00:00', '1500-01-01 00:00:00', '1600-01-01 00:00:00');
+  col_ts in ('1400-01-01 00:00:00', '1500-01-01 00:00:00', '1600-01-01 00:00:00') and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -356,7 +356,7 @@ select
 from
   ice_pred_pd1
 where
-  col_ts in ('1400-01-01 00:00:00');
+  col_ts in ('1400-01-01 00:00:00') and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -369,7 +369,7 @@ select
 from
   ice_pred_pd1
 where
-  col_ts in ('1700-01-01 00:00:00');
+  col_ts in ('1700-01-01 00:00:00') and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -383,7 +383,7 @@ select
 from
   ice_pred_pd1
 where
-  col_dt in (DATE'1400-01-01', DATE'1500-01-01', DATE'1600-01-01');
+  col_dt in (DATE'1400-01-01', DATE'1500-01-01', DATE'1600-01-01') and col_i >= 0;
 ---- RESULTS
 9
 ---- RUNTIME_PROFILE
@@ -396,7 +396,7 @@ select
 from
   ice_pred_pd1
 where
-  col_dt in (DATE'1400-01-01');
+  col_dt in (DATE'1400-01-01') and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -409,7 +409,7 @@ select
 from
   ice_pred_pd1
 where
-  col_dt in (DATE'1700-01-01');
+  col_dt in (DATE'1700-01-01') and col_i >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -422,7 +422,7 @@ select
 from
   ice_pred_pd1
 where
-  col_i not in (0, 1);
+  col_i not in (0, 1) and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -435,7 +435,7 @@ select
 from
   ice_pred_pd1
 where
-  col_dt not in (DATE'1400-01-01', DATE'1500-01-01');
+  col_dt not in (DATE'1400-01-01', DATE'1500-01-01') and col_i >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -536,7 +536,7 @@ select
 from
   ice_pred_pd2
 where
-  d1 in (123.450, 123.453, 123.456);
+  d1 in (123.450, 123.453, 123.456) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -549,7 +549,7 @@ select
 from
   ice_pred_pd2
 where
-  d1 in (123.450, 123.451, 123.452);
+  d1 in (123.450, 123.451, 123.452) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -562,7 +562,7 @@ select
 from
   ice_pred_pd2
 where
-  d1 in (123.459);
+  d1 in (123.459) and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -576,7 +576,7 @@ select
 from
   ice_pred_pd2
 where
-  d2 in (1234567890.120, 1234567890.123, 1234567890.126);
+  d2 in (1234567890.120, 1234567890.123, 1234567890.126) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -589,7 +589,7 @@ select
 from
   ice_pred_pd2
 where
-  d2 in (1234567890.120, 1234567890.121, 1234567890.122);
+  d2 in (1234567890.120, 1234567890.121, 1234567890.122) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -602,7 +602,7 @@ select
 from
   ice_pred_pd2
 where
-  d2 in (1234567890.129);
+  d2 in (1234567890.129) and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -616,7 +616,7 @@ select
 from
   ice_pred_pd2
 where
-  d3 in (1234567890123456789.010, 1234567890123456789.013, 1234567890123456789.016);
+  d3 in (1234567890123456789.010, 1234567890123456789.013, 1234567890123456789.016) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -629,7 +629,7 @@ select
 from
   ice_pred_pd2
 where
-  d3 in (1234567890123456789.010, 1234567890123456789.011, 1234567890123456789.012);
+  d3 in (1234567890123456789.010, 1234567890123456789.011, 1234567890123456789.012) and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -642,9 +642,9 @@ select
 from
   ice_pred_pd2
 where
-  d3 in (1234567890123456789.019);
+  d3 in (1234567890123456789.019) and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-====
\ No newline at end of file
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
index 2ec3111f3..ea79fbe29 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
@@ -60,7 +60,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_bi is null;
+  col_bi is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -73,7 +73,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_bi as double) is null;
+  cast(col_bi as double) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -86,7 +86,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_bi as TIMESTAMP) is null;
+  cast(col_bi as TIMESTAMP) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -99,7 +99,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_bi + 1 is null;
+  col_bi + 1 is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -112,7 +112,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_bi is not null;
+  col_bi is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -126,7 +126,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_db is null;
+  col_db is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -139,7 +139,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_db as decimal(6,5)) is null;
+  cast(col_db as decimal(6,5)) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -152,7 +152,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_db is not null;
+  col_db is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -166,7 +166,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_str is null;
+  col_str is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -179,7 +179,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_str as TIMESTAMP) is null
+  cast(col_str as TIMESTAMP) is null and col_i >= 0
 order by 1;
 ---- RESULTS
 0
@@ -195,7 +195,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_str is not null;
+  col_str is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -209,7 +209,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_ts is null;
+  col_ts is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -222,7 +222,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_ts as string) is null;
+  cast(col_ts as string) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -235,7 +235,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_ts is not null;
+  col_ts is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -249,7 +249,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dt is null;
+  col_dt is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -262,7 +262,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_dt as string) is null;
+  cast(col_dt as string) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -275,7 +275,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dt is not null;
+  col_dt is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -289,7 +289,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc1 is null;
+  col_dc1 is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -302,7 +302,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_dc1 as double) is null;
+  cast(col_dc1 as double) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -315,7 +315,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc1 is not null;
+  col_dc1 is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -329,7 +329,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc2 is null;
+  col_dc2 is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -342,7 +342,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_dc2 as double) is null;
+  cast(col_dc2 as double) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -355,7 +355,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc2 is not null;
+  col_dc2 is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
@@ -369,7 +369,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc3 is null;
+  col_dc3 is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -382,7 +382,7 @@ select
 from
   ice_is_null_pred_pd
 where
-  cast(col_dc3 as string) is null;
+  cast(col_dc3 as string) is null and col_i >= 0;
 ---- RESULTS
 1
 ---- RUNTIME_PROFILE
@@ -395,9 +395,9 @@ select
 from
   ice_is_null_pred_pd
 where
-  col_dc3 is not null;
+  col_dc3 is not null and col_i >= 0;
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 8
-====
\ No newline at end of file
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-storage-locations-table.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-storage-locations-table.test
index 82bd854a4..a4721d9e3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-storage-locations-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-storage-locations-table.test
@@ -60,7 +60,7 @@ select
 from
   functional_parquet.iceberg_multiple_storage_locations
 where
-  col_string = "a"
+  col_string = "a" and col_int >= 0
 order by
   col_int;
 ---- RESULTS
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-runtime-filter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-runtime-filter.test
index fb408cbf5..ed46adc08 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-runtime-filter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-runtime-filter.test
@@ -95,7 +95,7 @@ aggregation(SUM, RowGroups processed): 5
 aggregation(SUM, RowGroups rejected): 0
 aggregation(SUM, Rows processed): 3
 aggregation(SUM, Rows rejected): 1
-aggregation(SUM, NumRowGroups): 7
+aggregation(SUM, NumRowGroups): 12
 aggregation(SUM, NumDictFilteredRowGroups): 4
 ====
 ---- QUERY
@@ -129,7 +129,7 @@ aggregation(SUM, RowGroups processed): 3
 aggregation(SUM, RowGroups rejected): 0
 aggregation(SUM, Rows processed): 2
 aggregation(SUM, Rows rejected): 0
-aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumRowGroups): 6
 aggregation(SUM, NumDictFilteredRowGroups): 2
 ====
 ---- QUERY
@@ -151,6 +151,6 @@ aggregation(SUM, RowGroups processed): 1
 aggregation(SUM, RowGroups rejected): 0
 aggregation(SUM, Rows processed): 4
 aggregation(SUM, Rows rejected): 0
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 4
 aggregation(SUM, NumDictFilteredRowGroups): 0
-====
\ No newline at end of file
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index ee7355170..48be24881 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -319,7 +319,7 @@ BIGINT
 aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
-# 'timestamp_col' is not a partitioning column, but min/max stats will be used to
+# 'timestamp_col' is not a partitioning column, so min/max stats will not be used to
 # eliminate row groups
 select count(*) from alltypes_part
 where timestamp_col = now();
@@ -328,7 +328,7 @@ where timestamp_col = now();
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumRowGroups): 8
 ====
 ---- QUERY
 create table alltypes_part_2 like alltypes_part;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
index 78a4219d1..d4f6ba007 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -34,7 +34,7 @@ STRING, STRING, STRING
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types1
-where s >= 'z';
+where s >= 'z' and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -44,7 +44,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types1
-where s >= 'b' and s <= 'cz';
+where s >= 'b' and s <= 'cz' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -54,7 +54,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types1
-where s >= 'b' and s <= 'dz';
+where s >= 'b' and s <= 'dz' and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -64,7 +64,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 'fermium' is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types1
-where s = 'fermium';
+where s = 'fermium' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -74,7 +74,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types1
-where i >= 10;
+where i >= 10 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -84,7 +84,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types1
-where i >= 2 and i <= 3;
+where i >= 2 and i <= 3 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -94,7 +94,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types1
-where i >= 2 and i <= 4;
+where i >= 2 and i <= 4 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -104,7 +104,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 6 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types1
-where i = 6;
+where i = 6 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -114,7 +114,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types1
-where bi >= 10;
+where bi >= 10 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -124,7 +124,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types1
-where bi >= 2 and bi <= 3;
+where bi >= 2 and bi <= 3 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -134,7 +134,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types1
-where bi >= 2 and bi <= 4;
+where bi >= 2 and bi <= 4 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -144,7 +144,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 6 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types1
-where bi = 6;
+where bi = 6 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -154,7 +154,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types1
-where f >= 5.0;
+where f >= 5.0 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -164,7 +164,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types1
-where f >= 0.13 and f <= 3.451;
+where f >= 0.13 and f <= 3.451 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -174,7 +174,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types1
-where f >= 0.13 and f <= 3.57;
+where f >= 0.13 and f <= 3.57 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -184,7 +184,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 4.01 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types1
-where f = 4.01;
+where f = 4.01 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -194,7 +194,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types1
-where db >= 5.0;
+where db >= 5.0 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -204,7 +204,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types1
-where db >= 0.13 and db <= 3.451;
+where db >= 0.13 and db <= 3.451 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -214,7 +214,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types1
-where db >= 0.13 and db <= 3.57;
+where db >= 0.13 and db <= 3.57 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -224,7 +224,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 4.01 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types1
-where db = 4.01;
+where db = 4.01 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -266,7 +266,7 @@ STRING, STRING, STRING
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types2
-where dt >= DATE'9999-12-31';
+where dt >= DATE'9999-12-31' and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -276,7 +276,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types2
-where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-01';
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-01' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -286,7 +286,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types2
-where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-02';
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-02' and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -296,7 +296,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 1999-12-31 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types2
-where dt = DATE'1999-12-31';
+where dt = DATE'1999-12-31' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -306,7 +306,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types2
-where ts >= '9999-12-31 03:04:06';
+where ts >= '9999-12-31 03:04:06' and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -316,7 +316,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types2
-where dt >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:00';
+where dt >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:00' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -326,7 +326,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types2
-where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:01';
+where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:01' and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -336,7 +336,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # '1999-12-31 23:59:59' is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types2
-where ts = '1999-12-31 23:59:59';
+where ts = '1999-12-31 23:59:59' and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -384,7 +384,7 @@ STRING, STRING, STRING
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types3
-where d1 >= 123456.791;
+where d1 >= 123456.791 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -394,7 +394,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types3
-where d1 >= 123.457 and d1 <= 123.458;
+where d1 >= 123.457 and d1 <= 123.458 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -404,7 +404,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types3
-where d1 >= 123.457 and d1 <= 123.459;
+where d1 >= 123.457 and d1 <= 123.459 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -414,7 +414,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 341.234 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types3
-where d1 = 341.234;
+where d1 = 341.234 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -424,7 +424,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types3
-where d2 >= 123456789012345.680;
+where d2 >= 123456789012345.680 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -434,7 +434,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types3
-where d2 >= 1234567890.124 and d2 <= 1234567890.125;
+where d2 >= 1234567890.124 and d2 <= 1234567890.125 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -444,7 +444,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types3
-where d2 >= 1234567890.124 and d2 <= 1234567890.126;
+where d2 >= 1234567890.124 and d2 <= 1234567890.126 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -454,7 +454,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 3412345678.901 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types3
-where d2 = 3412345678.901;
+where d2 = 3412345678.901 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -464,7 +464,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # Lower/upper bounds metrics eliminate all row groups
 select count(*) from ice_types3
-where d3 >= 12345678901234567890123456789012345.681;
+where d3 >= 12345678901234567890123456789012345.681 and p >= 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -474,7 +474,7 @@ aggregation(SUM, RowsRead): 0
 ---- QUERY
 # Where condition matches one row group's metrics
 select count(*) from ice_types3
-where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.014;
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.014 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
@@ -484,7 +484,7 @@ aggregation(SUM, RowsRead): 3
 ---- QUERY
 # Where condition spans over 2 row groups
 select count(*) from ice_types3
-where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.015;
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.015 and p >= 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -494,7 +494,7 @@ aggregation(SUM, RowsRead): 6
 ---- QUERY
 # 3412345678901234567.89 is the upper bound of one row group and the lower bound of another
 select count(*) from ice_types3
-where d3 = 3412345678901234567.89;
+where d3 = 3412345678901234567.89 and p >= 0;
 ---- RESULTS
 2
 ---- RUNTIME_PROFILE
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index c285a1335..2e91c1cf9 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -186,22 +186,28 @@ class TestBloomFilters(ImpalaTestSuite):
       add_exec_option_dimension(cls, "async_codegen", 1)
 
   def test_bloom_filters(self, vector):
-    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
+    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/bloom_filters', vector)
 
   def test_iceberg_dictionary_runtime_filter(self, vector, unique_database):
-    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
+    if (vector.get_value('table_format').file_format != 'parquet'):
+      pytest.skip()
+    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/iceberg-dictionary-runtime-filter', vector,
       unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
   def test_parquet_dictionary_runtime_filter(self, vector, unique_database):
-    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
-    self.execute_query("SET PARQUET_READ_STATISTICS=false;")
+    if (vector.get_value('table_format').file_format != 'parquet'):
+      pytest.skip()
+    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
+    vector.get_value('exec_option')['PARQUET_READ_STATISTICS'] = 'false'
     self.run_test_case('QueryTest/parquet-dictionary-runtime-filter', vector,
       unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
   def test_iceberg_partition_runtime_filter(self, vector, unique_database):
-    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
+    if (vector.get_value('table_format').file_format != 'parquet'):
+      pytest.skip()
+    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/iceberg-partition-runtime-filter', vector,
       unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 


[impala] 02/02: IMPALA-11682: Add tests for minor compacted insert only ACID tables

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a983a347a77af74e1a9bd6156d12a020d6b4df6d
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Mon Oct 24 20:15:50 2022 +0200

    IMPALA-11682: Add tests for minor compacted insert only ACID tables
    
    Test-only change. Minor-compacted delta dirs have been supported in
    Impala since IMPALA-9512, but at that time Hive supported minor
    compaction only on full ACID tables. Since then, Hive has added
    support for minor compaction of insert-only/MM tables (HIVE-22610).
    
    Change-Id: I7159283f3658f2119d38bd3393729535edd0a76f
    Reviewed-on: http://gerrit.cloudera.org:8080/19164
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/FileMetadataLoaderTest.java     | 36 +++++++++++++++------
 .../functional/functional_schema_template.sql      | 37 +++++++++++++++++++++-
 .../datasets/functional/schema_constraints.csv     |  3 ++
 .../functional-query/queries/QueryTest/acid.test   | 25 +++++++++++++++
 4 files changed, 91 insertions(+), 10 deletions(-)

diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index a92595464..b0ad5f768 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -146,22 +146,40 @@ public class FileMetadataLoaderTest extends FrontendTestBase {
         relPaths.get(0));
   }
 
-  @Test
-  public void testAcidMinorCompactionLoading() throws IOException, CatalogException {
-    //TODO(IMPALA-9042): Remove "throws CatalogException"
+  private FileMetadataLoader getLoaderForAcidTable(
+      String validWriteIdString, String path, HdfsFileFormat format)
+      throws IOException, CatalogException {
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
-    ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
-        "functional_orc_def.complextypestbl_minor_compacted:10:10::");
-    Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/managed/" +
-                              "functional_orc_def.db/" +
-                              "complextypestbl_minor_compacted_orc_def/");
+    ValidWriteIdList writeIds =
+        MetastoreShim.getValidWriteIdListFromString(validWriteIdString);
+    Path tablePath = new Path(path);
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
         /* oldFds = */ Collections.emptyList(), hostIndex, new ValidReadTxnList(""),
-        writeIds, HdfsFileFormat.ORC);
+        writeIds, format);
     fml.load();
+    return fml;
+  }
+
+  @Test
+  public void testAcidMinorCompactionLoading() throws IOException, CatalogException {
+    //TODO(IMPALA-9042): Remove "throws CatalogException"
+    FileMetadataLoader fml = getLoaderForAcidTable(
+        "functional_orc_def.complextypestbl_minor_compacted:10:10::",
+        "hdfs://localhost:20500/test-warehouse/managed/functional_orc_def.db/" +
+            "complextypestbl_minor_compacted_orc_def/",
+        HdfsFileFormat.ORC);
     // Only load the compacted file.
     assertEquals(1, fml.getStats().loadedFiles);
     assertEquals(8, fml.getStats().filesSupersededByAcidState);
+
+    fml = getLoaderForAcidTable(
+        "functional_parquet.insert_only_minor_compacted:6:6::",
+        "hdfs://localhost:20500/test-warehouse/managed/functional_parquet.db/" +
+            "insert_only_minor_compacted_parquet/",
+        HdfsFileFormat.PARQUET);
+    // Only load files after compaction.
+    assertEquals(3, fml.getStats().loadedFiles);
+    assertEquals(2, fml.getStats().filesSupersededByAcidState);
   }
 
   @Test
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 4f1ff5cd0..d2a0941bd 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3741,5 +3741,40 @@ values (
   map(cast("key1" as binary), 1, cast("key2" as binary), 2),
   map(1, cast("value1" as binary), 2, cast("value2" as binary)),
   named_struct("i", 0, "b", cast("member" as binary))
-  )
+  );
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+insert_only_minor_compacted
+---- COLUMNS
+id bigint
+---- DEPENDENT_LOAD_HIVE
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (1);
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (2);
+ALTER TABLE {db_name}{db_suffix}.{table_name} compact 'minor' AND WAIT;
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (3);
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (4);
+---- TABLE_PROPERTIES
+transactional=true
+transactional_properties=insert_only
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+insert_only_major_and_minor_compacted
+---- COLUMNS
+id bigint
+---- DEPENDENT_LOAD_HIVE
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (1);
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (2);
+ALTER TABLE {db_name}{db_suffix}.{table_name} compact 'major' AND WAIT;
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (3);
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (4);
+ALTER TABLE {db_name}{db_suffix}.{table_name} compact 'minor' AND WAIT;
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (5);
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (6);
+---- TABLE_PROPERTIES
+transactional=true
+transactional_properties=insert_only
 ====
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 7b3a265be..55e0d17d1 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -352,3 +352,6 @@ table_name:complextypes_maps_view, constraint:restrict_to, table_format:orc/def/
 
 # 'alltypestiny_negative' only used in ORC tests.
 table_name:alltypestiny_negative, constraint:restrict_to, table_format:orc/def/block
+
+table_name:insert_only_minor_compacted, constraint:restrict_to, table_format:parquet/none/none
+table_name:insert_only_major_and_minor_compacted, constraint:restrict_to, table_format:parquet/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
index 1b27ee404..1553c9f3c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -136,3 +136,28 @@ from functional_orc_def.complextypestbl_minor_compacted;
 ---- TYPES
 BIGINT,BIGINT
 ====
+---- QUERY
+# Test that Impala sees the compacted delta dir in a minor-compacted insert-only table.
+show files in functional_parquet.insert_only_minor_compacted;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_minor_compacted_parquet/delta_0000001_0000002_v\d+/000000_0','.+B',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_minor_compacted_parquet/delta_0000003_0000003_0000/000000_0','.+B',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_minor_compacted_parquet/delta_0000004_0000004_0000/000000_0','.+B',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test that Impala sees the compacted delta dir in a first major then minor-compacted insert-only table.
+show files in functional_parquet.insert_only_major_and_minor_compacted;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_major_and_minor_compacted_parquet/base_0000002_v\d+/000000_0','.+B',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_major_and_minor_compacted_parquet/delta_0000003_0000004_v\d+/000000_0','.+B',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_major_and_minor_compacted_parquet/delta_0000005_0000005_0000/000000_0','.+B',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_parquet.db/insert_only_major_and_minor_compacted_parquet/delta_0000006_0000006_0000/000000_0','.+B',''
+---- TYPES
+STRING,STRING,STRING
+====
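
Note on the expected SHOW FILES results above: the directory lists follow from how compacted ACID directories supersede the ones they cover. The following is a minimal Python sketch of that selection rule, not Impala's or Hive's actual implementation. It assumes the usual base_<writeId>[_v<txn>] and delta_<min>_<max>[_<stmt> or _v<txn>] directory naming, ignores the valid write id list, and ignores the case of several same-range deltas that differ only in statement id. The function name select_visible_dirs and the visibility suffixes in the usage example (v0000010 and similar) are hypothetical.

```python
import re
from typing import List, NamedTuple

# Assumed directory naming; the suffix after the write ids is either a
# statement id ("_0000") or a compactor visibility txn id ("_v0000123").
_BASE_RE = re.compile(r"^base_(\d+)(?:_v\d+)?$")
_DELTA_RE = re.compile(r"^delta_(\d+)_(\d+)(?:_v?\d+)?$")


class Delta(NamedTuple):
    name: str
    min_id: int
    max_id: int


def select_visible_dirs(dir_names: List[str]) -> List[str]:
    """Return the directories a reader would keep, in write id order.

    Simplified rule: keep the base with the highest write id, then walk the
    deltas from lowest to highest min write id (widest range first) and keep
    a delta only if it starts after everything already covered.
    """
    best_base = None
    best_base_id = 0  # write ids start at 1, so 0 means "no base"
    deltas = []
    for name in dir_names:
        m = _BASE_RE.match(name)
        if m:
            write_id = int(m.group(1))
            if best_base is None or write_id > best_base_id:
                best_base, best_base_id = name, write_id
            continue
        m = _DELTA_RE.match(name)
        if m:
            deltas.append(Delta(name, int(m.group(1)), int(m.group(2))))

    # For equal min ids the widest (compacted) delta sorts first.
    deltas.sort(key=lambda d: (d.min_id, -d.max_id))

    visible = [best_base] if best_base else []
    covered = best_base_id
    for d in deltas:
        if d.min_id > covered:
            visible.append(d.name)
            covered = d.max_id
    return visible


if __name__ == "__main__":
    # Hypothetical layout after: insert 1, insert 2, minor compaction,
    # insert 3, insert 4 (the old deltas have not been cleaned up yet).
    for name in select_visible_dirs([
        "delta_0000001_0000001_0000",
        "delta_0000002_0000002_0000",
        "delta_0000001_0000002_v0000010",
        "delta_0000003_0000003_0000",
        "delta_0000004_0000004_0000",
    ]):
        print(name)
```

Under these assumptions the first table keeps three directories and drops two, which is consistent with the loadedFiles and filesSupersededByAcidState counts asserted in FileMetadataLoaderTest if each directory holds a single file.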