You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain-text copy).
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/07 18:31:30 UTC

[iceberg] branch master updated: Core: Refactor PartitionsTable planning (#7190)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 60032f65ac Core: Refactor PartitionsTable planning (#7190)
60032f65ac is described below

commit 60032f65aca07c0361eb5d77ccc6984b93095748
Author: Hongyue/Steve Zhang <st...@gmail.com>
AuthorDate: Fri Apr 7 11:31:23 2023 -0700

    Core: Refactor PartitionsTable planning (#7190)
---
 .../src/main/java/org/apache/iceberg/BaseScan.java |   2 +-
 .../java/org/apache/iceberg/PartitionsTable.java   |  98 +++++++-------
 .../apache/iceberg/MetadataTableScanTestBase.java  |  19 +--
 .../org/apache/iceberg/TestMetadataTableScans.java | 145 +++++++++++----------
 ...stMetadataTableScansWithPartitionEvolution.java |  20 +--
 5 files changed, 145 insertions(+), 139 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index a1c92c927a..4f605985cf 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -36,7 +36,7 @@ import org.apache.iceberg.util.PropertyUtil;
 abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     implements Scan<ThisT, T, G> {
 
-  private static final List<String> SCAN_COLUMNS =
+  protected static final List<String> SCAN_COLUMNS =
       ImmutableList.of(
           "snapshot_id",
           "file_path",
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 5a289d756c..97c82c432f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -23,17 +23,15 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.util.Map;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
 
 /** A {@link Table} implementation that exposes a table's partitions as rows. */
 public class PartitionsTable extends BaseMetadataTable {
 
   private final Schema schema;
-  static final boolean PLAN_SCANS_WITH_WORKER_POOL =
-      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
 
   PartitionsTable(Table table) {
     this(table, table.name() + ".partitions");
@@ -96,7 +94,6 @@ public class PartitionsTable extends BaseMetadataTable {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
@@ -104,22 +101,66 @@ public class PartitionsTable extends BaseMetadataTable {
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       int[] normalizedPositions =
           normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
+              dataFile.specId(),
               specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    .select(BaseScan.SCAN_COLUMNS)); // don't select stats columns
+
+    return new ParallelIterable<>(tasks, scan.planExecutor());
+  }
+
   /**
-   * Builds an array of the field position of positions in the normalized partition type indexed by
-   * field position in the original partition type
+   * Builds an integer array for a specific partition type to map its partitions to the final
+   * normalized type.
+   *
+   * <p>The array represents fields in the original partition type, with the index being the field's
+   * index in the original partition type, and the value being the field's index in the normalized
+   * partition type.
+   *
+   * @param table iceberg table
+   * @param specId spec id where original partition type is written
+   * @param normalizedType normalized partition type
    */
   private static int[] normalizedPositions(
       Table table, int specId, Types.StructType normalizedType) {
@@ -156,43 +197,6 @@ public class PartitionsTable extends BaseMetadataTable {
     return normalizedPartition;
   }
 
-  @VisibleForTesting
-  static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
-    Table table = scan.table();
-    Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
-    boolean caseSensitive = scan.isCaseSensitive();
-
-    LoadingCache<Integer, ManifestEvaluator> evalCache =
-        Caffeine.newBuilder()
-            .build(
-                specId -> {
-                  PartitionSpec spec = table.specs().get(specId);
-                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
-                  return ManifestEvaluator.forRowFilter(
-                      scan.filter(), transformedSpec, caseSensitive);
-                });
-
-    FileIO io = table.io();
-    ManifestGroup manifestGroup =
-        new ManifestGroup(io, snapshot.dataManifests(io), snapshot.deleteManifests(io))
-            .caseSensitive(caseSensitive)
-            .filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
-            .select(scan.scanColumns())
-            .specsById(scan.table().specs())
-            .ignoreDeleted();
-
-    if (scan.shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (scan.snapshot().dataManifests(io).size() > 1
-        && (PLAN_SCANS_WITH_WORKER_POOL || scan.context().planWithCustomizedExecutor())) {
-      manifestGroup = manifestGroup.planWith(scan.context().planExecutor());
-    }
-
-    return manifestGroup.planFiles();
-  }
-
   private class PartitionsScan extends StaticTableScan {
     PartitionsScan(Table table) {
       super(
diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index ea7b2cfc95..47b6065e30 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -79,24 +80,24 @@ public abstract class MetadataTableScanTestBase extends TableTestBase {
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
+  protected void validateSingleFieldPartition(
+      CloseableIterable<DataFile> dataFiles, int partitionValue) {
+    validatePartition(dataFiles, 0, partitionValue);
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      CloseableIterable<DataFile> dataFiles, int position, int partitionValue) {
     Assert.assertTrue(
         "File scan tasks do not include correct file",
-        StreamSupport.stream(tasks.spliterator(), false)
+        StreamSupport.stream(dataFiles.spliterator(), false)
             .anyMatch(
-                task -> {
-                  StructLike partition = task.file().partition();
+                dataFile -> {
+                  StructLike partition = dataFile.partition();
                   if (position >= partition.size()) {
                     return false;
                   }
 
-                  return partition.get(position, Object.class).equals(partValue);
+                  return Objects.equals(partitionValue, partition.get(position, Object.class));
                 }));
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index c45da54f03..1cfc994cda 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -314,16 +314,16 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
                     "partition",
                     Types.StructType.of(optional(1000, "data_bucket", Types.IntegerType.get()))))
             .asStruct();
-
     TableScan scanNoFilter = partitionsTable.newScan().select("partition.data_bucket");
     Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
-    CloseableIterable<FileScanTask> tasksNoFilter =
-        PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
-    Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
-    validateIncludesPartitionScan(tasksNoFilter, 0);
-    validateIncludesPartitionScan(tasksNoFilter, 1);
-    validateIncludesPartitionScan(tasksNoFilter, 2);
-    validateIncludesPartitionScan(tasksNoFilter, 3);
+
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
+    validateSingleFieldPartition(dataFiles, 1);
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -337,13 +337,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
 
     TableScan scanWithProjection = partitionsTable.newScan().select("file_count");
     Assert.assertEquals(expected, scanWithProjection.schema().asStruct());
-    CloseableIterable<FileScanTask> tasksWithProjection =
-        PartitionsTable.planFiles((StaticTableScan) scanWithProjection);
-    Assert.assertEquals(4, Iterators.size(tasksWithProjection.iterator()));
-    validateIncludesPartitionScan(tasksWithProjection, 0);
-    validateIncludesPartitionScan(tasksWithProjection, 1);
-    validateIncludesPartitionScan(tasksWithProjection, 2);
-    validateIncludesPartitionScan(tasksWithProjection, 3);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanWithProjection);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
+    validateSingleFieldPartition(dataFiles, 1);
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -351,14 +351,14 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
     table.newFastAppend().appendFile(FILE_WITH_STATS).commit();
 
     Table partitionsTable = new PartitionsTable(table);
-    CloseableIterable<FileScanTask> tasksAndEq =
-        PartitionsTable.planFiles((StaticTableScan) partitionsTable.newScan());
-    for (FileScanTask fileTask : tasksAndEq) {
-      Assert.assertNull(fileTask.file().columnSizes());
-      Assert.assertNull(fileTask.file().valueCounts());
-      Assert.assertNull(fileTask.file().nullValueCounts());
-      Assert.assertNull(fileTask.file().lowerBounds());
-      Assert.assertNull(fileTask.file().upperBounds());
+    CloseableIterable<DataFile> tasksAndEq =
+        PartitionsTable.planDataFiles((StaticTableScan) partitionsTable.newScan());
+    for (DataFile dataFile : tasksAndEq) {
+      Assert.assertNull(dataFile.columnSizes());
+      Assert.assertNull(dataFile.valueCounts());
+      Assert.assertNull(dataFile.nullValueCounts());
+      Assert.assertNull(dataFile.lowerBounds());
+      Assert.assertNull(dataFile.upperBounds());
     }
   }
 
@@ -373,10 +373,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
-    CloseableIterable<FileScanTask> tasksAndEq =
-        PartitionsTable.planFiles((StaticTableScan) scanAndEq);
-    Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
-    validateIncludesPartitionScan(tasksAndEq, 0);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
   }
 
   @Test
@@ -390,11 +390,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
             Expressions.lessThan("partition.data_bucket", 2),
             Expressions.greaterThan("record_count", 0));
     TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
-    CloseableIterable<FileScanTask> tasksLtAnd =
-        PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
-    Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
-    validateIncludesPartitionScan(tasksLtAnd, 0);
-    validateIncludesPartitionScan(tasksLtAnd, 1);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanLtAnd);
+    Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
+    validateSingleFieldPartition(dataFiles, 1);
   }
 
   @Test
@@ -408,12 +408,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 2),
             Expressions.greaterThan("record_count", 0));
     TableScan scanOr = partitionsTable.newScan().filter(or);
-    CloseableIterable<FileScanTask> tasksOr = PartitionsTable.planFiles((StaticTableScan) scanOr);
-    Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
-    validateIncludesPartitionScan(tasksOr, 0);
-    validateIncludesPartitionScan(tasksOr, 1);
-    validateIncludesPartitionScan(tasksOr, 2);
-    validateIncludesPartitionScan(tasksOr, 3);
+
+    CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scanOr);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
+    validateSingleFieldPartition(dataFiles, 1);
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -423,10 +424,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
 
     Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
     TableScan scanNot = partitionsTable.newScan().filter(not);
-    CloseableIterable<FileScanTask> tasksNot = PartitionsTable.planFiles((StaticTableScan) scanNot);
-    Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
-    validateIncludesPartitionScan(tasksNot, 2);
-    validateIncludesPartitionScan(tasksNot, 3);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanNot);
+    Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -437,10 +439,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
 
     Expression set = Expressions.in("partition.data_bucket", 2, 3);
     TableScan scanSet = partitionsTable.newScan().filter(set);
-    CloseableIterable<FileScanTask> tasksSet = PartitionsTable.planFiles((StaticTableScan) scanSet);
-    Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
-    validateIncludesPartitionScan(tasksSet, 2);
-    validateIncludesPartitionScan(tasksSet, 3);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanSet);
+    Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -451,13 +454,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
 
     Expression unary = Expressions.notNull("partition.data_bucket");
     TableScan scanUnary = partitionsTable.newScan().filter(unary);
-    CloseableIterable<FileScanTask> tasksUnary =
-        PartitionsTable.planFiles((StaticTableScan) scanUnary);
-    Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
-    validateIncludesPartitionScan(tasksUnary, 0);
-    validateIncludesPartitionScan(tasksUnary, 1);
-    validateIncludesPartitionScan(tasksUnary, 2);
-    validateIncludesPartitionScan(tasksUnary, 3);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanUnary);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
+    validateSingleFieldPartition(dataFiles, 1);
+    validateSingleFieldPartition(dataFiles, 2);
+    validateSingleFieldPartition(dataFiles, 3);
   }
 
   @Test
