You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2022/09/02 06:21:04 UTC

[iceberg] branch master updated: Spark: Fix stats in rewrite metadata action (#5691)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a3f87927b Spark: Fix stats in rewrite metadata action (#5691)
2a3f87927b is described below

commit 2a3f87927be9abfddcd5835621fe7c516a4877c4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700

    Spark: Fix stats in rewrite metadata action (#5691)
    
    * Core: Don't show dropped fields from the partition spec
    
    * Use projection instead
    
    * Use StructProjection in SparkDataFile.
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
 .../extensions/TestRewriteManifestsProcedure.java  | 29 +++++++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    | 21 +++++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
 .../extensions/TestRewriteManifestsProcedure.java  | 29 +++++++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    | 21 +++++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
 6 files changed, 200 insertions(+), 18 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
       Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
             manifestEncoder)
         .collectAsList();
   }
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise
     // enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
                   manifestEncoder)
               .collectAsList();
         });
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
       Broadcast<FileIO> io,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType)
       throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
     OutputFile outputFile =
         io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
       long maxNumManifestEntries,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType) {
 
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
         manifests.add(
-            writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
-            writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
         manifests.add(
             writeManifest(
-                rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
       Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
             manifestEncoder)
         .collectAsList();
   }
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise
     // enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
                   manifestEncoder)
               .collectAsList();
         });
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
       Broadcast<FileIO> io,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType)
       throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
     OutputFile outputFile =
         io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
       long maxNumManifestEntries,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType) {
 
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
         manifests.add(
-            writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
-            writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
         manifests.add(
             writeManifest(
-                rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();