You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/07 18:31:30 UTC
[iceberg] branch master updated: Core: Refactor PartitionsTable planning (#7190)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 60032f65ac Core: Refactor PartitionsTable planning (#7190)
60032f65ac is described below
commit 60032f65aca07c0361eb5d77ccc6984b93095748
Author: Hongyue/Steve Zhang <st...@gmail.com>
AuthorDate: Fri Apr 7 11:31:23 2023 -0700
Core: Refactor PartitionsTable planning (#7190)
---
.../src/main/java/org/apache/iceberg/BaseScan.java | 2 +-
.../java/org/apache/iceberg/PartitionsTable.java | 98 +++++++-------
.../apache/iceberg/MetadataTableScanTestBase.java | 19 +--
.../org/apache/iceberg/TestMetadataTableScans.java | 145 +++++++++++----------
...stMetadataTableScansWithPartitionEvolution.java | 20 +--
5 files changed, 145 insertions(+), 139 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index a1c92c927a..4f605985cf 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -36,7 +36,7 @@ import org.apache.iceberg.util.PropertyUtil;
abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
implements Scan<ThisT, T, G> {
- private static final List<String> SCAN_COLUMNS =
+ protected static final List<String> SCAN_COLUMNS =
ImmutableList.of(
"snapshot_id",
"file_path",
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 5a289d756c..97c82c432f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -23,17 +23,15 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
/** A {@link Table} implementation that exposes a table's partitions as rows. */
public class PartitionsTable extends BaseMetadataTable {
private final Schema schema;
- static final boolean PLAN_SCANS_WITH_WORKER_POOL =
- SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
PartitionsTable(Table table) {
this(table, table.name() + ".partitions");
@@ -96,7 +94,6 @@ public class PartitionsTable extends BaseMetadataTable {
}
private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
- CloseableIterable<FileScanTask> tasks = planFiles(scan);
Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap();
@@ -104,22 +101,66 @@ public class PartitionsTable extends BaseMetadataTable {
Map<Integer, int[]> normalizedPositionsBySpec =
Maps.newHashMapWithExpectedSize(table.specs().size());
- for (FileScanTask task : tasks) {
- PartitionData original = (PartitionData) task.file().partition();
+ CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+ for (DataFile dataFile : datafiles) {
+ PartitionData original = (PartitionData) dataFile.partition();
int[] normalizedPositions =
normalizedPositionsBySpec.computeIfAbsent(
- task.spec().specId(),
+ dataFile.specId(),
specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
PartitionData normalized =
normalizePartition(original, normalizedPartitionType, normalizedPositions);
- partitions.get(normalized).update(task.file());
+ partitions.get(normalized).update(dataFile);
}
+
return partitions.all();
}
+ @VisibleForTesting
+ static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+ Table table = scan.table();
+ Snapshot snapshot = scan.snapshot();
+
+ CloseableIterable<ManifestFile> dataManifests =
+ CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+ LoadingCache<Integer, ManifestEvaluator> evalCache =
+ Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = table.specs().get(specId);
+ PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+ return ManifestEvaluator.forRowFilter(
+ scan.filter(), transformedSpec, scan.isCaseSensitive());
+ });
+
+ CloseableIterable<ManifestFile> filteredManifests =
+ CloseableIterable.filter(
+ dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+ Iterable<CloseableIterable<DataFile>> tasks =
+ CloseableIterable.transform(
+ filteredManifests,
+ manifest ->
+ ManifestFiles.read(manifest, table.io(), table.specs())
+ .caseSensitive(scan.isCaseSensitive())
+ .select(BaseScan.SCAN_COLUMNS)); // don't select stats columns
+
+ return new ParallelIterable<>(tasks, scan.planExecutor());
+ }
+
/**
- * Builds an array of the field position of positions in the normalized partition type indexed by
- * field position in the original partition type
+ * Builds an integer array for a specific partition type to map its partitions to the final
+ * normalized type.
+ *
+ * <p>The array represents fields in the original partition type, with the index being the field's
+ * index in the original partition type, and the value being the field's index in the normalized
+ * partition type.
+ *
+ * @param table iceberg table
+ * @param specId spec id where original partition type is written
+ * @param normalizedType normalized partition type
*/
private static int[] normalizedPositions(
Table table, int specId, Types.StructType normalizedType) {
@@ -156,43 +197,6 @@ public class PartitionsTable extends BaseMetadataTable {
return normalizedPartition;
}
- @VisibleForTesting
- static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
- Table table = scan.table();
- Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
- boolean caseSensitive = scan.isCaseSensitive();
-
- LoadingCache<Integer, ManifestEvaluator> evalCache =
- Caffeine.newBuilder()
- .build(
- specId -> {
- PartitionSpec spec = table.specs().get(specId);
- PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
- return ManifestEvaluator.forRowFilter(
- scan.filter(), transformedSpec, caseSensitive);
- });
-
- FileIO io = table.io();
- ManifestGroup manifestGroup =
- new ManifestGroup(io, snapshot.dataManifests(io), snapshot.deleteManifests(io))
- .caseSensitive(caseSensitive)
- .filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
- .select(scan.scanColumns())
- .specsById(scan.table().specs())
- .ignoreDeleted();
-
- if (scan.shouldIgnoreResiduals()) {
- manifestGroup = manifestGroup.ignoreResiduals();
- }
-
- if (scan.snapshot().dataManifests(io).size() > 1
- && (PLAN_SCANS_WITH_WORKER_POOL || scan.context().planWithCustomizedExecutor())) {
- manifestGroup = manifestGroup.planWith(scan.context().planExecutor());
- }
-
- return manifestGroup.planFiles();
- }
-
private class PartitionsScan extends StaticTableScan {
PartitionsScan(Table table) {
super(
diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index ea7b2cfc95..47b6065e30 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -79,24 +80,24 @@ public abstract class MetadataTableScanTestBase extends TableTestBase {
}
}
- protected void validateIncludesPartitionScan(
- CloseableIterable<FileScanTask> tasks, int partValue) {
- validateIncludesPartitionScan(tasks, 0, partValue);
+ protected void validateSingleFieldPartition(
+ CloseableIterable<DataFile> dataFiles, int partitionValue) {
+ validatePartition(dataFiles, 0, partitionValue);
}
- protected void validateIncludesPartitionScan(
- CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+ protected void validatePartition(
+ CloseableIterable<DataFile> dataFiles, int position, int partitionValue) {
Assert.assertTrue(
"File scan tasks do not include correct file",
- StreamSupport.stream(tasks.spliterator(), false)
+ StreamSupport.stream(dataFiles.spliterator(), false)
.anyMatch(
- task -> {
- StructLike partition = task.file().partition();
+ dataFile -> {
+ StructLike partition = dataFile.partition();
if (position >= partition.size()) {
return false;
}
- return partition.get(position, Object.class).equals(partValue);
+ return Objects.equals(partitionValue, partition.get(position, Object.class));
}));
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index c45da54f03..1cfc994cda 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -314,16 +314,16 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
"partition",
Types.StructType.of(optional(1000, "data_bucket", Types.IntegerType.get()))))
.asStruct();
-
TableScan scanNoFilter = partitionsTable.newScan().select("partition.data_bucket");
Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
- CloseableIterable<FileScanTask> tasksNoFilter =
- PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
- Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
- validateIncludesPartitionScan(tasksNoFilter, 0);
- validateIncludesPartitionScan(tasksNoFilter, 1);
- validateIncludesPartitionScan(tasksNoFilter, 2);
- validateIncludesPartitionScan(tasksNoFilter, 3);
+
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanNoFilter);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
+ validateSingleFieldPartition(dataFiles, 1);
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -337,13 +337,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
TableScan scanWithProjection = partitionsTable.newScan().select("file_count");
Assert.assertEquals(expected, scanWithProjection.schema().asStruct());
- CloseableIterable<FileScanTask> tasksWithProjection =
- PartitionsTable.planFiles((StaticTableScan) scanWithProjection);
- Assert.assertEquals(4, Iterators.size(tasksWithProjection.iterator()));
- validateIncludesPartitionScan(tasksWithProjection, 0);
- validateIncludesPartitionScan(tasksWithProjection, 1);
- validateIncludesPartitionScan(tasksWithProjection, 2);
- validateIncludesPartitionScan(tasksWithProjection, 3);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanWithProjection);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
+ validateSingleFieldPartition(dataFiles, 1);
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -351,14 +351,14 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
table.newFastAppend().appendFile(FILE_WITH_STATS).commit();
Table partitionsTable = new PartitionsTable(table);
- CloseableIterable<FileScanTask> tasksAndEq =
- PartitionsTable.planFiles((StaticTableScan) partitionsTable.newScan());
- for (FileScanTask fileTask : tasksAndEq) {
- Assert.assertNull(fileTask.file().columnSizes());
- Assert.assertNull(fileTask.file().valueCounts());
- Assert.assertNull(fileTask.file().nullValueCounts());
- Assert.assertNull(fileTask.file().lowerBounds());
- Assert.assertNull(fileTask.file().upperBounds());
+ CloseableIterable<DataFile> tasksAndEq =
+ PartitionsTable.planDataFiles((StaticTableScan) partitionsTable.newScan());
+ for (DataFile dataFile : tasksAndEq) {
+ Assert.assertNull(dataFile.columnSizes());
+ Assert.assertNull(dataFile.valueCounts());
+ Assert.assertNull(dataFile.nullValueCounts());
+ Assert.assertNull(dataFile.lowerBounds());
+ Assert.assertNull(dataFile.upperBounds());
}
}
@@ -373,10 +373,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
- CloseableIterable<FileScanTask> tasksAndEq =
- PartitionsTable.planFiles((StaticTableScan) scanAndEq);
- Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
- validateIncludesPartitionScan(tasksAndEq, 0);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanAndEq);
+ Assert.assertEquals(1, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
}
@Test
@@ -390,11 +390,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.lessThan("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
- CloseableIterable<FileScanTask> tasksLtAnd =
- PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
- Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
- validateIncludesPartitionScan(tasksLtAnd, 0);
- validateIncludesPartitionScan(tasksLtAnd, 1);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanLtAnd);
+ Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
+ validateSingleFieldPartition(dataFiles, 1);
}
@Test
@@ -408,12 +408,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scanOr = partitionsTable.newScan().filter(or);
- CloseableIterable<FileScanTask> tasksOr = PartitionsTable.planFiles((StaticTableScan) scanOr);
- Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
- validateIncludesPartitionScan(tasksOr, 0);
- validateIncludesPartitionScan(tasksOr, 1);
- validateIncludesPartitionScan(tasksOr, 2);
- validateIncludesPartitionScan(tasksOr, 3);
+
+ CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scanOr);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
+ validateSingleFieldPartition(dataFiles, 1);
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -423,10 +424,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
TableScan scanNot = partitionsTable.newScan().filter(not);
- CloseableIterable<FileScanTask> tasksNot = PartitionsTable.planFiles((StaticTableScan) scanNot);
- Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
- validateIncludesPartitionScan(tasksNot, 2);
- validateIncludesPartitionScan(tasksNot, 3);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanNot);
+ Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -437,10 +439,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expression set = Expressions.in("partition.data_bucket", 2, 3);
TableScan scanSet = partitionsTable.newScan().filter(set);
- CloseableIterable<FileScanTask> tasksSet = PartitionsTable.planFiles((StaticTableScan) scanSet);
- Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
- validateIncludesPartitionScan(tasksSet, 2);
- validateIncludesPartitionScan(tasksSet, 3);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanSet);
+ Assert.assertEquals(2, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -451,13 +454,13 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expression unary = Expressions.notNull("partition.data_bucket");
TableScan scanUnary = partitionsTable.newScan().filter(unary);
- CloseableIterable<FileScanTask> tasksUnary =
- PartitionsTable.planFiles((StaticTableScan) scanUnary);
- Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
- validateIncludesPartitionScan(tasksUnary, 0);
- validateIncludesPartitionScan(tasksUnary, 1);
- validateIncludesPartitionScan(tasksUnary, 2);
- validateIncludesPartitionScan(tasksUnary, 3);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanUnary);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
+ validateSingleFieldPartition(dataFiles, 1);
+ validateSingleFieldPartition(dataFiles, 2);
+ validateSingleFieldPartition(dataFiles, 3);
}
@Test
@@ -660,20 +663,18 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.and(
Expressions.equal("partition.id", 10), Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
- CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
-
+ CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
// Four data files of old spec, one new data file of new spec
- Assert.assertEquals(5, Iterables.size(tasks));
-
+ Assert.assertEquals(5, Iterables.size(dataFiles));
filter =
Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
- tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+ dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
// 1 original data file written by old spec, plus 1 new data file written by new spec
- Assert.assertEquals(2, Iterables.size(tasks));
+ Assert.assertEquals(2, Iterables.size(dataFiles));
}
@Test
@@ -714,10 +715,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.and(
Expressions.equal("partition.id", 10), Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
- CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+ CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
// Four original files of original spec, one data file written by new spec
- Assert.assertEquals(5, Iterables.size(tasks));
+ Assert.assertEquals(5, Iterables.size(dataFiles));
// Filter for a dropped partition spec field. Correct behavior is that only old partitions are
// returned.
@@ -726,11 +727,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
- tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+ dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
if (formatVersion == 1) {
// 1 original data file written by old spec
- Assert.assertEquals(1, Iterables.size(tasks));
+ Assert.assertEquals(1, Iterables.size(dataFiles));
} else {
// 1 original data/delete files written by old spec, plus both of new data file/delete file
// written by new spec
@@ -741,11 +742,11 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
//
// However, these partition rows are filtered out later in Spark data filtering, as the newer
// partitions
- // will have 'data=null' field added as part of normalization to the Partitions table final
+ // will have 'data=null' field added as part of normalization to the Partition table final
// schema.
- // The Partitions table final schema is a union of fields of all specs, including dropped
+ // The Partition table final schema is a union of fields of all specs, including dropped
// fields.
- Assert.assertEquals(3, Iterables.size(tasks));
+ Assert.assertEquals(3, Iterables.size(dataFiles));
}
}
@@ -796,10 +797,10 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
Expressions.equal("partition.partition", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
- CloseableIterable<FileScanTask> tasksAndEq =
- PartitionsTable.planFiles((StaticTableScan) scanAndEq);
- Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
- validateIncludesPartitionScan(tasksAndEq, 0);
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanAndEq);
+ Assert.assertEquals(1, Iterators.size(dataFiles.iterator()));
+ validateSingleFieldPartition(dataFiles, 0);
}
@Test
@@ -867,8 +868,8 @@ public class TestMetadataTableScans extends MetadataTableScanTestBase {
true); // daemon threads will be terminated abruptly when the JVM exits
return thread;
}));
- CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
- Assert.assertEquals(4, Iterables.size(tasks));
+ CloseableIterable<DataFile> dataFiles = PartitionsTable.planDataFiles((StaticTableScan) scan);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index b3aa5f40e0..1f9aaa9075 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
@@ -151,16 +152,15 @@ public class TestMetadataTableScansWithPartitionEvolution extends MetadataTableS
TableScan scanNoFilter = partitionsTable.newScan().select("partition");
Assert.assertEquals(idPartition, scanNoFilter.schema().asStruct());
- try (CloseableIterable<FileScanTask> tasksNoFilter =
- PartitionsTable.planFiles((StaticTableScan) scanNoFilter)) {
- Assertions.assertThat(tasksNoFilter).hasSize(4);
- validateIncludesPartitionScan(tasksNoFilter, 0);
- validateIncludesPartitionScan(tasksNoFilter, 1);
- validateIncludesPartitionScan(tasksNoFilter, 2);
- validateIncludesPartitionScan(tasksNoFilter, 3);
- validateIncludesPartitionScan(tasksNoFilter, 1, 2);
- validateIncludesPartitionScan(tasksNoFilter, 1, 3);
- }
+ CloseableIterable<DataFile> dataFiles =
+ PartitionsTable.planDataFiles((StaticTableScan) scanNoFilter);
+ Assert.assertEquals(4, Iterators.size(dataFiles.iterator()));
+ validatePartition(dataFiles, 0, 0);
+ validatePartition(dataFiles, 0, 1);
+ validatePartition(dataFiles, 0, 2);
+ validatePartition(dataFiles, 0, 3);
+ validatePartition(dataFiles, 1, 2);
+ validatePartition(dataFiles, 1, 3);
}
@Test