You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/04 17:37:55 UTC
[iceberg] 05/06: Spark: broadcast table instead of file IO in rewrite manifests (#7263)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 29e60c43e1935c6cddd628b805094fcbc4b5fe6a
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Mon Apr 3 20:38:42 2023 -0700
Spark: broadcast table instead of file IO in rewrite manifests (#7263)
---
.../spark/actions/RewriteManifestsSparkAction.java | 32 +++++++++++-----------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 42977b0be5..860168ae0a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -44,7 +45,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -52,7 +52,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
private final int formatVersion;
- private final FileIO fileIO;
private final long targetManifestSizeBytes;
private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
table.properties(),
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
- this.fileIO = SparkUtil.serializableFileIO(table);
// default the staging location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -228,7 +225,7 @@ public class RewriteManifestsSparkAction
.repartition(numManifests)
.mapPartitions(
toManifests(
- io,
+ tableBroadcast,
maxNumManifestEntries,
stagingLocation,
formatVersion,
@@ -242,7 +239,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -258,7 +255,7 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io,
+ tableBroadcast,
maxNumManifestEntries,
stagingLocation,
formatVersion,
@@ -298,7 +295,7 @@ public class RewriteManifestsSparkAction
return ImmutableList.of();
}
- return currentSnapshot.dataManifests(fileIO).stream()
+ return currentSnapshot.dataManifests(table.io()).stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
.collect(Collectors.toList());
}
@@ -351,14 +348,14 @@ public class RewriteManifestsSparkAction
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
+ .run(location -> table.io().deleteFile(location));
}
private static ManifestFile writeManifest(
List<Row> rows,
int startIndex,
int endIndex,
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
String location,
int format,
Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ public class RewriteManifestsSparkAction
String manifestName = "optimized-m-" + UUID.randomUUID();
Path manifestPath = new Path(location, manifestName);
OutputFile outputFile =
- io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+ tableBroadcast
+ .value()
+ .io()
+ .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ public class RewriteManifestsSparkAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
long maxNumManifestEntries,
String location,
int format,
@@ -416,7 +416,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
0,
rowsAsList.size(),
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,
@@ -429,7 +429,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
0,
midIndex,
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,
@@ -440,7 +440,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
midIndex,
rowsAsList.size(),
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,