@@ -660,20 +663,18 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
         Expressions.and(
             Expressions.equal("partition.id", 10), Expressions.greaterThan("record_count", 0));
     TableScan scan = metadataTable.newScan().filter(filter);
-    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
-
+    CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
     // Four data files of old spec, one new data file of new spec
-    Assert.assertEquals(5, Iterables.size(tasks));
-
+    Assert.assertEquals(5, Iterables.size(dataFiles));
     filter =
         Expressions.and(
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     scan = metadataTable.newScan().filter(filter);
-    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+    dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
 
     // 1 original data file written by old spec, plus 1 new data file written by new spec
-    Assert.assertEquals(2, Iterables.size(tasks));
+    Assert.assertEquals(2, Iterables.size(dataFiles));
   }
 
   @Test
@@ -714,10 +715,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
         Expressions.and(
             Expressions.equal("partition.id", 10), Expressions.greaterThan("record_count", 0));
     TableScan scan = metadataTable.newScan().filter(filter);
-    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+    CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
 
     // Four original files of original spec, one data file written by new spec
-    Assert.assertEquals(5, Iterables.size(tasks));
+    Assert.assertEquals(5, Iterables.size(dataFiles));
 
     // Filter for a dropped partition spec field.  Correct behavior is that only old partitions are
     // returned.
