You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/04 14:37:13 UTC
[iceberg] branch master updated: Core: Simplify partition coercion code in PartitionsTable (#7503)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c751cb24b6 Core: Simplify partition coercion code in PartitionsTable (#7503)
c751cb24b6 is described below
commit c751cb24b6f59084f473395745a4778f64aaca75
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Thu May 4 07:37:06 2023 -0700
Core: Simplify partition coercion code in PartitionsTable (#7503)
---
.../java/org/apache/iceberg/PartitionsTable.java | 107 +++++++--------------
.../org/apache/iceberg/util/PartitionUtil.java | 2 +-
2 files changed, 37 insertions(+), 72 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 97c82c432f..6c22a6dbf4 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -20,13 +20,13 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import java.util.Map;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeMap;
/** A {@link Table} implementation that exposes a table's partitions as rows. */
public class PartitionsTable extends BaseMetadataTable {
@@ -90,28 +90,22 @@ public class PartitionsTable extends BaseMetadataTable {
private static StaticDataTask.Row convertPartition(Partition partition) {
return StaticDataTask.Row.of(
- partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
+ partition.partitionData,
+ partition.specId,
+ partition.dataRecordCount,
+ partition.dataFileCount);
}
private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
- Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
- PartitionMap partitions = new PartitionMap();
-
- // cache a position map needed by each partition spec to normalize partitions to final schema
- Map<Integer, int[]> normalizedPositionsBySpec =
- Maps.newHashMapWithExpectedSize(table.specs().size());
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ PartitionMap partitions = new PartitionMap(partitionType);
CloseableIterable<DataFile> datafiles = planDataFiles(scan);
for (DataFile dataFile : datafiles) {
- PartitionData original = (PartitionData) dataFile.partition();
- int[] normalizedPositions =
- normalizedPositionsBySpec.computeIfAbsent(
- dataFile.specId(),
- specId -> normalizedPositions(table, specId, normalizedPartitionType));
-
- PartitionData normalized =
- normalizePartition(original, normalizedPartitionType, normalizedPositions);
- partitions.get(normalized).update(dataFile);
+ StructLike partition =
+ PartitionUtil.coercePartition(
+ partitionType, table.specs().get(dataFile.specId()), dataFile.partition());
+ partitions.get(partition).update(dataFile);
}
return partitions.all();
@@ -150,53 +144,6 @@ public class PartitionsTable extends BaseMetadataTable {
return new ParallelIterable<>(tasks, scan.planExecutor());
}
- /**
- * Builds an integer array for a specific partition type to map its partitions to the final
- * normalized type.
- *
- * <p>The array represents fields in the original partition type, with the index being the field's
- * index in the original partition type, and the value being the field's index in the normalized
- * partition type.
- *
- * @param table iceberg table
- * @param specId spec id where original partition type is written
- * @param normalizedType normalized partition type
- */
- private static int[] normalizedPositions(
- Table table, int specId, Types.StructType normalizedType) {
- Types.StructType originalType = table.specs().get(specId).partitionType();
- int[] normalizedPositions = new int[originalType.fields().size()];
- for (int originalIndex = 0; originalIndex < originalType.fields().size(); originalIndex++) {
- Types.NestedField normalizedField =
- normalizedType.field(originalType.fields().get(originalIndex).fieldId());
- normalizedPositions[originalIndex] = normalizedType.fields().indexOf(normalizedField);
- }
- return normalizedPositions;
- }
-
- /**
- * Convert a partition data written by an old spec, to table's normalized partition type, which is
- * a common partition type for all specs of the table.
- *
- * @param originalPartition un-normalized partition data
- * @param normalizedPartitionType table's normalized partition type {@link
- * Partitioning#partitionType(Table)}
- * @param normalizedPositions field positions in the normalized partition type indexed by field
- * position in the original partition type
- * @return the normalized partition data
- */
- private static PartitionData normalizePartition(
- PartitionData originalPartition,
- Types.StructType normalizedPartitionType,
- int[] normalizedPositions) {
- PartitionData normalizedPartition = new PartitionData(normalizedPartitionType);
- for (int originalIndex = 0; originalIndex < originalPartition.size(); originalIndex++) {
- normalizedPartition.put(
- normalizedPositions[originalIndex], originalPartition.get(originalIndex));
- }
- return normalizedPartition;
- }
-
private class PartitionsScan extends StaticTableScan {
PartitionsScan(Table table) {
super(
@@ -208,12 +155,18 @@ public class PartitionsTable extends BaseMetadataTable {
}
static class PartitionMap {
- private final Map<PartitionData, Partition> partitions = Maps.newHashMap();
+ private final StructLikeMap<Partition> partitions;
+ private final Types.StructType keyType;
+
+ PartitionMap(Types.StructType type) {
+ this.partitions = StructLikeMap.create(type);
+ this.keyType = type;
+ }
- Partition get(PartitionData key) {
+ Partition get(StructLike key) {
Partition partition = partitions.get(key);
if (partition == null) {
- partition = new Partition(key);
+ partition = new Partition(key, keyType);
partitions.put(key, partition);
}
return partition;
@@ -225,13 +178,13 @@ public class PartitionsTable extends BaseMetadataTable {
}
static class Partition {
- private final StructLike key;
+ private final PartitionData partitionData;
private int specId;
private long dataRecordCount;
private int dataFileCount;
- Partition(StructLike key) {
- this.key = key;
+ Partition(StructLike key, Types.StructType keyType) {
+ this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataFileCount = 0;
@@ -242,5 +195,17 @@ public class PartitionsTable extends BaseMetadataTable {
this.dataFileCount += 1;
this.specId = file.specId();
}
+
+ /** Needed because StructProjection is not serializable */
+ private PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
+ PartitionData data = new PartitionData(keyType);
+ for (int i = 0; i < keyType.fields().size(); i++) {
+ Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass());
+ if (val != null) {
+ data.set(i, val);
+ }
+ }
+ return data;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index af2f79c3c6..b3a5e85d33 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -91,7 +91,7 @@ public class PartitionUtil {
}
// adapts the provided partition data to match the table partition type
- private static StructLike coercePartition(
+ public static StructLike coercePartition(
Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
StructProjection projection =
StructProjection.createAllowMissing(spec.partitionType(), partitionType);