You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/07 09:57:47 UTC

[GitHub] [iceberg] szehon-ho commented on a change in pull request #2362: Spark: Pass Table object to executors

szehon-ho commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608506448



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -49,34 +46,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
+  private final Broadcast<Table> tableBroadcast;
   private final PartitionSpec spec;
-  private final Map<String, String> properties;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
-  private final LocationProvider locations;
-  private final String nameMapping;
   private final boolean caseSensitive;
 
-  public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
-                         Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean caseSensitive) {
+    this.tableBroadcast = tableBroadcast;
     this.spec = spec;
-    this.locations = table.locationProvider();
-    this.properties = table.properties();
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-
     this.caseSensitive = caseSensitive;
-    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
+    Table table = tableBroadcast.getValue();

Review comment:
       We should use value(), right?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -95,27 +79,29 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
 
-    RowDataReader dataReader = new RowDataReader(
-        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+    Table table = tableBroadcast.getValue();

Review comment:
       Same comment as above: we should use value() here, right?

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       I guess in the future we can also simplify these writer constructors by passing in the table?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -443,52 +448,37 @@ public String toString() {
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final CombinedScanTask task;
-    private final String tableSchemaString;
+    private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
-    private final String nameMappingString;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
     private final boolean localityPreferred;
     private final ReaderFactory<T> readerFactory;
 
-    private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+    private ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
                      boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
       this.task = task;
-      this.tableSchemaString = tableSchemaString;
+      this.tableBroadcast = tableBroadcast;
       this.expectedSchemaString = expectedSchemaString;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
-      this.nameMappingString = nameMappingString;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
-      return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
-          encryptionManager.value(), caseSensitive);
+      Table table = tableBroadcast.getValue();

Review comment:
       Same here: we should use value(), right?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       Out of curiosity, why do we need a lazy one?
   
   Also, what does the Java one (JavaSparkContext) provide?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -274,17 +271,21 @@ public String toString() {
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();

Review comment:
       Same (value()?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org