@@ -726,11 +727,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     scan = metadataTable.newScan().filter(filter);
-    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+    dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
 
     if (formatVersion == 1) {
       // 1 original data file written by old spec
-      Assert.assertEquals(1, Iterables.size(tasks));
+      Assert.assertEquals(1, Iterables.size(dataFiles));
     } else {
       // 1 original data/delete files written by old spec, plus both of new data file/delete file
       // written by new spec
@@ -741,11 +742,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
       //
       // However, these partition rows are filtered out later in Spark data filtering, as the newer
       // partitions
-      // will have 'data=null' field added as part of normalization to the Partitions table final
+      // will have 'data=null' field added as part of normalization to the Partition table final
       // schema.
-      // The Partitions table final schema is a union of fields of all specs, including dropped
+      // The Partition table final schema is a union of fields of all specs, including dropped
       // fields.
-      Assert.assertEquals(3, Iterables.size(tasks));
+      Assert.assertEquals(3, Iterables.size(dataFiles));
     }
   }
 
@@ -796,10 +797,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
             Expressions.equal("partition.partition", 0),
             Expressions.greaterThan("record_count", 0));
     TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
-    CloseableIterable<FileScanTask> tasksAndEq =
-        PartitionsTable.planFiles((StaticTableScan) scanAndEq);
-    Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
-    validateIncludesPartitionScan(tasksAndEq, 0);
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(dataFiles.iterator()));
+    validateSingleFieldPartition(dataFiles, 0);
   }
 
   @Test
