Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/20 06:33:05 UTC
[iceberg] branch master updated: Spark: Pass Table to executors (#2362)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a79de57 Spark: Pass Table to executors (#2362)
a79de57 is described below
commit a79de571860a290f6e96ac562d616c9c6be2071e
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 19 23:32:55 2021 -0700
Spark: Pass Table to executors (#2362)
---
.../org/apache/iceberg/io/OutputFileFactory.java | 12 ++++
.../iceberg/actions/RewriteDataFilesAction.java | 8 +--
.../iceberg/spark/source/BatchDataReader.java | 12 ++--
.../spark/source/EqualityDeleteRowReader.java | 8 +--
.../apache/iceberg/spark/source/RowDataReader.java | 16 ++---
.../iceberg/spark/source/RowDataRewriter.java | 40 ++++-------
.../iceberg/spark/source/SparkAppenderFactory.java | 6 ++
.../spark/source/TestSparkReaderDeletes.java | 4 +-
.../apache/iceberg/spark/source/IcebergSource.java | 27 +------
.../org/apache/iceberg/spark/source/Reader.java | 84 ++++++++--------------
.../iceberg/spark/source/StreamingWriter.java | 11 ++-
.../org/apache/iceberg/spark/source/Writer.java | 63 +++++++---------
.../iceberg/spark/source/SparkBatchQueryScan.java | 11 ++-
.../iceberg/spark/source/SparkBatchScan.java | 65 ++++++-----------
.../iceberg/spark/source/SparkMergeScan.java | 11 ++-
.../iceberg/spark/source/SparkScanBuilder.java | 25 +------
.../apache/iceberg/spark/source/SparkWrite.java | 56 +++++++--------
.../iceberg/spark/source/SparkWriteBuilder.java | 24 +------
18 files changed, 171 insertions(+), 312 deletions(-)
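For reference, the heart of this change is replacing the separate Broadcast<FileIO> and Broadcast<EncryptionManager> variables with a single broadcast of a serializable table copy, from which executors read everything they need. A minimal sketch of that pattern follows; the class and method names below are illustrative, not part of the patch.

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

class BroadcastTableSketch {
  // driver side: broadcast one serializable copy of the table metadata
  static Broadcast<Table> broadcastTable(SparkSession spark, Table table) {
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    return sparkContext.broadcast(SerializableTable.copyOf(table));
  }

  // executor side (readers, writer factories, rewrite tasks): everything that
  // used to be passed in separately now comes from the broadcast table
  static void readOnExecutor(Broadcast<Table> tableBroadcast) {
    Table table = tableBroadcast.value();
    FileIO io = table.io();  // replaces Broadcast<FileIO>
    String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    // table.encryption(), table.schema() and table.spec() are available the same way
  }
}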
diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index 72d4a30..c510890 100644
--- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
@@ -44,6 +45,17 @@ public class OutputFileFactory {
private final String operationId;
private final AtomicInteger fileCount = new AtomicInteger(0);
+ // TODO: expose a builder like OutputFileFactory.forTable()
+ public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {
+ this(table.spec(), format, table.locationProvider(), table.io(), table.encryption(),
+ partitionId, taskId, UUID.randomUUID().toString());
+ }
+
+ public OutputFileFactory(Table table, PartitionSpec spec, FileFormat format, int partitionId, long taskId) {
+ this(spec, format, table.locationProvider(), table.io(), table.encryption(),
+ partitionId, taskId, UUID.randomUUID().toString());
+ }
+
/**
* Constructor where a generated UUID is used as the operationId to ensure uniqueness.
* @param spec Partition specification used by the location provider
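The two constructors added above let callers build an OutputFileFactory straight from a table, deriving the location provider, FileIO and encryption manager from it. A rough usage sketch, assuming partitionId and taskId come from the Spark task context:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFileFactory;

class OutputFileFactorySketch {
  static OutputFileFactory forTask(Table table, FileFormat format, int partitionId, long taskId) {
    // uses the table's default partition spec; the second overload takes an explicit spec:
    // new OutputFileFactory(table, table.spec(), format, partitionId, taskId)
    return new OutputFileFactory(table, format, partitionId, taskId);
  }
}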
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index 7e6a817..735e719 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -22,8 +22,8 @@ package org.apache.iceberg.actions;
import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.source.RowDataRewriter;
@@ -59,10 +59,8 @@ public class RewriteDataFilesAction
@Override
protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
- Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
- Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
- RowDataRewriter rowDataRewriter =
- new RowDataRewriter(table(), spec(), caseSensitive(), io, encryption);
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+ RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());
return rowDataRewriter.rewriteDataForTasks(taskRDD);
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index d48cf24..8cfe46b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,10 +28,10 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
@@ -51,12 +51,10 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final boolean caseSensitive;
private final int batchSize;
- BatchDataReader(
- CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
- EncryptionManager encryptionManager, boolean caseSensitive, int size) {
- super(task, fileIo, encryptionManager);
+ BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
+ super(task, table.io(), table.encryption());
this.expectedSchema = expectedSchema;
- this.nameMapping = nameMapping;
+ this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
this.batchSize = size;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index a85d85d..d4328ad 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -24,9 +24,8 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,9 +33,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
public class EqualityDeleteRowReader extends RowDataReader {
private final Schema expectedSchema;
- public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
- FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
- super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
+ public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+ super(task, table, table.schema(), caseSensitive);
this.expectedSchema = expectedSchema;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 7185a24..6d4bf8e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,13 +28,13 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
@@ -63,20 +63,16 @@ class RowDataReader extends BaseDataReader<InternalRow> {
.impl(UnsafeProjection.class, InternalRow.class)
.build();
- private final FileIO io;
private final Schema tableSchema;
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
- RowDataReader(
- CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive) {
- super(task, io, encryptionManager);
- this.io = io;
- this.tableSchema = tableSchema;
+ RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+ super(task, table.io(), table.encryption());
+ this.tableSchema = table.schema();
this.expectedSchema = expectedSchema;
- this.nameMapping = nameMapping;
+ this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 07c7e00..0342c22 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -32,9 +32,6 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
@@ -49,34 +46,21 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
public class RowDataRewriter implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
- private final Schema schema;
+ private final Broadcast<Table> tableBroadcast;
private final PartitionSpec spec;
- private final Map<String, String> properties;
private final FileFormat format;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
- private final LocationProvider locations;
- private final String nameMapping;
private final boolean caseSensitive;
- public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
- Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
- this.schema = table.schema();
+ public RowDataRewriter(Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean caseSensitive) {
+ this.tableBroadcast = tableBroadcast;
this.spec = spec;
- this.locations = table.locationProvider();
- this.properties = table.properties();
- this.io = io;
- this.encryptionManager = encryptionManager;
-
this.caseSensitive = caseSensitive;
- this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+ Table table = tableBroadcast.value();
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
@@ -95,27 +79,29 @@ public class RowDataRewriter implements Serializable {
int partitionId = context.partitionId();
long taskId = context.taskAttemptId();
- RowDataReader dataReader = new RowDataReader(
- task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+ Table table = tableBroadcast.value();
+ Schema schema = table.schema();
+ Map<String, String> properties = table.properties();
+
+ RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
StructType structType = SparkSchemaUtil.convert(schema);
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
- OutputFileFactory fileFactory = new OutputFileFactory(
- spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+ OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
TaskWriter<InternalRow> writer;
if (spec.isUnpartitioned()) {
- writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(),
+ writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, table.io(),
Long.MAX_VALUE);
} else if (PropertyUtil.propertyAsBoolean(properties,
TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
writer = new SparkPartitionedFanoutWriter(
- spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+ spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
structType);
} else {
writer = new SparkPartitionedWriter(
- spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+ spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
structType);
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 3fe7513..b97a007 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -60,6 +61,11 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
private StructType eqDeleteSparkType = null;
private StructType posDeleteSparkType = null;
+ // TODO: expose a builder like SparkAppenderFactory.forTable()
+ SparkAppenderFactory(Table table, Schema writeSchema, StructType dsSchema) {
+ this(table.properties(), writeSchema, dsSchema, table.spec());
+ }
+
SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null);
}
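As the Writer and SparkWrite hunks later in this patch show, a write task can now be wired up from the broadcast table value alone. A condensed sketch under those assumptions (SparkAppenderFactory is package-private, so this only compiles inside org.apache.iceberg.spark.source; the class and method names are illustrative):

package org.apache.iceberg.spark.source;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.spark.sql.types.StructType;

class WriteTaskWiringSketch {
  static void wire(Table table, Schema writeSchema, StructType dsSchema, FileFormat format,
                   int partitionId, long taskId) {
    // both factories are now created from the (broadcast) table
    OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
  }
}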
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 75b37c7..f6821d0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -61,7 +61,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
public abstract class TestSparkReaderDeletes extends DeleteReadTests {
@@ -206,8 +205,7 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
for (CombinedScanTask task : tasks) {
- try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(),
- table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) {
+ try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
while (reader.next()) {
actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 9bf9d15c..9b86e00 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -24,16 +24,12 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.StreamExecution;
@@ -52,7 +48,6 @@ import org.apache.spark.sql.types.StructType;
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
private SparkSession lazySpark = null;
- private JavaSparkContext lazySparkContext = null;
private Configuration lazyConf = null;
@Override
@@ -71,10 +66,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
Table table = getTableAndResolveHadoopConfiguration(options, conf);
String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
- Reader reader = new Reader(table, io, encryptionManager, Boolean.parseBoolean(caseSensitive), options);
+ Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
if (readSchema != null) {
// convert() will fail if readSchema contains fields not in table.schema()
SparkSchemaUtil.convert(table.schema(), readSchema);
@@ -98,11 +90,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
String wapId = lazySparkSession().conf().get("spark.wap.id", null);
boolean replacePartitions = mode == SaveMode.Overwrite;
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
return Optional.of(new Writer(
- table, io, encryptionManager, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
+ lazySparkSession(), table, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
}
@Override
@@ -121,10 +110,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
String appId = lazySparkSession().sparkContext().applicationId();
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
- return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
+ return new StreamingWriter(lazySparkSession(), table, options, queryId, mode, appId, writeSchema, dsStruct);
}
protected Table findTable(DataSourceOptions options, Configuration conf) {
@@ -146,13 +132,6 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
return lazySpark;
}
- private JavaSparkContext lazySparkContext() {
- if (lazySparkContext == null) {
- this.lazySparkContext = new JavaSparkContext(lazySparkSession().sparkContext());
- }
- return lazySparkContext;
- }
-
private Configuration lazyBaseConf() {
if (lazyConf == null) {
this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index aa93f4c..45a13f2 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -32,11 +32,11 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
@@ -44,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -53,6 +52,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
@@ -72,8 +72,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
SupportsPushDownRequiredColumns, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
@@ -81,6 +79,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private static final Filter[] NO_FILTERS = new Filter[0];
private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+ private final JavaSparkContext sparkContext;
private final Table table;
private final DataSourceOptions options;
private final Long snapshotId;
@@ -90,8 +89,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final boolean caseSensitive;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
@@ -105,8 +102,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
private Boolean readUsingBatch = null;
- Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- boolean caseSensitive, DataSourceOptions options) {
+ Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
+ this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.options = options;
this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
@@ -135,7 +132,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
- if (io.getValue() instanceof HadoopFileIO) {
+ if (table.io() instanceof HadoopFileIO) {
String fsscheme = "no_exist";
try {
Configuration conf = SparkSession.active().sessionState().newHadoopConf();
@@ -156,8 +153,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
}
this.schema = table.schema();
- this.io = io;
- this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
PropertyUtil.propertyAsInt(table.properties(),
@@ -202,17 +197,18 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
Preconditions.checkState(enableBatchRead(), "Batched reads not enabled");
Preconditions.checkState(batchSize > 0, "Invalid batch size");
- String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
- String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
"Cannot scan table %s: cannot apply required delete files", table);
+ // broadcast the table metadata as input partitions will be sent to executors
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+
List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
- task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+ task, tableBroadcast, expectedSchemaString, caseSensitive,
localityPreferred, new BatchReaderFactory(batchSize)));
}
LOG.info("Batching input partitions with {} tasks.", readTasks.size());
@@ -225,14 +221,15 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
*/
@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
- String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
- String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
+
+ // broadcast the table metadata as input partitions will be sent to executors
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
- task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+ task, tableBroadcast, expectedSchemaString, caseSensitive,
localityPreferred, InternalRowReaderFactory.INSTANCE));
}
@@ -443,38 +440,30 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private static class ReadTask<T> implements Serializable, InputPartition<T> {
private final CombinedScanTask task;
- private final String tableSchemaString;
+ private final Broadcast<Table> tableBroadcast;
private final String expectedSchemaString;
- private final String nameMappingString;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final boolean caseSensitive;
private final boolean localityPreferred;
private final ReaderFactory<T> readerFactory;
- private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
private transient String[] preferredLocations = null;
- private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
- String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+ private ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
this.task = task;
- this.tableSchemaString = tableSchemaString;
+ this.tableBroadcast = tableBroadcast;
this.expectedSchemaString = expectedSchemaString;
- this.io = io;
- this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
this.localityPreferred = localityPreferred;
this.preferredLocations = getPreferredLocations();
this.readerFactory = readerFactory;
- this.nameMappingString = nameMappingString;
}
@Override
public InputPartitionReader<T> createPartitionReader() {
- return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
- encryptionManager.value(), caseSensitive);
+ Table table = tableBroadcast.value();
+ return readerFactory.create(task, table, lazyExpectedSchema(), caseSensitive);
}
@Override
@@ -482,13 +471,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
return preferredLocations;
}
- private Schema lazyTableSchema() {
- if (tableSchema == null) {
- this.tableSchema = SchemaParser.fromJson(tableSchemaString);
- }
- return tableSchema;
- }
-
private Schema lazyExpectedSchema() {
if (expectedSchema == null) {
this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
@@ -508,9 +490,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
}
private interface ReaderFactory<T> extends Serializable {
- InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
- String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive);
+ InputPartitionReader<T> create(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive);
}
private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
@@ -520,10 +500,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
}
@Override
- public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
- String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive) {
- return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+ public InputPartitionReader<InternalRow> create(CombinedScanTask task, Table table,
+ Schema expectedSchema, boolean caseSensitive) {
+ return new RowReader(task, table, expectedSchema, caseSensitive);
}
}
@@ -535,24 +514,21 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
}
@Override
- public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
- String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive) {
- return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
+ public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Table table,
+ Schema expectedSchema, boolean caseSensitive) {
+ return new BatchReader(task, table, expectedSchema, caseSensitive, batchSize);
}
}
private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
- RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive) {
- super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+ RowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+ super(task, table, expectedSchema, caseSensitive);
}
}
private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
- BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
- EncryptionManager encryptionManager, boolean caseSensitive, int size) {
- super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
+ BatchReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
+ super(task, table, expectedSchema, caseSensitive, size);
}
}
}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
index f3a1a40..8ca33b1 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -27,10 +27,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
@@ -48,10 +46,9 @@ public class StreamingWriter extends Writer implements StreamWriter {
private final String queryId;
private final OutputMode mode;
- StreamingWriter(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- DataSourceOptions options, String queryId, OutputMode mode, String applicationId,
- Schema writeSchema, StructType dsSchema) {
- super(table, io, encryptionManager, options, false, applicationId, writeSchema, dsSchema);
+ StreamingWriter(SparkSession spark, Table table, DataSourceOptions options, String queryId,
+ OutputMode mode, String applicationId, Schema writeSchema, StructType dsSchema) {
+ super(spark, table, options, false, applicationId, writeSchema, dsSchema);
this.queryId = queryId;
this.mode = mode;
}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index f54eae7..76455b4 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -30,13 +30,12 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -45,7 +44,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
@@ -75,10 +76,9 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
class Writer implements DataSourceWriter {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
+ private final JavaSparkContext sparkContext;
private final Table table;
private final FileFormat format;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final boolean replacePartitions;
private final String applicationId;
private final String wapId;
@@ -88,19 +88,16 @@ class Writer implements DataSourceWriter {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
- Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
- StructType dsSchema) {
- this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
+ Writer(SparkSession spark, Table table, DataSourceOptions options, boolean replacePartitions,
+ String applicationId, Schema writeSchema, StructType dsSchema) {
+ this(spark, table, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
}
- Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
- Schema writeSchema, StructType dsSchema) {
+ Writer(SparkSession spark, Table table, DataSourceOptions options, boolean replacePartitions,
+ String applicationId, String wapId, Schema writeSchema, StructType dsSchema) {
+ this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.format = getFileFormat(table.properties(), options);
- this.io = io;
- this.encryptionManager = encryptionManager;
this.replacePartitions = replacePartitions;
this.applicationId = applicationId;
this.wapId = wapId;
@@ -137,9 +134,9 @@ class Writer implements DataSourceWriter {
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new WriterFactory(
- table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
- writeSchema, dsSchema, partitionedFanoutEnabled);
+ // broadcast the table metadata as the writer factory will be sent to executors
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+ return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}
@Override
@@ -210,7 +207,7 @@ class Writer implements DataSourceWriter {
2.0 /* exponential */)
.throwFailureWhenFinished()
.run(file -> {
- io.value().deleteFile(file.path().toString());
+ table.io().deleteFile(file.path().toString());
});
}
@@ -245,27 +242,17 @@ class Writer implements DataSourceWriter {
}
static class WriterFactory implements DataWriterFactory<InternalRow> {
- private final PartitionSpec spec;
+ private final Broadcast<Table> tableBroadcast;
private final FileFormat format;
- private final LocationProvider locations;
- private final Map<String, String> properties;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
- WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, Broadcast<FileIO> io,
- Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+ WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
- this.spec = spec;
+ this.tableBroadcast = tableBroadcast;
this.format = format;
- this.locations = locations;
- this.properties = properties;
- this.io = io;
- this.encryptionManager = encryptionManager;
this.targetFileSize = targetFileSize;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
@@ -274,17 +261,21 @@ class Writer implements DataSourceWriter {
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- OutputFileFactory fileFactory = new OutputFileFactory(
- spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
- SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+ SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+ PartitionSpec spec = table.spec();
+ FileIO io = table.io();
if (spec.isUnpartitioned()) {
- return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+ return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
} else if (partitionedFanoutEnabled) {
- return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+ return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
writeSchema, dsSchema);
} else {
- return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+ return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
writeSchema, dsSchema);
}
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 76d873e..704a771 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -27,15 +27,13 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
class SparkBatchQueryScan extends SparkBatchScan {
@@ -50,11 +48,10 @@ class SparkBatchQueryScan extends SparkBatchScan {
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
- SparkBatchQueryScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
- boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
- CaseInsensitiveStringMap options) {
+ SparkBatchQueryScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
+ List<Expression> filters, CaseInsensitiveStringMap options) {
- super(table, io, encryption, caseSensitive, expectedSchema, filters, options);
+ super(spark, table, caseSensitive, expectedSchema, filters, options);
this.snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
this.asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index e26f8c6..1999b56 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -30,18 +30,17 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
@@ -62,29 +61,26 @@ import org.slf4j.LoggerFactory;
abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
+ private final JavaSparkContext sparkContext;
private final Table table;
private final boolean caseSensitive;
private final boolean localityPreferred;
private final Schema expectedSchema;
private final List<Expression> filterExpressions;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final int batchSize;
private final CaseInsensitiveStringMap options;
// lazy variables
private StructType readSchema = null;
- SparkBatchScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
- boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
- CaseInsensitiveStringMap options) {
+ SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
+ List<Expression> filters, CaseInsensitiveStringMap options) {
+ this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
- this.io = io;
- this.encryptionManager = encryption;
this.caseSensitive = caseSensitive;
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters : Collections.emptyList();
- this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options);
+ this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
this.batchSize = Spark3Util.batchSize(table.properties(), options);
this.options = options;
}
@@ -122,15 +118,16 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
@Override
public InputPartition[] planInputPartitions() {
- String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(expectedSchema);
- String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+
+ // broadcast the table metadata as input partitions will be sent to executors
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
List<CombinedScanTask> scanTasks = tasks();
InputPartition[] readTasks = new InputPartition[scanTasks.size()];
for (int i = 0; i < scanTasks.size(); i++) {
readTasks[i] = new ReadTask(
- scanTasks.get(i), tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager,
+ scanTasks.get(i), tableBroadcast, expectedSchemaString,
caseSensitive, localityPreferred);
}
@@ -249,43 +246,34 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
RowReader(ReadTask task) {
- super(task.task, task.tableSchema(), task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
- task.isCaseSensitive());
+ super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive());
}
}
private static class BatchReader extends BatchDataReader implements PartitionReader<ColumnarBatch> {
BatchReader(ReadTask task, int batchSize) {
- super(task.task, task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
- task.isCaseSensitive(), batchSize);
+ super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize);
}
}
private static class ReadTask implements InputPartition, Serializable {
private final CombinedScanTask task;
- private final String tableSchemaString;
+ private final Broadcast<Table> tableBroadcast;
private final String expectedSchemaString;
- private final String nameMappingString;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final boolean caseSensitive;
- private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
private transient String[] preferredLocations = null;
- ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, String nameMappingString,
- Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager, boolean caseSensitive,
- boolean localityPreferred) {
+ ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
+ boolean caseSensitive, boolean localityPreferred) {
this.task = task;
- this.tableSchemaString = tableSchemaString;
+ this.tableBroadcast = tableBroadcast;
this.expectedSchemaString = expectedSchemaString;
- this.nameMappingString = nameMappingString;
- this.io = io;
- this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
if (localityPreferred) {
- this.preferredLocations = Util.blockLocations(io.value(), task);
+ Table table = tableBroadcast.value();
+ this.preferredLocations = Util.blockLocations(table.io(), task);
} else {
this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
}
@@ -300,25 +288,14 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
return task.files();
}
- public FileIO io() {
- return io.value();
- }
-
- public EncryptionManager encryption() {
- return encryptionManager.value();
+ public Table table() {
+ return tableBroadcast.value();
}
public boolean isCaseSensitive() {
return caseSensitive;
}
- private Schema tableSchema() {
- if (tableSchema == null) {
- this.tableSchema = SchemaParser.fromJson(tableSchemaString);
- }
- return tableSchema;
- }
-
private Schema expectedSchema() {
if (expectedSchema == null) {
this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
index ac2ee40..1042f19 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -32,18 +32,16 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -70,11 +68,10 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
private Set<String> filteredLocations = null;
- SparkMergeScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
- boolean caseSensitive, boolean ignoreResiduals, Schema expectedSchema,
- List<Expression> filters, CaseInsensitiveStringMap options) {
+ SparkMergeScan(SparkSession spark, Table table, boolean caseSensitive, boolean ignoreResiduals,
+ Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {
- super(table, io, encryption, caseSensitive, expectedSchema, filters, options);
+ super(spark, table, caseSensitive, expectedSchema, filters, options);
this.table = table;
this.ignoreResiduals = ignoreResiduals;
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 5ab24a4..633a171 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -26,20 +26,15 @@ import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -65,9 +60,6 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
private Filter[] pushedFilters = NO_FILTERS;
private boolean ignoreResiduals = false;
- // lazy variables
- private JavaSparkContext lazySparkContext = null;
-
SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
@@ -75,13 +67,6 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
}
- private JavaSparkContext lazySparkContext() {
- if (lazySparkContext == null) {
- this.lazySparkContext = new JavaSparkContext(spark.sparkContext());
- }
- return lazySparkContext;
- }
-
private Schema lazySchema() {
if (schema == null) {
if (requestedProjection != null) {
@@ -174,19 +159,13 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
@Override
public Scan build() {
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryption = lazySparkContext().broadcast(table.encryption());
-
return new SparkBatchQueryScan(
- table, io, encryption, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options);
+ spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options);
}
public Scan buildMergeScan() {
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryption = lazySparkContext().broadcast(table.encryption());
-
return new SparkMergeScan(
- table, io, encryption, caseSensitive, ignoreResiduals,
+ spark, table, caseSensitive, ignoreResiduals,
schemaWithMetadataColumns(), filterExpressions, options);
}
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 2fc803f..3a43777 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -35,16 +35,15 @@ import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -53,7 +52,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
@@ -87,11 +88,10 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
class SparkWrite {
private static final Logger LOG = LoggerFactory.getLogger(SparkWrite.class);
+ private final JavaSparkContext sparkContext;
private final Table table;
private final String queryId;
private final FileFormat format;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final String applicationId;
private final String wapId;
private final long targetFileSize;
@@ -100,14 +100,13 @@ class SparkWrite {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
- SparkWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- LogicalWriteInfo writeInfo, String applicationId, String wapId,
+ SparkWrite(SparkSession spark, Table table, LogicalWriteInfo writeInfo,
+ String applicationId, String wapId,
Schema writeSchema, StructType dsSchema) {
+ this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.queryId = writeInfo.queryId();
this.format = getFileFormat(table.properties(), writeInfo.options());
- this.io = io;
- this.encryptionManager = encryptionManager;
this.applicationId = applicationId;
this.wapId = wapId;
this.writeSchema = writeSchema;
@@ -168,9 +167,9 @@ class SparkWrite {
// the writer factory works for both batch and streaming
private WriterFactory createWriterFactory() {
- return new WriterFactory(
- table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
- writeSchema, dsSchema, partitionedFanoutEnabled);
+ // broadcast the table metadata as the writer factory will be sent to executors
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+ return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}
private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -207,7 +206,7 @@ class SparkWrite {
2.0 /* exponential */)
.throwFailureWhenFinished()
.run(file -> {
- io.value().deleteFile(file.path().toString());
+ table.io().deleteFile(file.path().toString());
});
}
@@ -479,27 +478,17 @@ class SparkWrite {
}
private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
- private final PartitionSpec spec;
+ private final Broadcast<Table> tableBroadcast;
private final FileFormat format;
- private final LocationProvider locations;
- private final Map<String, String> properties;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
- protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, Broadcast<FileIO> io,
- Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+ protected WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
- this.spec = spec;
+ this.tableBroadcast = tableBroadcast;
this.format = format;
- this.locations = locations;
- this.properties = properties;
- this.io = io;
- this.encryptionManager = encryptionManager;
this.targetFileSize = targetFileSize;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
@@ -513,17 +502,22 @@ class SparkWrite {
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
- OutputFileFactory fileFactory = new OutputFileFactory(
- spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
- SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+ SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+ PartitionSpec spec = table.spec();
+ FileIO io = table.io();
+
if (spec.isUnpartitioned()) {
- return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+ return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
} else if (partitionedFanoutEnabled) {
return new PartitionedFanout3Writer(
- spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
+ spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
} else {
return new Partitioned3Writer(
- spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
+ spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
}
}
}
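
A note on why the single table broadcast in createWriterFactory is sufficient: the serializable copy exposes all of the state that the removed WriterFactory fields carried. The sketch below (placeholder class and method names, not project code) shows the driver/executor round trip under that reading.

import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

class BroadcastRoundTripSketch {
  // Driver side (mirrors createWriterFactory): broadcast one immutable table copy.
  static Broadcast<Table> broadcastTable(JavaSparkContext sparkContext, Table table) {
    return sparkContext.broadcast(SerializableTable.copyOf(table));
  }

  // Executor side (mirrors createWriter): everything the writers need comes back out
  // of that single broadcast, replacing the five fields WriterFactory dropped.
  static void unpack(Broadcast<Table> tableBroadcast) {
    Table table = tableBroadcast.value();
    PartitionSpec spec = table.spec();                      // was the `spec` field
    FileIO io = table.io();                                 // was Broadcast<FileIO>
    EncryptionManager encryption = table.encryption();      // was Broadcast<EncryptionManager>
    Map<String, String> properties = table.properties();    // was the `properties` field
    LocationProvider locations = table.locationProvider();  // was the `locations` field
  }
}

The intent is that the copy is created once per write on the driver, and executors rebuild OutputFileFactory and SparkAppenderFactory from the broadcast value rather than from individually broadcast pieces.
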
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 7530afd..19debd5 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -23,18 +23,14 @@ import java.util.Locale;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.write.BatchWrite;
@@ -62,9 +58,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
private SparkMergeScan mergeScan = null;
private IsolationLevel isolationLevel = null;
- // lazy variables
- private JavaSparkContext lazySparkContext = null;
-
SparkWriteBuilder(SparkSession spark, Table table, LogicalWriteInfo info) {
this.spark = spark;
this.table = table;
@@ -75,13 +68,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null;
}
- private JavaSparkContext lazySparkContext() {
- if (lazySparkContext == null) {
- this.lazySparkContext = new JavaSparkContext(spark.sparkContext());
- }
- return lazySparkContext;
- }
-
public WriteBuilder overwriteFiles(Scan scan, IsolationLevel writeIsolationLevel) {
Preconditions.checkArgument(scan instanceof SparkMergeScan, "%s is not SparkMergeScan", scan);
Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual files and by filter");
@@ -128,10 +114,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
// Get write-audit-publish id
String wapId = spark.conf().get("spark.wap.id", null);
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
- SparkWrite write = new SparkWrite(table, io, encryptionManager, writeInfo, appId, wapId, writeSchema, dsSchema);
+ SparkWrite write = new SparkWrite(spark, table, writeInfo, appId, wapId, writeSchema, dsSchema);
if (overwriteByFilter) {
return write.asOverwriteByFilter(overwriteExpr);
} else if (overwriteDynamic) {
@@ -163,10 +146,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
// Get write-audit-publish id
String wapId = spark.conf().get("spark.wap.id", null);
- Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
- Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
- SparkWrite write = new SparkWrite(table, io, encryptionManager, writeInfo, appId, wapId, writeSchema, dsSchema);
+ SparkWrite write = new SparkWrite(spark, table, writeInfo, appId, wapId, writeSchema, dsSchema);
if (overwriteByFilter) {
return write.asStreamingOverwrite();
} else {