Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/11 00:46:03 UTC
[iceberg] branch master updated: Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 92ef136a0e Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)
92ef136a0e is described below
commit 92ef136a0e2dbe47c2840d539e3670d9570386ed
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Mon Apr 10 17:45:54 2023 -0700
Spark 3.1, 3.2: Broadcast Table instead of FileIO in rewrite manifests (#7263) (#7296)
This change backports PR #7263 to Spark 3.1 and 3.2
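For context, a minimal sketch of the driver/executor pattern this change applies
(the class and method names below are illustrative only, not part of the patch):

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

class BroadcastTableSketch {
  // Driver side: broadcast an immutable, serializable copy of the table.
  // Previously the action broadcast a FileIO directly; Spark closes
  // AutoCloseable broadcast values during broadcast removal, so the FileIO
  // could be closed while executors still needed it.
  static Broadcast<Table> broadcastTable(JavaSparkContext sc, Table table) {
    return sc.broadcast(SerializableTable.copyOf(table));
  }

  // Executor side: resolve the FileIO from the broadcast table at each use.
  static OutputFile newOutputFile(Broadcast<Table> tableBroadcast, String path) {
    FileIO io = tableBroadcast.value().io();
    return io.newOutputFile(path);
  }
}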
---
.../java/org/apache/iceberg/spark/SparkUtil.java | 8 +++
.../actions/BaseRewriteManifestsSparkAction.java | 59 ++++++++++++++++------
.../java/org/apache/iceberg/spark/SparkUtil.java | 8 +++
.../spark/actions/RewriteManifestsSparkAction.java | 32 ++++++------
.../java/org/apache/iceberg/spark/SparkUtil.java | 8 +++
5 files changed, 83 insertions(+), 32 deletions(-)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 2cdec2b062..5b3f50e692 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -63,6 +63,14 @@ public class SparkUtil {
private SparkUtil() {}
+ /**
+ * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+ * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+ * alternative, use {@link org.apache.iceberg.SerializableTable}.
+ *
+ * @deprecated will be removed in 1.4.0
+ */
+ @Deprecated
public static FileIO serializableFileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 8822780acc..078b2e00bc 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -43,7 +44,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -51,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -89,7 +88,6 @@ public class BaseRewriteManifestsSparkAction
private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
private final int formatVersion;
- private final FileIO fileIO;
private final long targetManifestSizeBytes;
private PartitionSpec spec = null;
@@ -106,7 +104,6 @@ public class BaseRewriteManifestsSparkAction
table.properties(),
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
- this.fileIO = SparkUtil.serializableFileIO(table);
// default the staging location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
@@ -215,7 +212,7 @@ public class BaseRewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
// we rely only on the target number of manifests for unpartitioned tables
@@ -225,7 +222,13 @@ public class BaseRewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ toManifests(
+ tableBroadcast,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
}
@@ -233,7 +236,7 @@ public class BaseRewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
// we allow the actual size of manifests to be 10% higher if the estimation is not precise
@@ -248,7 +251,12 @@ public class BaseRewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ tableBroadcast,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
});
@@ -282,7 +290,7 @@ public class BaseRewriteManifestsSparkAction
return ImmutableList.of();
}
- return currentSnapshot.dataManifests(fileIO).stream()
+ return currentSnapshot.dataManifests(table.io()).stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
.collect(Collectors.toList());
}
@@ -334,14 +342,14 @@ public class BaseRewriteManifestsSparkAction
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
+ .run(location -> table.io().deleteFile(location));
}
private static ManifestFile writeManifest(
List<Row> rows,
int startIndex,
int endIndex,
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
String location,
int format,
PartitionSpec spec,
@@ -351,7 +359,10 @@ public class BaseRewriteManifestsSparkAction
String manifestName = "optimized-m-" + UUID.randomUUID();
Path manifestPath = new Path(location, manifestName);
OutputFile outputFile =
- io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+ tableBroadcast
+ .value()
+ .io()
+ .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
Types.StructType dataFileType = DataFile.getType(spec.partitionType());
SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
@@ -375,7 +386,7 @@ public class BaseRewriteManifestsSparkAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
long maxNumManifestEntries,
String location,
int format,
@@ -392,14 +403,30 @@ public class BaseRewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
manifests.add(
- writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ tableBroadcast,
+ location,
+ format,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
- writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+ writeManifest(
+ rowsAsList, 0, midIndex, tableBroadcast, location, format, spec, sparkType));
manifests.add(
writeManifest(
- rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ tableBroadcast,
+ location,
+ format,
+ spec,
+ sparkType));
}
return manifests.iterator();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 84dbd7986a..b9fa2c0a52 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -75,6 +75,14 @@ public class SparkUtil {
private SparkUtil() {}
+ /**
+ * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+ * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+ * alternative, use {@link org.apache.iceberg.SerializableTable}.
+ *
+ * @deprecated will be removed in 1.4.0
+ */
+ @Deprecated
public static FileIO serializableFileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 42977b0be5..860168ae0a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -44,7 +45,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -52,7 +52,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
private final int formatVersion;
- private final FileIO fileIO;
private final long targetManifestSizeBytes;
private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
table.properties(),
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
- this.fileIO = SparkUtil.serializableFileIO(table);
// default the staging location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -228,7 +225,7 @@ public class RewriteManifestsSparkAction
.repartition(numManifests)
.mapPartitions(
toManifests(
- io,
+ tableBroadcast,
maxNumManifestEntries,
stagingLocation,
formatVersion,
@@ -242,7 +239,7 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+ Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -258,7 +255,7 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io,
+ tableBroadcast,
maxNumManifestEntries,
stagingLocation,
formatVersion,
@@ -298,7 +295,7 @@ public class RewriteManifestsSparkAction
return ImmutableList.of();
}
- return currentSnapshot.dataManifests(fileIO).stream()
+ return currentSnapshot.dataManifests(table.io()).stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
.collect(Collectors.toList());
}
@@ -351,14 +348,14 @@ public class RewriteManifestsSparkAction
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
+ .run(location -> table.io().deleteFile(location));
}
private static ManifestFile writeManifest(
List<Row> rows,
int startIndex,
int endIndex,
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
String location,
int format,
Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ public class RewriteManifestsSparkAction
String manifestName = "optimized-m-" + UUID.randomUUID();
Path manifestPath = new Path(location, manifestName);
OutputFile outputFile =
- io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+ tableBroadcast
+ .value()
+ .io()
+ .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ public class RewriteManifestsSparkAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io,
+ Broadcast<Table> tableBroadcast,
long maxNumManifestEntries,
String location,
int format,
@@ -416,7 +416,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
0,
rowsAsList.size(),
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,
@@ -429,7 +429,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
0,
midIndex,
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,
@@ -440,7 +440,7 @@ public class RewriteManifestsSparkAction
rowsAsList,
midIndex,
rowsAsList.size(),
- io,
+ tableBroadcast,
location,
format,
combinedPartitionType,
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 4ecf458efd..23d0d9303e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -77,6 +77,14 @@ public class SparkUtil {
private SparkUtil() {}
+ /**
+ * Using this to broadcast FileIO can lead to unexpected behavior, as broadcast variables that
+ * implement {@link AutoCloseable} will be closed by Spark during broadcast removal. As an
+ * alternative, use {@link org.apache.iceberg.SerializableTable}.
+ *
+ * @deprecated will be removed in 1.4.0
+ */
+ @Deprecated
public static FileIO serializableFileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization