Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/20 06:33:05 UTC

[iceberg] branch master updated: Spark: Pass Table to executors (#2362)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a79de57  Spark: Pass Table to executors (#2362)
a79de57 is described below

commit a79de571860a290f6e96ac562d616c9c6be2071e
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 19 23:32:55 2021 -0700

    Spark: Pass Table to executors (#2362)
---
 .../org/apache/iceberg/io/OutputFileFactory.java   | 12 ++++
 .../iceberg/actions/RewriteDataFilesAction.java    |  8 +--
 .../iceberg/spark/source/BatchDataReader.java      | 12 ++--
 .../spark/source/EqualityDeleteRowReader.java      |  8 +--
 .../apache/iceberg/spark/source/RowDataReader.java | 16 ++---
 .../iceberg/spark/source/RowDataRewriter.java      | 40 ++++-------
 .../iceberg/spark/source/SparkAppenderFactory.java |  6 ++
 .../spark/source/TestSparkReaderDeletes.java       |  4 +-
 .../apache/iceberg/spark/source/IcebergSource.java | 27 +------
 .../org/apache/iceberg/spark/source/Reader.java    | 84 ++++++++--------------
 .../iceberg/spark/source/StreamingWriter.java      | 11 ++-
 .../org/apache/iceberg/spark/source/Writer.java    | 63 +++++++---------
 .../iceberg/spark/source/SparkBatchQueryScan.java  | 11 ++-
 .../iceberg/spark/source/SparkBatchScan.java       | 65 ++++++-----------
 .../iceberg/spark/source/SparkMergeScan.java       | 11 ++-
 .../iceberg/spark/source/SparkScanBuilder.java     | 25 +------
 .../apache/iceberg/spark/source/SparkWrite.java    | 56 +++++++--------
 .../iceberg/spark/source/SparkWriteBuilder.java    | 24 +------
 18 files changed, 171 insertions(+), 312 deletions(-)
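
At a high level, the commit replaces the separate Broadcast<FileIO> and Broadcast<EncryptionManager> handles with a single broadcast of a serializable copy of the table, and executors derive everything they need from that table. A minimal sketch of the pattern, assuming a JavaSparkContext and a loaded Table (the class and method names below are illustrative, not part of the commit):

    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.io.FileIO;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    class TableBroadcastSketch {
      // Driver side: broadcast one serializable snapshot of the table metadata.
      static Broadcast<Table> broadcastTable(JavaSparkContext sparkContext, Table table) {
        return sparkContext.broadcast(SerializableTable.copyOf(table));
      }

      // Executor side: readers and writers derive what they previously received as
      // separate arguments (FileIO, EncryptionManager, name mapping) from the table itself.
      static void useOnExecutor(Broadcast<Table> tableBroadcast) {
        Table table = tableBroadcast.value();
        FileIO io = table.io();
        EncryptionManager encryption = table.encryption();
        String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
      }
    }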

diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index 72d4a30..c510890 100644
--- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 
@@ -44,6 +45,17 @@ public class OutputFileFactory {
   private final String operationId;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
+  // TODO: expose a builder like OutputFileFactory.forTable()
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {
+    this(table.spec(), format, table.locationProvider(), table.io(), table.encryption(),
+        partitionId, taskId, UUID.randomUUID().toString());
+  }
+
+  public OutputFileFactory(Table table, PartitionSpec spec, FileFormat format, int partitionId, long taskId) {
+    this(spec, format, table.locationProvider(), table.io(), table.encryption(),
+        partitionId, taskId, UUID.randomUUID().toString());
+  }
+
   /**
    * Constructor where a generated UUID is used as the operationId to ensure uniqueness.
    * @param spec Partition specification used by the location provider
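
The new constructors take the partition spec, location provider, FileIO, and EncryptionManager from the table, so a call site only supplies the table, a file format, and the Spark partition/task ids. A hypothetical call site, with the format lookup mirroring what RowDataRewriter does further down (the helper name is illustrative):

    import java.util.Locale;

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.io.OutputFileFactory;

    class OutputFileFactorySketch {
      // Resolve the default write format from table properties and build a factory for
      // this Spark partition/task, letting the table supply spec, locations, io and encryption.
      static OutputFileFactory forTask(Table table, int partitionId, long taskId) {
        String formatString = table.properties().getOrDefault(
            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
        FileFormat format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
        return new OutputFileFactory(table, format, partitionId, taskId);
      }
    }
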
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index 7e6a817..735e719 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -22,8 +22,8 @@ package org.apache.iceberg.actions;
 import java.util.List;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.source.RowDataRewriter;
@@ -59,10 +59,8 @@ public class RewriteDataFilesAction
   @Override
   protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
-    RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table(), spec(), caseSensitive(), io, encryption);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());
     return rowDataRewriter.rewriteDataForTasks(taskRDD);
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index d48cf24..8cfe46b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,10 +28,10 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
@@ -51,12 +51,10 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
   private final boolean caseSensitive;
   private final int batchSize;
 
-  BatchDataReader(
-      CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
-      EncryptionManager encryptionManager, boolean caseSensitive, int size) {
-    super(task, fileIo, encryptionManager);
+  BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
+    super(task, table.io(), table.encryption());
     this.expectedSchema = expectedSchema;
-    this.nameMapping = nameMapping;
+    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
     this.caseSensitive = caseSensitive;
     this.batchSize = size;
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index a85d85d..d4328ad 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -24,9 +24,8 @@ import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,9 +33,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
 public class EqualityDeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
 
-  public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
-                                 FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
-    super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
+  public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+    super(task, table, table.schema(), caseSensitive);
     this.expectedSchema = expectedSchema;
   }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 7185a24..6d4bf8e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,13 +28,13 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
@@ -63,20 +63,16 @@ class RowDataReader extends BaseDataReader<InternalRow> {
       .impl(UnsafeProjection.class, InternalRow.class)
       .build();
 
-  private final FileIO io;
   private final Schema tableSchema;
   private final Schema expectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
 
-  RowDataReader(
-      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
-      EncryptionManager encryptionManager, boolean caseSensitive) {
-    super(task, io, encryptionManager);
-    this.io = io;
-    this.tableSchema = tableSchema;
+  RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+    super(task, table.io(), table.encryption());
+    this.tableSchema = table.schema();
     this.expectedSchema = expectedSchema;
-    this.nameMapping = nameMapping;
+    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
     this.caseSensitive = caseSensitive;
   }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 07c7e00..0342c22 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -32,9 +32,6 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
@@ -49,34 +46,21 @@ import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
+  private final Broadcast<Table> tableBroadcast;
   private final PartitionSpec spec;
-  private final Map<String, String> properties;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
-  private final LocationProvider locations;
-  private final String nameMapping;
   private final boolean caseSensitive;
 
-  public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
-                         Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean caseSensitive) {
+    this.tableBroadcast = tableBroadcast;
     this.spec = spec;
-    this.locations = table.locationProvider();
-    this.properties = table.properties();
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-
     this.caseSensitive = caseSensitive;
-    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
+    Table table = tableBroadcast.value();
     String formatString = table.properties().getOrDefault(
         TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
     this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
@@ -95,27 +79,29 @@ public class RowDataRewriter implements Serializable {
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
 
-    RowDataReader dataReader = new RowDataReader(
-        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+    Table table = tableBroadcast.value();
+    Schema schema = table.schema();
+    Map<String, String> properties = table.properties();
+
+    RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
 
     StructType structType = SparkSchemaUtil.convert(schema);
     SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
-    OutputFileFactory fileFactory = new OutputFileFactory(
-        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+    OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
 
     TaskWriter<InternalRow> writer;
     if (spec.isUnpartitioned()) {
-      writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(),
+      writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, table.io(),
           Long.MAX_VALUE);
     } else if (PropertyUtil.propertyAsBoolean(properties,
         TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
         TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
       writer = new SparkPartitionedFanoutWriter(
-          spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
           structType);
     } else {
       writer = new SparkPartitionedWriter(
-          spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
           structType);
     }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 3fe7513..b97a007 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -60,6 +61,11 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   private StructType eqDeleteSparkType = null;
   private StructType posDeleteSparkType = null;
 
+  // TODO: expose a builder like SparkAppenderFactory.forTable()
+  SparkAppenderFactory(Table table, Schema writeSchema, StructType dsSchema) {
+    this(table.properties(), writeSchema, dsSchema, table.spec());
+  }
+
   SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
     this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null);
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 75b37c7..f6821d0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -61,7 +61,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
 public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
@@ -206,8 +205,7 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
 
     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(),
-          table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) {
+      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 9bf9d15c..9b86e00 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -24,16 +24,12 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.execution.streaming.StreamExecution;
@@ -52,7 +48,6 @@ import org.apache.spark.sql.types.StructType;
 public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
 
   private SparkSession lazySpark = null;
-  private JavaSparkContext lazySparkContext = null;
   private Configuration lazyConf = null;
 
   @Override
@@ -71,10 +66,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
 
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
-    Reader reader = new Reader(table, io, encryptionManager, Boolean.parseBoolean(caseSensitive), options);
+    Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
     if (readSchema != null) {
       // convert() will fail if readSchema contains fields not in table.schema()
       SparkSchemaUtil.convert(table.schema(), readSchema);
@@ -98,11 +90,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     String wapId = lazySparkSession().conf().get("spark.wap.id", null);
     boolean replacePartitions = mode == SaveMode.Overwrite;
 
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
     return Optional.of(new Writer(
-        table, io, encryptionManager, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
+        lazySparkSession(), table, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
   }
 
   @Override
@@ -121,10 +110,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
     String appId = lazySparkSession().sparkContext().applicationId();
 
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
-    return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
+    return new StreamingWriter(lazySparkSession(), table, options, queryId, mode, appId, writeSchema, dsStruct);
   }
 
   protected Table findTable(DataSourceOptions options, Configuration conf) {
@@ -146,13 +132,6 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     return lazySpark;
   }
 
-  private JavaSparkContext lazySparkContext() {
-    if (lazySparkContext == null) {
-      this.lazySparkContext = new JavaSparkContext(lazySparkSession().sparkContext());
-    }
-    return lazySparkContext;
-  }
-
   private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index aa93f4c..45a13f2 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -32,11 +32,11 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
@@ -44,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -53,6 +52,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
@@ -72,8 +72,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
     SupportsPushDownRequiredColumns, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
@@ -81,6 +79,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private static final Filter[] NO_FILTERS = new Filter[0];
   private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
 
+  private final JavaSparkContext sparkContext;
   private final Table table;
   private final DataSourceOptions options;
   private final Long snapshotId;
@@ -90,8 +89,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final Long splitSize;
   private final Integer splitLookback;
   private final Long splitOpenFileCost;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
   private final boolean caseSensitive;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
@@ -105,8 +102,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Boolean readUsingBatch = null;
 
-  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-      boolean caseSensitive, DataSourceOptions options) {
+  Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.options = options;
     this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
@@ -135,7 +132,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
     this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
 
-    if (io.getValue() instanceof HadoopFileIO) {
+    if (table.io() instanceof HadoopFileIO) {
       String fsscheme = "no_exist";
       try {
         Configuration conf = SparkSession.active().sessionState().newHadoopConf();
@@ -156,8 +153,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     }
 
     this.schema = table.schema();
-    this.io = io;
-    this.encryptionManager = encryptionManager;
     this.caseSensitive = caseSensitive;
     this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
         PropertyUtil.propertyAsInt(table.properties(),
@@ -202,17 +197,18 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
     Preconditions.checkState(enableBatchRead(), "Batched reads not enabled");
     Preconditions.checkState(batchSize > 0, "Invalid batch size");
-    String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
-    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
 
     ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
         "Cannot scan table %s: cannot apply required delete files", table);
 
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
-          task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+          task, tableBroadcast, expectedSchemaString, caseSensitive,
           localityPreferred, new BatchReaderFactory(batchSize)));
     }
     LOG.info("Batching input partitions with {} tasks.", readTasks.size());
@@ -225,14 +221,15 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
    */
   @Override
   public List<InputPartition<InternalRow>> planInputPartitions() {
-    String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
-    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
-          task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+          task, tableBroadcast, expectedSchemaString, caseSensitive,
           localityPreferred, InternalRowReaderFactory.INSTANCE));
     }
 
@@ -443,38 +440,30 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final CombinedScanTask task;
-    private final String tableSchemaString;
+    private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
-    private final String nameMappingString;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
     private final boolean localityPreferred;
     private final ReaderFactory<T> readerFactory;
 
-    private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+    private ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
                      boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
       this.task = task;
-      this.tableSchemaString = tableSchemaString;
+      this.tableBroadcast = tableBroadcast;
       this.expectedSchemaString = expectedSchemaString;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
-      this.nameMappingString = nameMappingString;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
-      return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
-          encryptionManager.value(), caseSensitive);
+      Table table = tableBroadcast.value();
+      return readerFactory.create(task, table, lazyExpectedSchema(), caseSensitive);
     }
 
     @Override
@@ -482,13 +471,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
       return preferredLocations;
     }
 
-    private Schema lazyTableSchema() {
-      if (tableSchema == null) {
-        this.tableSchema = SchemaParser.fromJson(tableSchemaString);
-      }
-      return tableSchema;
-    }
-
     private Schema lazyExpectedSchema() {
       if (expectedSchema == null) {
         this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
@@ -508,9 +490,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   }
 
   private interface ReaderFactory<T> extends Serializable {
-    InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
-                                   String nameMapping, FileIO io,
-                                   EncryptionManager encryptionManager, boolean caseSensitive);
+    InputPartitionReader<T> create(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive);
   }
 
   private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
@@ -520,10 +500,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     }
 
     @Override
-    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
-                                                    String nameMapping, FileIO io,
-                                                    EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Table table,
+                                                    Schema expectedSchema, boolean caseSensitive) {
+      return new RowReader(task, table, expectedSchema, caseSensitive);
     }
   }
 
@@ -535,24 +514,21 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     }
 
     @Override
-    public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
-                                                      String nameMapping, FileIO io,
-                                                      EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
+    public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Table table,
+                                                      Schema expectedSchema, boolean caseSensitive) {
+      return new BatchReader(task, table, expectedSchema, caseSensitive, batchSize);
     }
   }
 
   private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
-    RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
-              EncryptionManager encryptionManager, boolean caseSensitive) {
-      super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+    RowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
+      super(task, table, expectedSchema, caseSensitive);
     }
   }
 
   private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
-    BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
-                       EncryptionManager encryptionManager, boolean caseSensitive, int size) {
-      super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
+    BatchReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
+      super(task, table, expectedSchema, caseSensitive, size);
     }
   }
 }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
index f3a1a40..8ca33b1 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -27,10 +27,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
@@ -48,10 +46,9 @@ public class StreamingWriter extends Writer implements StreamWriter {
   private final String queryId;
   private final OutputMode mode;
 
-  StreamingWriter(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-                  DataSourceOptions options, String queryId, OutputMode mode, String applicationId,
-                  Schema writeSchema, StructType dsSchema) {
-    super(table, io, encryptionManager, options, false, applicationId, writeSchema, dsSchema);
+  StreamingWriter(SparkSession spark, Table table, DataSourceOptions options, String queryId,
+                  OutputMode mode, String applicationId, Schema writeSchema, StructType dsSchema) {
+    super(spark, table, options, false, applicationId, writeSchema, dsSchema);
     this.queryId = queryId;
     this.mode = mode;
   }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index f54eae7..76455b4 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -30,13 +30,12 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -45,7 +44,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
@@ -75,10 +76,9 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 class Writer implements DataSourceWriter {
   private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
+  private final JavaSparkContext sparkContext;
   private final Table table;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
   private final boolean replacePartitions;
   private final String applicationId;
   private final String wapId;
@@ -88,19 +88,16 @@ class Writer implements DataSourceWriter {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
-  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-         DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
-         StructType dsSchema) {
-    this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
+  Writer(SparkSession spark, Table table, DataSourceOptions options, boolean replacePartitions,
+         String applicationId, Schema writeSchema, StructType dsSchema) {
+    this(spark, table, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
   }
 
-  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-         DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
-         Schema writeSchema, StructType dsSchema) {
+  Writer(SparkSession spark, Table table, DataSourceOptions options, boolean replacePartitions,
+         String applicationId, String wapId, Schema writeSchema, StructType dsSchema) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.format = getFileFormat(table.properties(), options);
-    this.io = io;
-    this.encryptionManager = encryptionManager;
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
     this.wapId = wapId;
@@ -137,9 +134,9 @@ class Writer implements DataSourceWriter {
 
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema, partitionedFanoutEnabled);
+    // broadcast the table metadata as the writer factory will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
   }
 
   @Override
@@ -210,7 +207,7 @@ class Writer implements DataSourceWriter {
             2.0 /* exponential */)
         .throwFailureWhenFinished()
         .run(file -> {
-          io.value().deleteFile(file.path().toString());
+          table.io().deleteFile(file.path().toString());
         });
   }
 
@@ -245,27 +242,17 @@ class Writer implements DataSourceWriter {
   }
 
   static class WriterFactory implements DataWriterFactory<InternalRow> {
-    private final PartitionSpec spec;
+    private final Broadcast<Table> tableBroadcast;
     private final FileFormat format;
-    private final LocationProvider locations;
-    private final Map<String, String> properties;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
 
-    WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                  Map<String, String> properties, Broadcast<FileIO> io,
-                  Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+    WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
                   Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
-      this.spec = spec;
+      this.tableBroadcast = tableBroadcast;
       this.format = format;
-      this.locations = locations;
-      this.properties = properties;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
@@ -274,17 +261,21 @@ class Writer implements DataSourceWriter {
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
 
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+        return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
       } else if (partitionedFanoutEnabled) {
-        return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+        return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
             writeSchema, dsSchema);
       } else {
-        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
             writeSchema, dsSchema);
       }
     }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 76d873e..704a771 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -27,15 +27,13 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 class SparkBatchQueryScan extends SparkBatchScan {
@@ -50,11 +48,10 @@ class SparkBatchQueryScan extends SparkBatchScan {
 
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  SparkBatchQueryScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
-                      boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
-                      CaseInsensitiveStringMap options) {
+  SparkBatchQueryScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
+                      List<Expression> filters, CaseInsensitiveStringMap options) {
 
-    super(table, io, encryption, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, caseSensitive, expectedSchema, filters, options);
 
     this.snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
     this.asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index e26f8c6..1999b56 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -30,18 +30,17 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
@@ -62,29 +61,26 @@ import org.slf4j.LoggerFactory;
 abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
 
+  private final JavaSparkContext sparkContext;
   private final Table table;
   private final boolean caseSensitive;
   private final boolean localityPreferred;
   private final Schema expectedSchema;
   private final List<Expression> filterExpressions;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
   private final int batchSize;
   private final CaseInsensitiveStringMap options;
 
   // lazy variables
   private StructType readSchema = null;
 
-  SparkBatchScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
-                 boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
-                 CaseInsensitiveStringMap options) {
+  SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
+                 List<Expression> filters, CaseInsensitiveStringMap options) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
-    this.io = io;
-    this.encryptionManager = encryption;
     this.caseSensitive = caseSensitive;
     this.expectedSchema = expectedSchema;
     this.filterExpressions = filters != null ? filters : Collections.emptyList();
-    this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options);
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
     this.batchSize = Spark3Util.batchSize(table.properties(), options);
     this.options = options;
   }
@@ -122,15 +118,16 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
   @Override
   public InputPartition[] planInputPartitions() {
-    String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
-    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<CombinedScanTask> scanTasks = tasks();
     InputPartition[] readTasks = new InputPartition[scanTasks.size()];
     for (int i = 0; i < scanTasks.size(); i++) {
       readTasks[i] = new ReadTask(
-          scanTasks.get(i), tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager,
+          scanTasks.get(i), tableBroadcast, expectedSchemaString,
           caseSensitive, localityPreferred);
     }
 
@@ -249,43 +246,34 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
   private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
     RowReader(ReadTask task) {
-      super(task.task, task.tableSchema(), task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
-          task.isCaseSensitive());
+      super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive());
     }
   }
 
   private static class BatchReader extends BatchDataReader implements PartitionReader<ColumnarBatch> {
     BatchReader(ReadTask task, int batchSize) {
-      super(task.task, task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
-          task.isCaseSensitive(), batchSize);
+      super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize);
     }
   }
 
   private static class ReadTask implements InputPartition, Serializable {
     private final CombinedScanTask task;
-    private final String tableSchemaString;
+    private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
-    private final String nameMappingString;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
 
-    private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
-    ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, String nameMappingString,
-             Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager, boolean caseSensitive,
-             boolean localityPreferred) {
+    ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
+             boolean caseSensitive, boolean localityPreferred) {
       this.task = task;
-      this.tableSchemaString = tableSchemaString;
+      this.tableBroadcast = tableBroadcast;
       this.expectedSchemaString = expectedSchemaString;
-      this.nameMappingString = nameMappingString;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       if (localityPreferred) {
-        this.preferredLocations = Util.blockLocations(io.value(), task);
+        Table table = tableBroadcast.value();
+        this.preferredLocations = Util.blockLocations(table.io(), task);
       } else {
         this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
       }
@@ -300,25 +288,14 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
       return task.files();
     }
 
-    public FileIO io() {
-      return io.value();
-    }
-
-    public EncryptionManager encryption() {
-      return encryptionManager.value();
+    public Table table() {
+      return tableBroadcast.value();
     }
 
     public boolean isCaseSensitive() {
       return caseSensitive;
     }
 
-    private Schema tableSchema() {
-      if (tableSchema == null) {
-        this.tableSchema = SchemaParser.fromJson(tableSchemaString);
-      }
-      return tableSchema;
-    }
-
     private Schema expectedSchema() {
       if (expectedSchema == null) {
         this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
index ac2ee40..1042f19 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -32,18 +32,16 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
-import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -70,11 +68,10 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Set<String> filteredLocations = null;
 
-  SparkMergeScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
-                 boolean caseSensitive, boolean ignoreResiduals, Schema expectedSchema,
-                 List<Expression> filters, CaseInsensitiveStringMap options) {
+  SparkMergeScan(SparkSession spark, Table table, boolean caseSensitive, boolean ignoreResiduals,
+                 Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {
 
-    super(table, io, encryption, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, caseSensitive, expectedSchema, filters, options);
 
     this.table = table;
     this.ignoreResiduals = ignoreResiduals;
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 5ab24a4..633a171 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -26,20 +26,15 @@ import java.util.stream.Stream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -65,9 +60,6 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
   private Filter[] pushedFilters = NO_FILTERS;
   private boolean ignoreResiduals = false;
 
-  // lazy variables
-  private JavaSparkContext lazySparkContext = null;
-
   SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
@@ -75,13 +67,6 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
     this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
   }
 
-  private JavaSparkContext lazySparkContext() {
-    if (lazySparkContext == null) {
-      this.lazySparkContext = new JavaSparkContext(spark.sparkContext());
-    }
-    return lazySparkContext;
-  }
-
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedProjection != null) {
@@ -174,19 +159,13 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
 
   @Override
   public Scan build() {
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryption = lazySparkContext().broadcast(table.encryption());
-
     return new SparkBatchQueryScan(
-        table, io, encryption, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options);
+        spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options);
   }
 
   public Scan buildMergeScan() {
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryption = lazySparkContext().broadcast(table.encryption());
-
     return new SparkMergeScan(
-        table, io, encryption, caseSensitive, ignoreResiduals,
+        spark, table, caseSensitive, ignoreResiduals,
         schemaWithMetadataColumns(), filterExpressions, options);
   }
 }
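
    The builder-side simplification above works because the scans now receive the SparkSession
    and do their own broadcasting when planning tasks. A hedged sketch of that driver-side step;
    the helper class and method names are hypothetical, while SerializableTable.copyOf and
    JavaSparkContext.fromSparkContext are the calls this commit uses.

    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.SparkSession;

    class TableBroadcasts {
      // Wrap the table in a serializable copy and broadcast it once per scan or write.
      static Broadcast<Table> broadcast(SparkSession spark, Table table) {
        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // SerializableTable.copyOf captures schema, spec, properties, location, io and
        // encryption, so executors never need a live catalog connection.
        return sparkContext.broadcast(SerializableTable.copyOf(table));
      }
    }

    This is the same pattern createWriterFactory() follows in SparkWrite below.
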
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 2fc803f..3a43777 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -35,16 +35,15 @@ import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -53,7 +52,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.DataWriter;
@@ -87,11 +88,10 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 class SparkWrite {
   private static final Logger LOG = LoggerFactory.getLogger(SparkWrite.class);
 
+  private final JavaSparkContext sparkContext;
   private final Table table;
   private final String queryId;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
   private final String applicationId;
   private final String wapId;
   private final long targetFileSize;
@@ -100,14 +100,13 @@ class SparkWrite {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
-  SparkWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-             LogicalWriteInfo writeInfo, String applicationId, String wapId,
+  SparkWrite(SparkSession spark, Table table, LogicalWriteInfo writeInfo,
+             String applicationId, String wapId,
              Schema writeSchema, StructType dsSchema) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.queryId = writeInfo.queryId();
     this.format = getFileFormat(table.properties(), writeInfo.options());
-    this.io = io;
-    this.encryptionManager = encryptionManager;
     this.applicationId = applicationId;
     this.wapId = wapId;
     this.writeSchema = writeSchema;
@@ -168,9 +167,9 @@ class SparkWrite {
 
   // the writer factory works for both batch and streaming
   private WriterFactory createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema, partitionedFanoutEnabled);
+    // broadcast the table metadata as the writer factory will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -207,7 +206,7 @@ class SparkWrite {
             2.0 /* exponential */)
         .throwFailureWhenFinished()
         .run(file -> {
-          io.value().deleteFile(file.path().toString());
+          table.io().deleteFile(file.path().toString());
         });
   }
 
@@ -479,27 +478,17 @@ class SparkWrite {
   }
 
   private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
-    private final PartitionSpec spec;
+    private final Broadcast<Table> tableBroadcast;
     private final FileFormat format;
-    private final LocationProvider locations;
-    private final Map<String, String> properties;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
 
-    protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                            Map<String, String> properties, Broadcast<FileIO> io,
-                            Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+    protected WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
                             Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
-      this.spec = spec;
+      this.tableBroadcast = tableBroadcast;
       this.format = format;
-      this.locations = locations;
-      this.properties = properties;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
@@ -513,17 +502,22 @@ class SparkWrite {
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
       } else if (partitionedFanoutEnabled) {
         return new PartitionedFanout3Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
+            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
       } else {
         return new Partitioned3Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
+            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
       }
     }
   }
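
    The slimmed-down WriterFactory above needs only the broadcast table because the new
    OutputFileFactory convenience constructor and the table-based SparkAppenderFactory derive
    the spec, location provider, properties, io and encryption from it. A hedged usage sketch
    of that constructor, assuming an unpartitioned table and the existing newOutputFile()
    accessor; the method and variable names are illustrative.

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.io.OutputFileFactory;

    class TaskFileExample {
      // partitionId and taskId are assumed to come from the surrounding Spark write task.
      static EncryptedOutputFile newTaskFile(Table table, int partitionId, long taskId) {
        OutputFileFactory fileFactory = new OutputFileFactory(table, FileFormat.PARQUET, partitionId, taskId);
        // Unpartitioned case; partitioned writers request files per partition key instead.
        return fileFactory.newOutputFile();
      }
    }
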
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 7530afd..19debd5 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -23,18 +23,14 @@ import java.util.Locale;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.write.BatchWrite;
@@ -62,9 +58,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
   private SparkMergeScan mergeScan = null;
   private IsolationLevel isolationLevel = null;
 
-  // lazy variables
-  private JavaSparkContext lazySparkContext = null;
-
   SparkWriteBuilder(SparkSession spark, Table table, LogicalWriteInfo info) {
     this.spark = spark;
     this.table = table;
@@ -75,13 +68,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
         options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null;
   }
 
-  private JavaSparkContext lazySparkContext() {
-    if (lazySparkContext == null) {
-      this.lazySparkContext = new JavaSparkContext(spark.sparkContext());
-    }
-    return lazySparkContext;
-  }
-
   public WriteBuilder overwriteFiles(Scan scan, IsolationLevel writeIsolationLevel) {
     Preconditions.checkArgument(scan instanceof SparkMergeScan, "%s is not SparkMergeScan", scan);
     Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual files and by filter");
@@ -128,10 +114,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
     // Get write-audit-publish id
     String wapId = spark.conf().get("spark.wap.id", null);
 
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
-    SparkWrite write = new SparkWrite(table, io, encryptionManager, writeInfo, appId, wapId, writeSchema, dsSchema);
+    SparkWrite write = new SparkWrite(spark, table, writeInfo, appId, wapId, writeSchema, dsSchema);
     if (overwriteByFilter) {
       return write.asOverwriteByFilter(overwriteExpr);
     } else if (overwriteDynamic) {
@@ -163,10 +146,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
     // Get write-audit-publish id
     String wapId = spark.conf().get("spark.wap.id", null);
 
-    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
-    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
-
-    SparkWrite write = new SparkWrite(table, io, encryptionManager, writeInfo, appId, wapId, writeSchema, dsSchema);
+    SparkWrite write = new SparkWrite(spark, table, writeInfo, appId, wapId, writeSchema, dsSchema);
     if (overwriteByFilter) {
       return write.asStreamingOverwrite();
     } else {
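
    The public DataFrame API is unchanged by this refactor; a round trip still drives the
    simplified builders above. A small usage sketch, assuming the Iceberg Spark 3 runtime is
    on the classpath and that db.source_table and db.target_table are placeholder table names.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class ReadWriteRoundTrip {
      static void run(SparkSession spark) {
        // Read path: resolved through SparkScanBuilder, which now passes the SparkSession down.
        Dataset<Row> df = spark.read().format("iceberg").load("db.source_table");
        // Write path: resolved through SparkWriteBuilder -> SparkWrite, which broadcasts the table.
        df.write().format("iceberg").mode("append").save("db.target_table");
      }
    }
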