@@ -867,8 +868,8 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
                           true); // daemon threads will be terminated abruptly when the JVM exits
                       return thread;
                     }));
-    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
-    Assert.assertEquals(4, Iterables.size(tasks));
+    CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
     Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index b3aa5f40e0..1f9aaa9075 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Types;
@@ -151,16 +152,15 @@ public class TestMetadataTableScansWithPartitionEvolution extends MetadataTableS
 
     TableScan scanNoFilter = partitionsTable.newScan().select("partition");
     Assert.assertEquals(idPartition, scanNoFilter.schema().asStruct());
-    try (CloseableIterable<FileScanTask> tasksNoFilter =
-        PartitionsTable.planFiles((StaticTableScan) scanNoFilter)) {
-      Assertions.assertThat(tasksNoFilter).hasSize(4);
-      validateIncludesPartitionScan(tasksNoFilter, 0);
-      validateIncludesPartitionScan(tasksNoFilter, 1);
-      validateIncludesPartitionScan(tasksNoFilter, 2);
-      validateIncludesPartitionScan(tasksNoFilter, 3);
-      validateIncludesPartitionScan(tasksNoFilter, 1, 2);
-      validateIncludesPartitionScan(tasksNoFilter, 1, 3);
-    }
+    CloseableIterable<DataFile> dataFiles =
+        PartitionsTable.planDataFiles((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+    validatePartition(dataFiles, 0, 0);
+    validatePartition(dataFiles, 0, 1);
+    validatePartition(dataFiles, 0, 2);
+    validatePartition(dataFiles, 0, 3);
+    validatePartition(dataFiles, 1, 2);
+    validatePartition(dataFiles, 1, 3);
   }
 
   @Test