You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/04 14:37:13 UTC

[iceberg] branch master updated: Core: Simplify partition coercion code in PartitionsTable (#7503)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c751cb24b6 Core: Simplify partition coercion code in PartitionsTable (#7503)
c751cb24b6 is described below

commit c751cb24b6f59084f473395745a4778f64aaca75
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Thu May 4 07:37:06 2023 -0700

    Core: Simplify partition coercion code in PartitionsTable (#7503)
---
 .../java/org/apache/iceberg/PartitionsTable.java   | 107 +++++++--------------
 .../org/apache/iceberg/util/PartitionUtil.java     |   2 +-
 2 files changed, 37 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 97c82c432f..6c22a6dbf4 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -20,13 +20,13 @@ package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import java.util.Map;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeMap;
 
 /** A {@link Table} implementation that exposes a table's partitions as rows. */
 public class PartitionsTable extends BaseMetadataTable {
@@ -90,28 +90,22 @@ public class PartitionsTable extends BaseMetadataTable {
 
   private static StaticDataTask.Row convertPartition(Partition partition) {
     return StaticDataTask.Row.of(
-        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
+        partition.partitionData,
+        partition.specId,
+        partition.dataRecordCount,
+        partition.dataFileCount);
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
-    PartitionMap partitions = new PartitionMap();
-
-    // cache a position map needed by each partition spec to normalize partitions to final schema
-    Map<Integer, int[]> normalizedPositionsBySpec =
-        Maps.newHashMapWithExpectedSize(table.specs().size());
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    PartitionMap partitions = new PartitionMap(partitionType);
 
     CloseableIterable<DataFile> datafiles = planDataFiles(scan);
     for (DataFile dataFile : datafiles) {
-      PartitionData original = (PartitionData) dataFile.partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              dataFile.specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
-
-      PartitionData normalized =
-          normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(dataFile);
+      StructLike partition =
+          PartitionUtil.coercePartition(
+              partitionType, table.specs().get(dataFile.specId()), dataFile.partition());
+      partitions.get(partition).update(dataFile);
     }
 
     return partitions.all();
@@ -150,53 +144,6 @@ public class PartitionsTable extends BaseMetadataTable {
     return new ParallelIterable<>(tasks, scan.planExecutor());
   }
 
-  /**
-   * Builds an integer array for a specific partition type to map its partitions to the final
-   * normalized type.
-   *
-   * <p>The array represents fields in the original partition type, with the index being the field's
-   * index in the original partition type, and the value being the field's index in the normalized
-   * partition type.
-   *
-   * @param table iceberg table
-   * @param specId spec id where original partition type is written
-   * @param normalizedType normalized partition type
-   */
-  private static int[] normalizedPositions(
-      Table table, int specId, Types.StructType normalizedType) {
-    Types.StructType originalType = table.specs().get(specId).partitionType();
-    int[] normalizedPositions = new int[originalType.fields().size()];
-    for (int originalIndex = 0; originalIndex < originalType.fields().size(); originalIndex++) {
-      Types.NestedField normalizedField =
-          normalizedType.field(originalType.fields().get(originalIndex).fieldId());
-      normalizedPositions[originalIndex] = normalizedType.fields().indexOf(normalizedField);
-    }
-    return normalizedPositions;
-  }
-
-  /**
-   * Convert a partition data written by an old spec, to table's normalized partition type, which is
-   * a common partition type for all specs of the table.
-   *
-   * @param originalPartition un-normalized partition data
-   * @param normalizedPartitionType table's normalized partition type {@link
-   *     Partitioning#partitionType(Table)}
-   * @param normalizedPositions field positions in the normalized partition type indexed by field
-   *     position in the original partition type
-   * @return the normalized partition data
-   */
-  private static PartitionData normalizePartition(
-      PartitionData originalPartition,
-      Types.StructType normalizedPartitionType,
-      int[] normalizedPositions) {
-    PartitionData normalizedPartition = new PartitionData(normalizedPartitionType);
-    for (int originalIndex = 0; originalIndex < originalPartition.size(); originalIndex++) {
-      normalizedPartition.put(
-          normalizedPositions[originalIndex], originalPartition.get(originalIndex));
-    }
-    return normalizedPartition;
-  }
-
   private class PartitionsScan extends StaticTableScan {
     PartitionsScan(Table table) {
       super(
@@ -208,12 +155,18 @@ public class PartitionsTable extends BaseMetadataTable {
   }
 
   static class PartitionMap {
-    private final Map<PartitionData, Partition> partitions = Maps.newHashMap();
+    private final StructLikeMap<Partition> partitions;
+    private final Types.StructType keyType;
+
+    PartitionMap(Types.StructType type) {
+      this.partitions = StructLikeMap.create(type);
+      this.keyType = type;
+    }
 
-    Partition get(PartitionData key) {
+    Partition get(StructLike key) {
       Partition partition = partitions.get(key);
       if (partition == null) {
-        partition = new Partition(key);
+        partition = new Partition(key, keyType);
         partitions.put(key, partition);
       }
       return partition;
@@ -225,13 +178,13 @@ public class PartitionsTable extends BaseMetadataTable {
   }
 
   static class Partition {
-    private final StructLike key;
+    private final PartitionData partitionData;
     private int specId;
     private long dataRecordCount;
     private int dataFileCount;
 
-    Partition(StructLike key) {
-      this.key = key;
+    Partition(StructLike key, Types.StructType keyType) {
+      this.partitionData = toPartitionData(key, keyType);
       this.specId = 0;
       this.dataRecordCount = 0;
       this.dataFileCount = 0;
@@ -242,5 +195,17 @@ public class PartitionsTable extends BaseMetadataTable {
       this.dataFileCount += 1;
       this.specId = file.specId();
     }
+
+    /** Needed because StructProjection is not serializable */
+    private PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
+      PartitionData data = new PartitionData(keyType);
+      for (int i = 0; i < keyType.fields().size(); i++) {
+        Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass());
+        if (val != null) {
+          data.set(i, val);
+        }
+      }
+      return data;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index af2f79c3c6..b3a5e85d33 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -91,7 +91,7 @@ public class PartitionUtil {
   }
 
   // adapts the provided partition data to match the table partition type
-  private static StructLike coercePartition(
+  public static StructLike coercePartition(
       Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
     StructProjection projection =
         StructProjection.createAllowMissing(spec.partitionType(), partitionType);