Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/11 00:46:03 UTC

[iceberg] branch master updated: Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ef136a0e Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)
92ef136a0e is described below

commit 92ef136a0e2dbe47c2840d539e3670d9570386ed
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Mon Apr 10 17:45:54 2023 -0700

    Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)
    
    This change backports PR #7263 to Spark 3.1 and 3.2, and deprecates
    SparkUtil.serializableFileIO in the Spark 3.1, 3.2, and 3.3 modules.
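    
    In short: the action previously broadcast a serializable FileIO and now
    broadcasts an immutable SerializableTable instead, letting executors
    obtain FileIO from it. A minimal, hypothetical sketch of the swap, using
    the same names as the diff below (the wrapper class and method exist
    only for illustration):
    
        import org.apache.iceberg.SerializableTable;
        import org.apache.iceberg.Table;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.broadcast.Broadcast;
    
        class BroadcastSwapSketch {
          static Broadcast<Table> broadcastTable(JavaSparkContext sparkContext, Table table) {
            // before (now deprecated):
            //   Broadcast<FileIO> io =
            //       sparkContext.broadcast(SparkUtil.serializableFileIO(table));
            // after: broadcast a serializable snapshot of the whole table;
            // executors call tableBroadcast.value().io() when they need FileIO
            return sparkContext.broadcast(SerializableTable.copyOf(table));
          }
        }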
---
 .../java/org/apache/iceberg/spark/SparkUtil.java   |  8 +++
 .../actions/BaseRewriteManifestsSparkAction.java   | 59 ++++++++++++++++------
 .../java/org/apache/iceberg/spark/SparkUtil.java   |  8 +++
 .../spark/actions/RewriteManifestsSparkAction.java | 32 ++++++------
 .../java/org/apache/iceberg/spark/SparkUtil.java   |  8 +++
 5 files changed, 83 insertions(+), 32 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 2cdec2b062..5b3f50e692 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -63,6 +63,14 @@ public class SparkUtil {
 
   private SparkUtil() {}
 
+  /**
+   * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+   * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+   * alternative, use {@link org.apache.iceberg.SerializableTable}.
+   *
+   * @deprecated will be removed in 1.4.0
+   */
+  @Deprecated
   public static FileIO serializableFileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
       // we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
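
[Editor's note] This deprecation is the heart of the change: FileIO extends
Closeable, and therefore AutoCloseable, so a broadcast FileIO is exactly the
kind of value the new javadoc warns about. Below is a simplified model of the
removal-time behavior the javadoc describes; this is not Spark source, only
the documented semantics:

    class BroadcastCleanupModel {
      // per the javadoc above: AutoCloseable broadcast values are closed
      // when Spark removes the broadcast
      static void onBroadcastRemoval(Object broadcastValue) throws Exception {
        if (broadcastValue instanceof AutoCloseable) {
          // a broadcast FileIO is closed here, breaking any task that
          // still reads files through broadcast.value()
          ((AutoCloseable) broadcastValue).close();
        }
        // a SerializableTable is not AutoCloseable, so it survives removal
        // and executors can keep recreating FileIO from it via table.io()
      }
    }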
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 8822780acc..078b2e00bc 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -43,7 +44,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -51,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -89,7 +88,6 @@ public class BaseRewriteManifestsSparkAction
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
-  private final FileIO fileIO;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
@@ -106,7 +104,6 @@ public class BaseRewriteManifestsSparkAction
             table.properties(),
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.fileIO = SparkUtil.serializableFileIO(table);
 
     // default the staging location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
@@ -215,7 +212,7 @@ public class BaseRewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
 
     // we rely only on the target number of manifests for unpartitioned tables
@@ -225,7 +222,13 @@ public class BaseRewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+            toManifests(
+                tableBroadcast,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                spec,
+                sparkType),
             manifestEncoder)
         .collectAsList();
   }
@@ -233,7 +236,7 @@ public class BaseRewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise
@@ -248,7 +251,12 @@ public class BaseRewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+                      tableBroadcast,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      spec,
+                      sparkType),
                   manifestEncoder)
               .collectAsList();
         });
@@ -282,7 +290,7 @@ public class BaseRewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(fileIO).stream()
+    return currentSnapshot.dataManifests(table.io()).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
@@ -334,14 +342,14 @@ public class BaseRewriteManifestsSparkAction
         .noRetry()
         .suppressFailureWhenFinished()
         .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
+        .run(location -> table.io().deleteFile(location));
   }
 
   private static ManifestFile writeManifest(
       List<Row> rows,
       int startIndex,
       int endIndex,
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       String location,
       int format,
       PartitionSpec spec,
@@ -351,7 +359,10 @@ public class BaseRewriteManifestsSparkAction
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
-        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+        tableBroadcast
+            .value()
+            .io()
+            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType dataFileType = DataFile.getType(spec.partitionType());
     SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
@@ -375,7 +386,7 @@ public class BaseRewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       long maxNumManifestEntries,
       String location,
       int format,
@@ -392,14 +403,30 @@ public class BaseRewriteManifestsSparkAction
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
         manifests.add(
-            writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                tableBroadcast,
+                location,
+                format,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
-            writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList, 0, midIndex, tableBroadcast, location, format, spec, sparkType));
         manifests.add(
             writeManifest(
-                rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                tableBroadcast,
+                location,
+                format,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();
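
[Editor's note] The hunks above are the executor-side half of the pattern:
toManifests and writeManifest now take a Broadcast<Table> and pull FileIO from
it on the executors. Reduced to its essentials, the pattern looks like this
hypothetical, self-contained sketch (manifestEntries and stagingLocation are
stand-ins for the action's fields; the Iceberg and Spark calls match the diff):

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.FileIO;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    class ExecutorSideSketch {
      static void writePerPartition(
          JavaSparkContext jsc, Table table, Dataset<Row> manifestEntries, String stagingLocation) {
        Broadcast<Table> tableBroadcast = jsc.broadcast(SerializableTable.copyOf(table));
        manifestEntries
            .mapPartitions(
                (MapPartitionsFunction<Row, String>)
                    rows -> {
                      // value() deserializes the table once per executor JVM; its
                      // io() is safe to use because the broadcast value is not Closeable
                      FileIO io = tableBroadcast.value().io();
                      String path = stagingLocation + "/optimized-m-" + UUID.randomUUID() + ".avro";
                      io.newOutputFile(path); // the real action writes a manifest here
                      return Collections.singletonList(path).iterator();
                    },
                Encoders.STRING())
            .collectAsList();
      }
    }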
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 84dbd7986a..b9fa2c0a52 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -75,6 +75,14 @@ public class SparkUtil {
 
   private SparkUtil() {}
 
+  /**
+   * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+   * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+   * alternative, use {@link org.apache.iceberg.SerializableTable}.
+   *
+   * @deprecated will be removed in 1.4.0
+   */
+  @Deprecated
   public static FileIO serializableFileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
       // we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 42977b0be5..860168ae0a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -44,7 +45,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -52,7 +52,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
-  private final FileIO fileIO;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
             table.properties(),
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.fileIO = SparkUtil.serializableFileIO(table);
 
     // default the staging location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -228,7 +225,7 @@ public class RewriteManifestsSparkAction
         .repartition(numManifests)
         .mapPartitions(
             toManifests(
-                io,
+                tableBroadcast,
                 maxNumManifestEntries,
                 stagingLocation,
                 formatVersion,
@@ -242,7 +239,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -258,7 +255,7 @@ public class RewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io,
+                      tableBroadcast,
                       maxNumManifestEntries,
                       stagingLocation,
                       formatVersion,
@@ -298,7 +295,7 @@ public class RewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(fileIO).stream()
+    return currentSnapshot.dataManifests(table.io()).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
@@ -351,14 +348,14 @@ public class RewriteManifestsSparkAction
         .noRetry()
         .suppressFailureWhenFinished()
         .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
+        .run(location -> table.io().deleteFile(location));
   }
 
   private static ManifestFile writeManifest(
       List<Row> rows,
       int startIndex,
       int endIndex,
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       String location,
       int format,
       Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ public class RewriteManifestsSparkAction
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
-        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+        tableBroadcast
+            .value()
+            .io()
+            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       long maxNumManifestEntries,
       String location,
       int format,
@@ -416,7 +416,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -429,7 +429,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 midIndex,
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -440,7 +440,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 midIndex,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
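
[Editor's note] One visible difference from the v3.1 backport above: the v3.2
action threads a combined partition type through writeManifest, so a single
file schema covers entries written under any historical partition spec. A
small sketch of that context, using only calls that appear in the surrounding
diff:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;

    class CombinedTypeSketch {
      // one data-file struct type that is valid across all of the table's specs
      static Types.StructType combinedFileType(Table table) {
        Types.StructType combinedPartitionType = Partitioning.partitionType(table);
        return DataFile.getType(combinedPartitionType);
      }
    }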
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 4ecf458efd..23d0d9303e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -77,6 +77,14 @@ public class SparkUtil {
 
   private SparkUtil() {}
 
+  /**
+   * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+   * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+   * alternative, use {@link org.apache.iceberg.SerializableTable}.
+   *
+   * @deprecated will be removed in 1.4.0
+   */
+  @Deprecated
   public static FileIO serializableFileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
       // we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
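
[Editor's note] For callers nothing changes: the broadcast swap is internal,
and the action is still invoked through the SparkActions entry point. A
hypothetical usage sketch (the 10 MB threshold is an arbitrary example, not
part of this commit):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    class RewriteManifestsUsage {
      static RewriteManifests.Result rewriteSmallManifests(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteManifests(table)
            // only rewrite manifests smaller than 10 MB
            .rewriteIf(manifest -> manifest.length() < 10L * 1024 * 1024)
            .execute();
      }
    }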