You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/04 17:37:55 UTC

[iceberg] 05/06: Spark: broadcast table instead of file IO in rewrite manifests (#7263)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 29e60c43e1935c6cddd628b805094fcbc4b5fe6a
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Mon Apr 3 20:38:42 2023 -0700

    Spark: broadcast table instead of file IO in rewrite manifests (#7263)
---
 .../spark/actions/RewriteManifestsSparkAction.java | 32 +++++++++++-----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 42977b0be5..860168ae0a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -44,7 +45,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -52,7 +52,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
-  private final FileIO fileIO;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
             table.properties(),
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.fileIO = SparkUtil.serializableFileIO(table);
 
     // default the staging location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -228,7 +225,7 @@ public class RewriteManifestsSparkAction
         .repartition(numManifests)
         .mapPartitions(
             toManifests(
-                io,
+                tableBroadcast,
                 maxNumManifestEntries,
                 stagingLocation,
                 formatVersion,
@@ -242,7 +239,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -258,7 +255,7 @@ public class RewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io,
+                      tableBroadcast,
                       maxNumManifestEntries,
                       stagingLocation,
                       formatVersion,
@@ -298,7 +295,7 @@ public class RewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(fileIO).stream()
+    return currentSnapshot.dataManifests(table.io()).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
@@ -351,14 +348,14 @@ public class RewriteManifestsSparkAction
         .noRetry()
         .suppressFailureWhenFinished()
         .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
+        .run(location -> table.io().deleteFile(location));
   }
 
   private static ManifestFile writeManifest(
       List<Row> rows,
       int startIndex,
       int endIndex,
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       String location,
       int format,
       Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ public class RewriteManifestsSparkAction
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
-        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+        tableBroadcast
+            .value()
+            .io()
+            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       long maxNumManifestEntries,
       String location,
       int format,
@@ -416,7 +416,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -429,7 +429,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 midIndex,
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -440,7 +440,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 midIndex,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,