You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2022/09/02 06:21:04 UTC
[iceberg] branch master updated: Spark: Fix stats in rewrite metadata action (#5691)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2a3f87927b Spark: Fix stats in rewrite metadata action (#5691)
2a3f87927b is described below
commit 2a3f87927be9abfddcd5835621fe7c516a4877c4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700
Spark: Fix stats in rewrite metadata action (#5691)
* Core: Don't show dropped fields from the partition spec
* Use projection instead
* Use StructProjection in SparkDataFile.
Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
.../extensions/TestRewriteManifestsProcedure.java | 29 +++++++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 +++++++-
.../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
.../extensions/TestRewriteManifestsProcedure.java | 29 +++++++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 +++++++-
.../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
6 files changed, 200 insertions(+), 18 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+ @Test
+ public void testReplacePartitionField() {
+ sql(
+ "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+ tableName);
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+ sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+ sql(
+ "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+ tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+ sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+ public SparkDataFile(
+ Types.StructType type, Types.StructType projectedType, StructType sparkType) {
this.lowerBoundsType = type.fieldType("lower_bounds");
this.upperBoundsType = type.fieldType("upper_bounds");
this.keyMetadataType = type.fieldType("key_metadata");
- this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+ Types.StructType partitionType = type.fieldType("partition").asStructType();
+ this.wrappedPartition = new SparkStructLike(partitionType);
+
+ if (projectedType != null) {
+ Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+ this.partitionProjection =
+ StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+ } else {
+ this.partitionProjection = wrappedPartition;
+ }
Map<String, Integer> positions = Maps.newHashMap();
type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
}
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the estimation is not precise
// enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
});
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType)
throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
OutputFile outputFile =
io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
- Types.StructType dataFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+ Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+ Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+ SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
long maxNumManifestEntries,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType) {
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
manifests.add(
- writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
- writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
manifests.add(
writeManifest(
- rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+ @Test
+ public void testReplacePartitionField() {
+ sql(
+ "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+ tableName);
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+ sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+ sql(
+ "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+ tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+ sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+ public SparkDataFile(
+ Types.StructType type, Types.StructType projectedType, StructType sparkType) {
this.lowerBoundsType = type.fieldType("lower_bounds");
this.upperBoundsType = type.fieldType("upper_bounds");
this.keyMetadataType = type.fieldType("key_metadata");
- this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+ Types.StructType partitionType = type.fieldType("partition").asStructType();
+ this.wrappedPartition = new SparkStructLike(partitionType);
+
+ if (projectedType != null) {
+ Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+ this.partitionProjection =
+ StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+ } else {
+ this.partitionProjection = wrappedPartition;
+ }
Map<String, Integer> positions = Maps.newHashMap();
type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
}
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the estimation is not precise
// enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
});
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType)
throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
OutputFile outputFile =
io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
- Types.StructType dataFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+ Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+ Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+ SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
long maxNumManifestEntries,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType) {
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
manifests.add(
- writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
- writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
manifests.add(
writeManifest(
- rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();