You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/09/03 19:09:49 UTC
[iceberg] 08/09: Spark: Fix stats in rewrite metadata action (#5691)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 7feb9457bd095b1d5e3d6224db4780cc5b833a36
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700
Spark: Fix stats in rewrite metadata action (#5691)
* Core: Don't show dropped fields from the partition spec
* Use projection instead
* Use StructProjection in SparkDataFile.
Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
.../extensions/TestRewriteManifestsProcedure.java | 29 ++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 ++++-
.../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
.../extensions/TestRewriteManifestsProcedure.java | 29 ++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 ++++-
.../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
6 files changed, 262 insertions(+), 48 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark.extensions;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
IllegalArgumentException.class, "Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+ @Test
+ public void testReplacePartitionField() {
+ sql(
+ "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+ tableName);
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+ sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+ sql(
+ "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+ tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+ sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+ public SparkDataFile(
+ Types.StructType type, Types.StructType projectedType, StructType sparkType) {
this.lowerBoundsType = type.fieldType("lower_bounds");
this.upperBoundsType = type.fieldType("upper_bounds");
this.keyMetadataType = type.fieldType("key_metadata");
- this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+ Types.StructType partitionType = type.fieldType("partition").asStructType();
+ this.wrappedPartition = new SparkStructLike(partitionType);
+
+ if (projectedType != null) {
+ Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+ this.partitionProjection =
+ StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+ } else {
+ this.partitionProjection = wrappedPartition;
+ }
Map<String, Integer> positions = Maps.newHashMap();
type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
+ manifestEncoder)
.collectAsList();
}
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
- return withReusableDS(manifestEntryDF, df -> {
- Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
- .collectAsList();
- });
+ return withReusableDS(
+ manifestEntryDF,
+ df -> {
+ Column partitionColumn = df.col("data_file.partition");
+ return df.repartitionByRange(numManifests, partitionColumn)
+ .sortWithinPartitions(partitionColumn)
+ .mapPartitions(
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
+ manifestEncoder)
+ .collectAsList();
+ });
}
private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
}
private static ManifestFile writeManifest(
- List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
- String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+ List<Row> rows,
+ int startIndex,
+ int endIndex,
+ Broadcast<FileIO> io,
+ String location,
+ int format,
+ Types.StructType combinedPartitionType,
+ PartitionSpec spec,
+ StructType sparkType)
+ throws IOException {
String manifestName = "optimized-m-" + UUID.randomUUID();
Path manifestPath = new Path(location, manifestName);
OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
- Types.StructType dataFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+ Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+ Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+ SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io, long maxNumManifestEntries, String location,
- int format, PartitionSpec spec, StructType sparkType) {
+ Broadcast<FileIO> io,
+ long maxNumManifestEntries,
+ String location,
+ int format,
+ Types.StructType combinedPartitionType,
+ PartitionSpec spec,
+ StructType sparkType) {
return rows -> {
List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
- manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
- manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
- manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark.extensions;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
IllegalArgumentException.class, "Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+ @Test
+ public void testReplacePartitionField() {
+ sql(
+ "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+ tableName);
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+ sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+ sql(
+ "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+ tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+ sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+ sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+ public SparkDataFile(
+ Types.StructType type, Types.StructType projectedType, StructType sparkType) {
this.lowerBoundsType = type.fieldType("lower_bounds");
this.upperBoundsType = type.fieldType("upper_bounds");
this.keyMetadataType = type.fieldType("key_metadata");
- this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+ Types.StructType partitionType = type.fieldType("partition").asStructType();
+ this.wrappedPartition = new SparkStructLike(partitionType);
+
+ if (projectedType != null) {
+ Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+ this.partitionProjection =
+ StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+ } else {
+ this.partitionProjection = wrappedPartition;
+ }
Map<String, Integer> positions = Maps.newHashMap();
type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
+ manifestEncoder)
.collectAsList();
}
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
- return withReusableDS(manifestEntryDF, df -> {
- Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
- .collectAsList();
- });
+ return withReusableDS(
+ manifestEntryDF,
+ df -> {
+ Column partitionColumn = df.col("data_file.partition");
+ return df.repartitionByRange(numManifests, partitionColumn)
+ .sortWithinPartitions(partitionColumn)
+ .mapPartitions(
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
+ manifestEncoder)
+ .collectAsList();
+ });
}
private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
}
private static ManifestFile writeManifest(
- List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
- String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+ List<Row> rows,
+ int startIndex,
+ int endIndex,
+ Broadcast<FileIO> io,
+ String location,
+ int format,
+ Types.StructType combinedPartitionType,
+ PartitionSpec spec,
+ StructType sparkType)
+ throws IOException {
String manifestName = "optimized-m-" + UUID.randomUUID();
Path manifestPath = new Path(location, manifestName);
OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
- Types.StructType dataFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+ Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+ Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+ SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io, long maxNumManifestEntries, String location,
- int format, PartitionSpec spec, StructType sparkType) {
+ Broadcast<FileIO> io,
+ long maxNumManifestEntries,
+ String location,
+ int format,
+ Types.StructType combinedPartitionType,
+ PartitionSpec spec,
+ StructType sparkType) {
return rows -> {
List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
- manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
- manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
- manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
+ manifests.add(
+ writeManifest(
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();