Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/24 00:08:27 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #2362: Spark: Pass Table object to executors

aokolnychyi opened a new pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362


   This WIP PR is an attempt to pass a `Table` object around instead of individual pieces such as the spec, sort order, etc.
   
   Originally, we passed `FileIO`, `LocationProvider`, and properties separately because our `Table` objects were not serializable. As a result, we have to modify many places each time we need to add one more item to pass (e.g. sort order). Passing `Table` instances to executors should simplify the configuration and make the APIs easier to use.
   
   `BaseTable` recently became serializable, but the current approach is not sufficient for two reasons:
   - We don’t handle Kryo serialization.
   - It is not a good idea to read the metadata file from each executor.
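
   To illustrate the general idea, here is a minimal sketch that captures the needed state once on the driver and ships it as a plain serializable object, so the receiving side never has to read a metadata file. `TableStateSketch` and its fields are hypothetical stand-ins, not classes from this PR, and the round trip uses only Java serialization (Kryo is not covered).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the table state a task needs; not an Iceberg class.
final class TableStateSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String name;
  private final String location;
  private final Map<String, String> properties;

  TableStateSketch(String name, String location, Map<String, String> properties) {
    this.name = name;
    this.location = location;
    // defensive copy so later changes on the driver do not leak into the shipped state
    this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
  }

  String name() {
    return name;
  }

  String location() {
    return location;
  }

  Map<String, String> properties() {
    return properties;
  }

  // Simulates shipping the object to another JVM with Java serialization.
  static TableStateSketch roundTrip(TableStateSketch state) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(state);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (TableStateSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

   Everything the copy returns comes from the serialized fields, which is the property we want on executors.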
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615078266



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       Let me take care of that.






[GitHub] [iceberg] pvary commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-805671974


   CC: @lcspinter 




[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601217064



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {

Review comment:
       We already have tests for table serialization here: https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
   
   Maybe we could refactor and reuse them?






[GitHub] [iceberg] aokolnychyi closed pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi closed pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362


   




[GitHub] [iceberg] aokolnychyi merged pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362


   




[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-805379292


   Could you take a look too, @openinx?




[GitHub] [iceberg] flyrain commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615089472



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -51,12 +51,10 @@
   private final boolean caseSensitive;
   private final int batchSize;
 
-  BatchDataReader(
-      CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
-      EncryptionManager encryptionManager, boolean caseSensitive, int size) {
-    super(task, fileIo, encryptionManager);
+  BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema,  boolean caseSensitive, int size) {

Review comment:
       A nit: extra space before `boolean`.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600063839



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       Missed this comment. Yeah, this idea occurred to me too. 






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601245730



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  // needed for serialization
+  public SerializedTable(String name, String location, String metadataFileLocation,
+                         Map<String, String> properties, Schema schema,
+                         int defaultSpecId, Map<Integer, PartitionSpec> specs,
+                         int defaultSortOrderId, Map<Integer, SortOrder> sortOrders,
+                         FileIO io, EncryptionManager encryption, LocationProvider locationProvider) {
+    this.name = name;
+    this.location = location;
+    this.metadataFileLocation = metadataFileLocation;
+    this.properties = properties;
+    this.schema = schema;
+    this.defaultSpecId = defaultSpecId;
+    this.specs = specs;
+    this.defaultSortOrderId = defaultSortOrderId;
+    this.sortOrders = sortOrders;
+    this.io = io;
+    this.encryption = encryption;
+    this.locationProvider = locationProvider;
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    throw new UnsupportedOperationException(errorMsg("currentSnapshot"));
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    throw new UnsupportedOperationException(errorMsg("snapshot"));
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    throw new UnsupportedOperationException(errorMsg("snapshots"));
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    throw new UnsupportedOperationException(errorMsg("history"));
+  }

Review comment:
       Not sure about this, but in the future it might be useful to implement these to be able to access the previous history/snapshots of the static Table.
   
   Use case: We have 2 Iceberg tables and would like to find the valid snapshot combinations for the past few days. If we serialize both tables and then do the job on a different worker node, we need to read the history of the serialized tables.
   
   Any particular reason not to implement these as delegating them to `lazyTable()`?
   
   Thanks,
   Peter
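
   A minimal sketch of the delegation asked about above, under the assumption that a fully loaded table can be rebuilt on demand. `HistorySource` and `LazyDelegatingTable` are hypothetical names used only for illustration; the loader stands in for whatever would reconstruct the table from its metadata location.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the part of a table that exposes history.
interface HistorySource {
  List<String> history();
}

// Delegates history() to a table that is loaded at most once, on first use.
final class LazyDelegatingTable implements HistorySource {
  private final Supplier<HistorySource> loader;
  private final AtomicInteger loadCount = new AtomicInteger(0);

  // loaded on demand; volatile is required for double-checked locking
  private volatile HistorySource lazyTable = null;

  LazyDelegatingTable(Supplier<HistorySource> loader) {
    this.loader = loader;
  }

  private HistorySource lazyTable() {
    if (lazyTable == null) {
      synchronized (this) {
        if (lazyTable == null) {
          loadCount.incrementAndGet();
          this.lazyTable = loader.get();
        }
      }
    }

    return lazyTable;
  }

  @Override
  public List<String> history() {
    // delegate instead of throwing UnsupportedOperationException
    return lazyTable().history();
  }

  // exposed only so the sketch can show that loading happens once
  int loadCount() {
    return loadCount.get();
  }
}
```

   The trade-off is that the first call pays the cost of loading the full table, while callers that never ask for history pay nothing.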






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616221285



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -59,10 +59,8 @@ protected FileIO fileIO() {
   @Override
   protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
-    RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table(), spec(), caseSensitive(), io, encryption);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());

Review comment:
       Why pass through spec()? Can this be customized by this class? Shouldn't it be forced to be the table.spec()?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600063478



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?

Review comment:
       I like it. We could probably leverage it in the `BaseTable` proxy as well.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600063277



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<Integer, PartitionSpec> specs;
+  private final SortOrder sortOrder;
+  private final Map<Integer, SortOrder> sortOrders;
+  private FileIO io;

Review comment:
       No, can be just `io` for now.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608050620



##########
File path: core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
##########
@@ -44,6 +45,17 @@
   private final String operationId;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
+  // TODO: expose a builder like OutputFileFactory.forTable()
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {

Review comment:
       All constructors should be replaced with a builder.
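
   As a rough sketch of what such a builder could look like (an assumption, not the final API): `FileFactorySketch`, its fields, and the file name pattern below are hypothetical; only the `forTable()` entry point is borrowed from the TODO above.

```java
import java.util.Locale;
import java.util.Objects;

// Hypothetical stand-in; the real OutputFileFactory has more dependencies.
final class FileFactorySketch {
  private final String tableLocation;
  private final String format;
  private final int partitionId;
  private final long taskId;

  private FileFactorySketch(Builder builder) {
    this.tableLocation = builder.tableLocation;
    this.format = Objects.requireNonNull(builder.format, "format must be set");
    this.partitionId = builder.partitionId;
    this.taskId = builder.taskId;
  }

  // single entry point replaces the growing list of constructors
  static Builder forTable(String tableLocation) {
    return new Builder(Objects.requireNonNull(tableLocation, "tableLocation must be set"));
  }

  // illustrative naming pattern only
  String newFileLocation(int fileCount) {
    return String.format(
        Locale.ROOT, "%s/%05d-%d-%05d.%s", tableLocation, partitionId, taskId, fileCount, format);
  }

  static final class Builder {
    private final String tableLocation;
    private String format;
    private int partitionId;
    private long taskId;

    private Builder(String tableLocation) {
      this.tableLocation = tableLocation;
    }

    Builder format(String newFormat) {
      this.format = newFormat;
      return this;
    }

    Builder partitionId(int newPartitionId) {
      this.partitionId = newPartitionId;
      return this;
    }

    Builder taskId(long newTaskId) {
      this.taskId = newTaskId;
      return this;
    }

    FileFactorySketch build() {
      return new FileFactorySketch(this);
    }
  }
}
```

   With a builder, adding one more optional input later means adding one more setter rather than another constructor overload.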






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600980695



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -168,9 +177,9 @@ private boolean isWapTable() {
 
   // the writer factory works for both batch and streaming
   private WriterFactory createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema, partitionedFanoutEnabled);
+    // broadcast the table metadata as the writer factory will be sent to executors
+    Broadcast<Table> staticTable = lazySparkContext().broadcast(SparkUtil.toStaticTable(table));

Review comment:
       @pvary, is there a broadcast concept in Hive too? We had some performance issues serializing/deserializing FileIO.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -168,9 +177,9 @@ private boolean isWapTable() {
 
   // the writer factory works for both batch and streaming
   private WriterFactory createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema, partitionedFanoutEnabled);
+    // broadcast the table metadata as the writer factory will be sent to executors
+    Broadcast<Table> staticTable = lazySparkContext().broadcast(SparkUtil.toStaticTable(table));

Review comment:
       @pvary, is there a broadcast concept in Hive too? We had some performance issues deserializing FileIO.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-809904718


   This is ready for a detailed review round, @pvary @rdblue @szehon-ho @RussellSpitzer.




[GitHub] [iceberg] flyrain commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615051850



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       Minor suggestion: maybe refactor only the one inside this file and leave the parent classes untouched.






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600061493



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       Can we use this table implementation for the other case? It would be nice to have the proxy produce this class on `readResolve`. That way we only have one implementation. Spark would need to convert explicitly, but Java serialization would just work in other circumstances.
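
   The `readResolve` idea can be illustrated with a minimal, self-contained sketch of the Java serialization proxy pattern. All class names below (`TableLike`, `LiveTable`, `StaticCopy`, `TableProxy`) are hypothetical stand-ins and not Iceberg classes; the sketch only assumes that the live table writes a proxy via `writeReplace` and that the proxy resolves to the static implementation on read.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;

   final class ReadResolveSketch {
     // Hypothetical stand-in for the Table interface.
     interface TableLike {
       String name();
     }

     // Hypothetical live table; Java serialization swaps it for a proxy.
     static final class LiveTable implements TableLike, Serializable {
       private final String name;

       LiveTable(String name) {
         this.name = name;
       }

       @Override
       public String name() {
         return name;
       }

       // Called by Java serialization before the object is written.
       private Object writeReplace() {
         return new TableProxy(name);
       }
     }

     // Hypothetical read-only copy, playing the role of the static table.
     static final class StaticCopy implements TableLike, Serializable {
       private final String name;

       StaticCopy(String name) {
         this.name = name;
       }

       @Override
       public String name() {
         return name;
       }
     }

     // The proxy is what travels over the wire; it resolves to the static copy on read.
     static final class TableProxy implements Serializable {
       private final String name;

       TableProxy(String name) {
         this.name = name;
       }

       // Called by Java serialization after the proxy is read.
       private Object readResolve() {
         return new StaticCopy(name);
       }
     }

     // Serializes and deserializes an object with plain Java serialization.
     static Object roundTrip(Object obj) {
       try {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
           out.writeObject(obj);
         }
         try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
           return in.readObject();
         }
       } catch (IOException | ClassNotFoundException e) {
         throw new IllegalStateException(e);
       }
     }
   }
   ```

   With this shape, callers that rely on Java serialization get the static implementation back without any explicit conversion, while a framework that bypasses `writeReplace` would still need the explicit conversion mentioned above.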






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603777628



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testTableJavaSerialization() throws IOException, ClassNotFoundException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(table);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testSerializedTableKryoSerialization() throws IOException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+    Table serializedTable = SparkUtil.toSerializedTable(table);
+    checkTable(table, serializedTable);
+
+    try (Output out = new Output(new FileOutputStream(data))) {
+      kryo.writeClassAndObject(out, serializedTable);
+    }
+
+    try (Input in = new Input(new FileInputStream(data))) {
+      Object obj = kryo.readClassAndObject(in);
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testSerializedTableJavaSerialization() throws IOException, ClassNotFoundException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    Table serializedTable = SparkUtil.toSerializedTable(table);
+    checkTable(table, serializedTable);
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(serializedTable);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testMetadataTableJavaSerialization() throws IOException, ClassNotFoundException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+        out.writeObject(metadataTable);
+      }
+
+      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+        Object obj = in.readObject();
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  @Test
+  public void testSerializedMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      Table serializedMetadataTable = SparkUtil.toSerializedTable(metadataTable);
+      checkTable(metadataTable, serializedMetadataTable);
+
+      try (Output out = new Output(new FileOutputStream(data))) {
+        kryo.writeClassAndObject(out, serializedMetadataTable);
+      }
+
+      try (Input in = new Input(new FileInputStream(data))) {
+        Object obj = kryo.readClassAndObject(in);
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  @Test
+  public void testSerializedMetadataTableJavaSerialization() throws IOException, ClassNotFoundException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      Table serializedMetadataTable = SparkUtil.toSerializedTable(metadataTable);
+      checkTable(metadataTable, serializedMetadataTable);
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+        out.writeObject(serializedMetadataTable);
+      }
+
+      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+        Object obj = in.readObject();
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  private void checkTable(Table expected, Table actual) {
+    Assert.assertEquals("Name must match", expected.name(), actual.name());
+    Assert.assertEquals("Location must match", expected.location(), actual.location());
+    Assert.assertEquals("Props must match", expected.properties(), actual.properties());
+    Assert.assertEquals("Schema must match", expected.schema().asStruct(), actual.schema().asStruct());

Review comment:
       This has been fixed. Resolving.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603776797



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       Need more eyes on this. The way we currently compare structs breaks `Schema` and `PartitionSpec` Kryo serialization. We rely on a proxy in `writeReplace`, but Kryo ignores that.
   
   If we decide to go this way, I can submit this change in a separate PR.
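
   To illustrate why identity comparison is fragile here, below is a minimal sketch in plain Java. `LongType` and `TypeId` are hypothetical stand-ins for a singleton primitive type, and `copyBypassingSingleton` only simulates a serializer that instantiates the class directly instead of going through the `writeReplace` proxy. The `equals` body is an assumption, not the code in this PR.

   ```java
   import java.util.Objects;

   final class TypeEqualitySketch {
     // Hypothetical type identifiers.
     enum TypeId { LONG, STRING }

     // Hypothetical primitive type that is normally used as a singleton.
     static final class LongType {
       static final LongType INSTANCE = new LongType();

       TypeId typeId() {
         return TypeId.LONG;
       }

       @Override
       public int hashCode() {
         return Objects.hashCode(typeId());
       }

       // Assumed implementation: two primitive types are equal if their type IDs match.
       @Override
       public boolean equals(Object other) {
         if (this == other) {
           return true;
         }
         if (!(other instanceof LongType)) {
           return false;
         }
         return typeId() == ((LongType) other).typeId();
       }
     }

     // Simulates a serializer that creates a fresh instance and ignores the singleton.
     static LongType copyBypassingSingleton() {
       return new LongType();
     }
   }
   ```

   In this sketch, `copyBypassingSingleton() == LongType.INSTANCE` is false because a second instance exists, while `equals` still returns true, which is the property struct comparison would need after such a round trip.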






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604426221



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       Yeah, that sounds like a good option. I think we should probably do both. There isn't too much overhead to `equals` so we should start ensuring we correctly use it all the time.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608853494



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -274,17 +271,21 @@ public String toString() {
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();

Review comment:
       Fixed.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603774317



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {

Review comment:
       The new tests actually require Spark to test Kryo. I also use `SparkUtil` in some places to construct serialized tables. The existing tests verify job planning which is not something I cover. It seems fine to have both but I can also try refactoring it. Let me know what you think, @pvary.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-820956752


   @rdblue @yyanyy @RussellSpitzer @flyrain @karuppayya, could you do one more review round?




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608860743



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       `JavaSparkContext` is a bit friendlier to use from Java. For example, here is how `broadcast` is defined:
   
   ```
   def broadcast[T: ClassTag](value: T): Broadcast[T] // Scala
   def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)(fakeClassTag) // Java
   ```
   
   We would need to pass a class tag ourselves if we used the Scala one.
   






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600971207



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       Yeah, we are fine in Spark. I just want to make sure that does not happen in Hive too.






[GitHub] [iceberg] flyrain commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-821526734


   > > schema, partitionSpec, and partition Id part of table as well
   > 
   > We used to derive this information from the table object and pass individual things as the table wasn't serializable. Each time we needed one more entity, we had to modify a lot of places. Hopefully, this won't be needed after this change.
   
   I meant this place, but it makes sense since a rewrite may need a different partition spec. Sorry for the confusion.
   ```
   public OutputFileFactory(Table table, PartitionSpec spec, FileFormat format, int partitionId, long taskId) {
   ```




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616342162



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -59,10 +59,8 @@ protected FileIO fileIO() {
   @Override
   protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
-    RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table(), spec(), caseSensitive(), io, encryption);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());

Review comment:
       Users can override the output spec by calling `outputSpecId`. While we default the output spec to the current spec, there is no guarantee that it is the one used to write the data back.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608059398



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);

Review comment:
       Resolved in a separate PR.






[GitHub] [iceberg] RussellSpitzer commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-806036066


   Should we just make a configuration container class? I'm not sure I like that it has the exact same API but with runtime limits ... I haven't thought about this a lot, though.
   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603772820



##########
File path: core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
##########
@@ -59,6 +60,11 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider
     this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
   }
 
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {

Review comment:
       This particular call also passes the partition ID. I think `OutputFileFactory` should have a builder. I added a TODO item and propose refactoring this part once we switch to the builder approach.
   
   How does that sound, @pvary?
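
For illustration only, a builder along these lines might look as follows. This is a minimal sketch under stated assumptions, not the actual `OutputFileFactory` API: `DemoFileFactory`, `builderFor`, `operationId`, and the file name pattern are all hypothetical names chosen for the example.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

public class FileFactoryBuilderDemo {

  // Hypothetical stand-in for a factory that names output files.
  static final class DemoFileFactory {
    private final String tableLocation;
    private final String format;
    private final int partitionId;
    private final long taskId;
    private final String operationId;
    private final AtomicInteger fileCount = new AtomicInteger(0);

    private DemoFileFactory(Builder builder) {
      this.tableLocation = builder.tableLocation;
      this.format = builder.format;
      this.partitionId = builder.partitionId;
      this.taskId = builder.taskId;
      this.operationId = builder.operationId;
    }

    // Required arguments go into the entry point; optional ones get defaults.
    static Builder builderFor(String tableLocation, int partitionId, long taskId) {
      return new Builder(tableLocation, partitionId, taskId);
    }

    // Hypothetical naming pattern, used only to make the sketch observable.
    String newFileName() {
      return String.format("%s/%05d-%d-%s-%05d.%s",
          tableLocation, partitionId, taskId, operationId, fileCount.incrementAndGet(), format);
    }
  }

  static final class Builder {
    private final String tableLocation;
    private final int partitionId;
    private final long taskId;
    private String format = "parquet";
    private String operationId = UUID.randomUUID().toString();

    private Builder(String tableLocation, int partitionId, long taskId) {
      this.tableLocation = tableLocation;
      this.partitionId = partitionId;
      this.taskId = taskId;
    }

    Builder format(String newFormat) {
      this.format = newFormat;
      return this;
    }

    Builder operationId(String newOperationId) {
      this.operationId = newOperationId;
      return this;
    }

    DemoFileFactory build() {
      return new DemoFileFactory(this);
    }
  }

  public static void main(String[] args) {
    DemoFileFactory factory = DemoFileFactory.builderFor("/tmp/tbl", 1, 2L)
        .format("orc")
        .operationId("op")
        .build();
    System.out.println(factory.newFileName());
  }
}
```

The point of the sketch is that adding one more optional argument later only touches the builder, not every constructor call site.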






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603860494



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {

Review comment:
       I think we can keep both, but we might want to rename them so it is easier to differentiate the tested use-cases.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616222642



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -105,8 +102,8 @@
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Boolean readUsingBatch = null;
 
-  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-      boolean caseSensitive, DataSourceOptions options) {
+  Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());

Review comment:
       This is valid, but it raises my heart rate. I would prefer we use `JavaSparkContext.fromSparkContext` so that when I see this I don't think we are making a brand new context :)






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615092739



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -51,12 +51,10 @@
   private final boolean caseSensitive;
   private final int batchSize;
 
-  BatchDataReader(
-      CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
-      EncryptionManager encryptionManager, boolean caseSensitive, int size) {
-    super(task, fileIo, encryptionManager);
+  BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema,  boolean caseSensitive, int size) {

Review comment:
       Good catch, fixed!






[GitHub] [iceberg] flyrain commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615052745



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -479,27 +488,17 @@ public void doCommit(long epochId, WriterCommitMessage[] messages) {
   }
 
   private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
-    private final PartitionSpec spec;
+    private final Broadcast<Table> tableBroadcast;
     private final FileFormat format;
-    private final LocationProvider locations;
-    private final Map<String, String> properties;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
 
-    protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                            Map<String, String> properties, Broadcast<FileIO> io,
-                            Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+    protected WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
                             Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
-      this.spec = spec;
+      this.tableBroadcast = tableBroadcast;

Review comment:
       Just out of curiosity: it looks like we broadcast more data with this change. Would the performance impact be minor?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603776797



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       Need more eyes on this. The way we currently compare structs breaks `Schema` and `PartitionSpec` Kryo serialization. We rely on a proxy in `writeReplace`, but Kryo ignores that.
   
   If we decide to go this way, I can submit this change in a separate PR.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616217832



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -59,10 +59,8 @@ protected FileIO fileIO() {
   @Override
   protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
-    RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table(), spec(), caseSensitive(), io, encryption);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());

Review comment:
       @flyrain This is what I was mentioning today: in the future you may be able to just extract the properties from `tableBroadcast`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604318145



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       Okay, so Kryo bypasses the singleton logic that we have in the proxy. Is there another way around it? I'm concerned about places that don't even call `equals` and assume that identity is enough, like `someType == LongType.get()`.
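
To make the concern concrete, here is a minimal, self-contained sketch of the proxy pattern under discussion. It uses only plain Java serialization; `DemoLongType` and `Holder` are hypothetical stand-ins, not the real Iceberg classes. With Java serialization the `writeReplace`/`readResolve` pair brings the singleton back, so an identity check still holds after a round trip. A serializer that skips these hooks (as described above for Kryo) would construct a fresh instance, and `==` comparisons against the singleton would then fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;

public class SingletonProxyDemo {

  // Hypothetical stand-in for a singleton primitive type.
  static final class DemoLongType implements Serializable {
    private static final DemoLongType INSTANCE = new DemoLongType();

    private DemoLongType() {
    }

    static DemoLongType get() {
      return INSTANCE;
    }

    // Java serialization writes the holder instead of this object.
    Object writeReplace() throws ObjectStreamException {
      return new Holder("long");
    }
  }

  // Proxy that resolves back to the singleton when it is deserialized.
  static final class Holder implements Serializable {
    private final String typeName;

    Holder(String typeName) {
      this.typeName = typeName;
    }

    Object readResolve() throws ObjectStreamException {
      if ("long".equals(typeName)) {
        return DemoLongType.get();
      }
      throw new InvalidObjectException("Unknown type: " + typeName);
    }
  }

  // Serializes and deserializes an object with plain Java serialization.
  static Object javaRoundTrip(Object obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Object copy = javaRoundTrip(DemoLongType.get());
    // Identity survives because readResolve returns the singleton.
    System.out.println(copy == DemoLongType.get());
  }
}
```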






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604325611



##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -34,7 +34,7 @@
  * using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
  * deserialization.
  */
-abstract class BaseMetadataTable implements Table, Serializable {
+public abstract class BaseMetadataTable implements Table, Serializable {

Review comment:
       @aokolnychyi, instead of doing this, what about adding a util class in this package that can access the package-private classes? Then we'd be able to move the `SparkUtil.toSerializedTable` into core so it can be shared by Spark and Flink, and we wouldn't need to open up access.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608861867



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       W.r.t. lazy, I just copied the old behavior, which probably saves an instance creation in case we fail early and never get to broadcasting. It is probably alright to always construct it, I guess.







[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616339940



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -105,8 +102,8 @@
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Boolean readUsingBatch = null;
 
-  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-      boolean caseSensitive, DataSourceOptions options) {
+  Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());

Review comment:
       Good point. I did not know about `fromSparkContext`. Let me update all places in this PR.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616343354



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -105,8 +102,8 @@
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Boolean readUsingBatch = null;
 
-  Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-      boolean caseSensitive, DataSourceOptions options) {
+  Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());

Review comment:
       Updated all places, resolving.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604404772



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    return lazyTable().currentSnapshot();
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    return lazyTable().snapshot(snapshotId);
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    return lazyTable().snapshots();
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    return lazyTable().history();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    throw new UnsupportedOperationException(errorMsg("updateSchema"));

Review comment:
       I am ok either way. I think a slightly more descriptive error message is worth it in this case.
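
As a rough illustration of what a more descriptive message could look like, here is a small sketch. The `DemoTable` interface, the `ReadOnlyTable` class, and the exact wording of the message are assumptions made for this example; they are not taken from the PR.

```java
public class ReadOnlyViewDemo {

  // Hypothetical, heavily reduced stand-in for a table interface.
  interface DemoTable {
    String name();

    void refresh();
  }

  // Read-only copy that rejects operations which would need a live catalog.
  static final class ReadOnlyTable implements DemoTable {
    private final String name;

    ReadOnlyTable(String name) {
      this.name = name;
    }

    @Override
    public String name() {
      return name;
    }

    @Override
    public void refresh() {
      throw new UnsupportedOperationException(errorMsg("refresh"));
    }

    // One helper keeps the wording consistent and names the rejected operation.
    private String errorMsg(String operation) {
      return String.format("Operation %s is not supported on the serialized copy of table %s", operation, name);
    }
  }

  public static void main(String[] args) {
    try {
      new ReadOnlyTable("db.tbl").refresh();
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```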






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600719639



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       Not much, unfortunately. Spark cannot use Kryo on `FileIO` as it contains a Hadoop conf. Even though Spark may serialize the Table object using Kryo, we still use Java serialization for `FileIO` and that's the dominant part.






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603859471



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -168,9 +177,9 @@ private boolean isWapTable() {
 
   // the writer factory works for both batch and streaming
   private WriterFactory createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema, partitionedFanoutEnabled);
+    // broadcast the table metadata as the writer factory will be sent to executors
+    Broadcast<Table> staticTable = lazySparkContext().broadcast(SparkUtil.toStaticTable(table));

Review comment:
       Not that I know of 😢 
   We pass these variables as part of the configuration object for the Mappers/Reducers separately.
   We are not yet at this level of optimization, but soon... 😄 






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604421697



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       I am also concerned about this.
   
   I think we have the following alternative options:
   - Pass spec and schema as json strings and deserialize as needed.
   - Register a custom Kryo serializer for needed classes.
   
   I think we would want to avoid the second option if we can. Passing schema and spec as JSON could work. Thoughts, @rdblue?
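
A minimal sketch of the first option, assuming the value can be round-tripped through a JSON string: only the string is serialized, and the parsed object is rebuilt lazily on the receiving side. `ParsedSchema`, `parse`, and `SchemaCarrier` are hypothetical placeholders; a real version would call the project's own JSON parser instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JsonCarrierDemo {

  // Hypothetical parsed form; deliberately not Serializable.
  static final class ParsedSchema {
    private final String json;

    ParsedSchema(String json) {
      this.json = json;
    }

    String json() {
      return json;
    }
  }

  // Hypothetical parser; a real one would build the object from the JSON text.
  static ParsedSchema parse(String json) {
    return new ParsedSchema(json);
  }

  // Only the JSON string travels over the wire; the parsed object is transient.
  static final class SchemaCarrier implements Serializable {
    private final String schemaJson;
    private transient volatile ParsedSchema lazySchema = null;

    SchemaCarrier(String schemaJson) {
      this.schemaJson = schemaJson;
    }

    ParsedSchema schema() {
      if (lazySchema == null) {
        synchronized (this) {
          if (lazySchema == null) {
            this.lazySchema = parse(schemaJson);
          }
        }
      }
      return lazySchema;
    }
  }

  static SchemaCarrier roundTrip(SchemaCarrier carrier) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(carrier);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SchemaCarrier) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SchemaCarrier copy = roundTrip(new SchemaCarrier("{\"type\":\"struct\",\"fields\":[]}"));
    System.out.println(copy.schema().json());
  }
}
```

Because the serialized state is just strings, this shape should not depend on how a particular serializer treats the parsed classes, at the cost of re-parsing once per deserialized copy.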






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-814317053


   @rdblue @szehon-ho @RussellSpitzer @chenjunjiedada, this updates our Spark code to use `SerializableTable`. I'd appreciate your review whenever you have time.




[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600758697



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       Kryo doesn't work with Spark's `SerializableConfiguration`? I thought that's why we use the Spark one.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-814308911


   Let me actually close this one and open two separate ones instead.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608059070



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    return lazyTable().currentSnapshot();
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    return lazyTable().snapshot(snapshotId);
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    return lazyTable().snapshots();
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    return lazyTable().history();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    throw new UnsupportedOperationException(errorMsg("updateSchema"));

Review comment:
       That was addressed in a separate PR. No longer applies here.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -206,9 +215,7 @@ private void abort(WriterCommitMessage[] messages) {
             PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
             2.0 /* exponential */)
         .throwFailureWhenFinished()
-        .run(file -> {
-          io.value().deleteFile(file.path().toString());
-        });
+        .run(file -> table.io().deleteFile(file.path().toString()));

Review comment:
       Reverted.






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600332942



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       One of the main goals of the `BaseTable` serialization was to make it possible to start scans on the deserialized `Table`. You can see an example here:
   https://github.com/apache/iceberg/blob/23735d1d99abf0207543ff5d9dcb63ae4fe4ec02/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L97-L104
   
   Maybe we can load the metadata lazily when needed; that way we can merge the two approaches.
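
The lazy-loading idea can be sketched with plain Java serialization. This is a minimal illustration, not code from the PR: `LazyHandle`, its fields, and the placeholder "load" step are all hypothetical. Cheap state travels with the serialized object, while the expensive part is kept in a `transient` field and rebuilt on first access with double-checked locking.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable table handle; not an Iceberg class.
class LazyHandle implements Serializable {
  private static final long serialVersionUID = 1L;

  // Cheap state that travels with the serialized object.
  private final String metadataLocation;

  // Expensive state: skipped by Java serialization and rebuilt on first use.
  private transient volatile String metadata = null;

  LazyHandle(String metadataLocation) {
    this.metadataLocation = metadataLocation;
  }

  String metadataLocation() {
    return metadataLocation;
  }

  boolean isLoaded() {
    return metadata != null;
  }

  String metadata() {
    if (metadata == null) {
      synchronized (this) {
        if (metadata == null) {
          // Placeholder for reading the metadata file at metadataLocation.
          this.metadata = "metadata@" + metadataLocation;
        }
      }
    }
    return metadata;
  }

  // Serializes and deserializes the handle in memory with Java serialization.
  static LazyHandle roundTrip(LazyHandle handle) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(handle);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazyHandle) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }
}
```

With this shape, an executor that only needs the cheap accessors never pays for the load, while code that wants to plan a scan triggers it once.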
   






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600337164



##########
File path: core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
##########
@@ -59,6 +60,11 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider
     this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
   }
 
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {

Review comment:
       This could be useful here as well:
   https://github.com/apache/iceberg/blob/05d39c5dd09d604336ab21c34e78553e5a6b13ef/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java#L76-L81






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600060586



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?

Review comment:
       How about `SerializedTable`? Then it would make sense to have an error message like "updateSchema is not supported after a table is serialized".
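
A rough sketch of how that error message could be produced, assuming a small helper in the spirit of the `errorMsg(...)` calls in the diff. `ReadOnlyView` and the exact wording are illustrative only, not the PR's implementation.

```java
// Hypothetical sketch; the class name and message wording are illustrative.
class ReadOnlyView {
  private final String name;

  ReadOnlyView(String name) {
    this.name = name;
  }

  // Builds one consistent message for every unsupported operation.
  static String errorMsg(String operation) {
    return String.format("%s is not supported after a table is serialized", operation);
  }

  // Read-only accessors keep working on the serialized copy.
  String name() {
    return name;
  }

  // Mutating operations are rejected with a message that names the operation.
  void updateSchema() {
    throw new UnsupportedOperationException(errorMsg("updateSchema"));
  }
}
```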






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-814310867


   Well, there are parts that are interconnected and the change is not too big, so it'd be easier to do this in a single PR.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603774317



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {

Review comment:
       The new tests actually require Spark to test Kryo. I also use `SparkUtil` in some places to construct serialized tables. The existing tests live in the core module and verify job planning, which is not something I cover here. It seems fine to have both, but I can also try to refactor. Let me know what you think, @pvary.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600970830



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       It works but Spark uses Java serialization for Hadoop configs.
   
   ```
   kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
   ```
   
   By reusing `SerializableConfiguration`, we don't have to register our own custom Kryo serializer.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601059366



##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -34,7 +34,7 @@
  * using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
  * deserialization.
  */
-abstract class BaseMetadataTable implements Table, Serializable {
+public abstract class BaseMetadataTable implements Table, Serializable {

Review comment:
       Had to make it public so that I can do `instanceof` in Spark.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604421697



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       I am also concerned about this.
   
   I think we have the following alternative options:
   - Pass spec and schema as json strings and deserialize as needed.
   - Register a custom Kryo serializer for needed classes.
   
   I think we would want to avoid the second option as much as we can. Passing schema and spec as JSON could work. Thoughts, @rdblue?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603772820



##########
File path: core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
##########
@@ -59,6 +60,11 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider
     this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
   }
 
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {

Review comment:
       This particular call also passes partition id. I think `OutputFileFactory` should have a builder. I added a TODO item and would propose to refactor this part once we switch to the builder approach. How does that sound, @pvary?
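
To make the builder suggestion concrete, here is a rough sketch of what such an API might look like. It is an assumption for illustration only: `FileNameFactory`, `builderFor`, the default values, and the file name pattern are all hypothetical and are not Iceberg's `OutputFileFactory`.

```java
import java.util.Locale;
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified stand-in; not Iceberg's OutputFileFactory.
final class FileNameFactory {
  private final String format;
  private final int partitionId;
  private final long taskId;
  private final String operationId;

  private FileNameFactory(Builder builder) {
    this.format = builder.format;
    this.partitionId = builder.partitionId;
    this.taskId = builder.taskId;
    this.operationId = builder.operationId;
  }

  // Required arguments go to the factory method; everything else has a default.
  static Builder builderFor(int partitionId, long taskId) {
    return new Builder(partitionId, taskId);
  }

  // Illustrative naming pattern: partition, task, operation, counter, extension.
  String newFileName(int fileCount) {
    return String.format(Locale.ROOT, "%05d-%d-%s-%05d.%s", partitionId, taskId, operationId, fileCount, format);
  }

  static final class Builder {
    private final int partitionId;
    private final long taskId;
    private String format = "parquet";
    private String operationId = UUID.randomUUID().toString();

    private Builder(int partitionId, long taskId) {
      this.partitionId = partitionId;
      this.taskId = taskId;
    }

    Builder format(String newFormat) {
      this.format = Objects.requireNonNull(newFormat, "format");
      return this;
    }

    Builder operationId(String newOperationId) {
      this.operationId = Objects.requireNonNull(newOperationId, "operationId");
      return this;
    }

    FileNameFactory build() {
      return new FileNameFactory(this);
    }
  }
}
```

A builder like this lets callers that have a partition id and callers that do not share one entry point, instead of adding another constructor overload for each new argument.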






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601061225



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testTableJavaSerialization() throws IOException, ClassNotFoundException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(table);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testSerializedTableKryoSerialization() throws IOException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+    Table serializedTable = SparkUtil.toSerializedTable(table);
+    checkTable(table, serializedTable);
+
+    try (Output out = new Output(new FileOutputStream(data))) {
+      kryo.writeClassAndObject(out, serializedTable);
+    }
+
+    try (Input in = new Input(new FileInputStream(data))) {
+      Object obj = kryo.readClassAndObject(in);
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testSerializedTableJavaSerialization() throws IOException, ClassNotFoundException {
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+
+    Table serializedTable = SparkUtil.toSerializedTable(table);
+    checkTable(table, serializedTable);
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(serializedTable);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assert.assertTrue("Should be a Table", obj instanceof Table);
+      checkTable(table, (Table) obj);
+    }
+  }
+
+  @Test
+  public void testMetadataTableJavaSerialization() throws IOException, ClassNotFoundException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+        out.writeObject(metadataTable);
+      }
+
+      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+        Object obj = in.readObject();
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  @Test
+  public void testSerializedMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      Table serializedMetadataTable = SparkUtil.toSerializedTable(metadataTable);
+      checkTable(metadataTable, serializedMetadataTable);
+
+      try (Output out = new Output(new FileOutputStream(data))) {
+        kryo.writeClassAndObject(out, serializedMetadataTable);
+      }
+
+      try (Input in = new Input(new FileInputStream(data))) {
+        Object obj = kryo.readClassAndObject(in);
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  @Test
+  public void testSerializedMetadataTableJavaSerialization() throws IOException, ClassNotFoundException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      File data = temp.newFile();
+      Assert.assertTrue(data.delete());
+
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      Table serializedMetadataTable = SparkUtil.toSerializedTable(metadataTable);
+      checkTable(metadataTable, serializedMetadataTable);
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+        out.writeObject(serializedMetadataTable);
+      }
+
+      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+        Object obj = in.readObject();
+        Assert.assertTrue("Should be a Table", obj instanceof Table);
+        checkTable(metadataTable, (Table) obj);
+      }
+    }
+  }
+
+  private void checkTable(Table expected, Table actual) {
+    Assert.assertEquals("Name must match", expected.name(), actual.name());
+    Assert.assertEquals("Location must match", expected.location(), actual.location());
+    Assert.assertEquals("Props must match", expected.properties(), actual.properties());
+    Assert.assertEquals("Schema must match", expected.schema().asStruct(), actual.schema().asStruct());

Review comment:
       This will now fail, unfortunately. The way we compare struct fields is broken with Kryo serialization. We rely on the `PrimitiveHolder` proxy but Kryo ignores that.
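
       For readers unfamiliar with the proxy mechanism mentioned here, a minimal sketch in plain Java follows. `IntType` and `Holder` are hypothetical stand-ins, not Iceberg classes, and the sketch assumes the type is compared by identity. Java serialization honors `writeReplace`/`readResolve`, so the canonical instance comes back after a round trip. A serializer that copies fields directly and skips these hooks would presumably produce a distinct object, and identity-based comparisons would then fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;

public class ProxyRoundTrip {

  // Hypothetical stand-in for a primitive type that is compared by identity.
  static final class IntType implements Serializable {
    static final IntType INSTANCE = new IntType();

    private IntType() {
    }

    // Java serialization calls this hook and writes the proxy instead of this object.
    private Object writeReplace() throws ObjectStreamException {
      return new Holder("int");
    }
  }

  // Hypothetical proxy that carries only the type name.
  static final class Holder implements Serializable {
    private final String typeName;

    Holder(String typeName) {
      this.typeName = typeName;
    }

    // Java deserialization calls this hook and hands back the canonical instance.
    private Object readResolve() throws ObjectStreamException {
      if ("int".equals(typeName)) {
        return IntType.INSTANCE;
      }
      throw new InvalidObjectException("Unknown type: " + typeName);
    }
  }

  // Serializes and deserializes a value with plain Java serialization.
  static Object javaRoundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  // True when the deserialized object is the very same canonical instance.
  public static boolean sameInstanceAfterRoundTrip() {
    return javaRoundTrip(IntType.INSTANCE) == IntType.INSTANCE;
  }

  public static void main(String[] args) {
    System.out.println(sameInstanceAfterRoundTrip());
  }
}
```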






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604323836



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -206,9 +215,7 @@ private void abort(WriterCommitMessage[] messages) {
             PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
             2.0 /* exponential */)
         .throwFailureWhenFinished()
-        .run(file -> {
-          io.value().deleteFile(file.path().toString());
-        });
+        .run(file -> table.io().deleteFile(file.path().toString()));

Review comment:
       Nit: non-functional change that may cause commit conflicts.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608060300



##########
File path: spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {

Review comment:
       Addressed in a separate PR.

##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -34,7 +34,7 @@
  * using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
  * deserialization.
  */
-abstract class BaseMetadataTable implements Table, Serializable {
+public abstract class BaseMetadataTable implements Table, Serializable {

Review comment:
       No longer applies.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-809907809


   I can probably split this into 2-3 PRs. Let me do that tomorrow.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615074539



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -479,27 +488,17 @@ public void doCommit(long epochId, WriterCommitMessage[] messages) {
   }
 
   private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
-    private final PartitionSpec spec;
+    private final Broadcast<Table> tableBroadcast;
     private final FileFormat format;
-    private final LocationProvider locations;
-    private final Map<String, String> properties;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
 
-    protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                            Map<String, String> properties, Broadcast<FileIO> io,
-                            Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+    protected WriterFactory(Broadcast<Table> tableBroadcast, FileFormat format, long targetFileSize,
                             Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
-      this.spec = spec;
+      this.tableBroadcast = tableBroadcast;

Review comment:
       This is a follow-up on https://github.com/apache/iceberg/commit/2843db8cbe2a2b6eba2209ec6f758d2530d5c94b where we redesigned the way we serialize tables. According to the benchmarks I ran, the new broadcast size should be up to 50% smaller, even though we now broadcast the whole table. We changed the way we serialize the Hadoop conf in `FileIO`.






[GitHub] [iceberg] aokolnychyi edited a comment on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-805379292


   Could you take a look too, @openinx? This does not touch the Flink part but we would want it to be consistent.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608059583



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {

Review comment:
       Done differently now.






[GitHub] [iceberg] flyrain commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-821473610


   LGTM. Thanks @aokolnychyi. Abstraction is always challenging. Seeing with fresh eyes, are schema, partitionSpec, and partition Id part of table as well? 




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604403648



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);

Review comment:
       @rdblue, I think this is being discussed [here](https://github.com/apache/iceberg/pull/2258/files#r583317366). Strangely, we have a number of tests in Spark that succeed and work correctly with unmodifiable maps.
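
       As a point of reference for the `copy` helper quoted above, here is a small sketch of the same copy-then-wrap idea using only the JDK. It uses `java.util.HashMap` in place of the relocated Guava `Maps.newHashMap`, which is an assumption made to keep the example self-contained. It only demonstrates that the wrapped map survives plain Java serialization and rejects writes. Kryo behavior is deliberately not shown, since it depends on which serializers are registered.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableCopy {

  // Defensive copy wrapped in an unmodifiable view, mirroring the helper in the diff.
  static <K, V> Map<K, V> copy(Map<K, V> map) {
    return Collections.unmodifiableMap(new HashMap<>(map));
  }

  // Serializes and deserializes a value with plain Java serialization.
  @SuppressWarnings("unchecked")
  static <T> T javaRoundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  // True when the unmodifiable copy has the same entries after a round trip.
  public static boolean survivesJavaRoundTrip() {
    Map<String, String> props = new HashMap<>();
    props.put("write.format.default", "parquet");
    Map<String, String> frozen = copy(props);
    Map<String, String> restored = javaRoundTrip(frozen);
    return frozen.equals(restored);
  }

  // True when the copy refuses modification.
  public static boolean rejectsWrites() {
    try {
      copy(new HashMap<String, String>()).put("k", "v");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(survivesJavaRoundTrip());
    System.out.println(rejectsWrites());
  }
}
```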






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-823012452


   Thanks for reviewing, everyone! I've merged this to unblock other PRs.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600724710



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       How many times will we read the metadata file in this case? Just once, right, @pvary?
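
       For context, the lazy-loading pattern used by `lazyTable()` elsewhere in this PR can be sketched with the JDK alone. `LazyLoadOnce` is a hypothetical class, the string it builds stands in for reading a metadata file, and the path is made up. Under these assumptions the expensive load runs at most once per deserialized copy, because the `transient volatile` field starts out as `null` after deserialization and is filled in under double-checked locking.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyLoadOnce implements Serializable {
  private final String metadataFileLocation;

  // Not serialized; both fields take their default values after deserialization.
  private transient volatile String lazyMetadata = null;
  private transient int loadCount = 0;

  public LazyLoadOnce(String metadataFileLocation) {
    this.metadataFileLocation = metadataFileLocation;
  }

  // Double-checked locking: the simulated read happens only on first access.
  public String metadata() {
    if (lazyMetadata == null) {
      synchronized (this) {
        if (lazyMetadata == null) {
          loadCount++;
          this.lazyMetadata = "metadata read from " + metadataFileLocation;
        }
      }
    }
    return lazyMetadata;
  }

  public synchronized int loadCount() {
    return loadCount;
  }

  // Serializes and deserializes an instance with plain Java serialization.
  static LazyLoadOnce javaRoundTrip(LazyLoadOnce value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazyLoadOnce) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  // Counts simulated reads on a deserialized copy after the given number of accesses.
  public static int loadsAfterRoundTrip(int calls) {
    LazyLoadOnce copy = javaRoundTrip(new LazyLoadOnce("/tmp/hypothetical/v1.metadata.json"));
    for (int i = 0; i < calls; i++) {
      copy.metadata();
    }
    return copy.loadCount();
  }

  public static void main(String[] args) {
    System.out.println(loadsAfterRoundTrip(3));
  }
}
```

       Whether this amounts to one read per executor or one per task in Spark depends on how the object is shipped (for example, broadcast versus closure capture), which this sketch does not model.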






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608060800



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       Resolving as this is no longer relevant to this PR.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604399562



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +41,14 @@
   private SparkUtil() {
   }
 
+  public static Table toSerializedTable(Table table) {

Review comment:
       How will we pass a custom FileIO required by serialization in Spark?






[GitHub] [iceberg] flyrain edited a comment on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
flyrain edited a comment on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-821473610


   LGTM. Thanks @aokolnychyi. Abstraction is always challenging. Seeing with fresh eyes, are schema, partitionSpec, and partition Id part of table as well? Not sure if it is reasonable to combine them together though.




[GitHub] [iceberg] szehon-ho commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r605102481



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
##########
@@ -60,6 +61,11 @@
   private StructType eqDeleteSparkType = null;
   private StructType posDeleteSparkType = null;
 
+  // TODO: expose a builder like SparkAppenderFactory.forTable()

Review comment:
       Yea sure.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-806378945


   This is still WIP but is ready for a more detailed review round. Added Kryo tests will fail but that's not really related to this PR. We have to fix `Schema` serialization.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r614601995



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       I took one more look and I think we should be fine removing the laziness.
   
   @RussellSpitzer @rdblue, do you agree?






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603866120



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +41,14 @@
   private SparkUtil() {
   }
 
+  public static Table toSerializedTable(Table table) {

Review comment:
       What about a solution like `HasTableOperations`?
   My feeling is that the Serialization method should be the responsibility of the table itself.
   
   What do you think?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601060077



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  // needed for serialization
+  public SerializedTable(String name, String location, String metadataFileLocation,
+                         Map<String, String> properties, Schema schema,
+                         int defaultSpecId, Map<Integer, PartitionSpec> specs,
+                         int defaultSortOrderId, Map<Integer, SortOrder> sortOrders,
+                         FileIO io, EncryptionManager encryption, LocationProvider locationProvider) {
+    this.name = name;
+    this.location = location;
+    this.metadataFileLocation = metadataFileLocation;
+    this.properties = properties;
+    this.schema = schema;
+    this.defaultSpecId = defaultSpecId;
+    this.specs = specs;
+    this.defaultSortOrderId = defaultSortOrderId;
+    this.sortOrders = sortOrders;
+    this.io = io;
+    this.encryption = encryption;
+    this.locationProvider = locationProvider;
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();

Review comment:
       Should support scans now.
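
       For readers following the thread, here is a minimal sketch of the lazy-loading idea behind `lazyTable()`: a `transient volatile` field is skipped by Java serialization and rebuilt on first use after deserialization. `TableHandle`, `roundTrip`, and the string placeholder for the loaded table are hypothetical stand-ins, not Iceberg classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyTransientDemo {

  // Hypothetical stand-in for a serialized table that keeps only a metadata location.
  static class TableHandle implements Serializable {
    private final String metadataFileLocation;

    // Not serialized; rebuilt on first use in whichever JVM deserialized the handle.
    private transient volatile String lazyTable = null;

    TableHandle(String metadataFileLocation) {
      this.metadataFileLocation = metadataFileLocation;
    }

    boolean loaded() {
      return lazyTable != null;
    }

    // Double-checked locking: the volatile read avoids taking the lock once the value exists.
    String lazyTable() {
      if (lazyTable == null) {
        synchronized (this) {
          if (lazyTable == null) {
            // Placeholder for the expensive step (for example, reading the metadata file).
            this.lazyTable = "table@" + metadataFileLocation;
          }
        }
      }
      return lazyTable;
    }
  }

  // Serializes and deserializes with plain Java serialization.
  static <T extends Serializable> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  public static void main(String[] args) {
    TableHandle driverSide = new TableHandle("s3://bucket/tbl/metadata/v3.metadata.json");
    driverSide.lazyTable(); // loaded on the sending side
    TableHandle executorSide = roundTrip(driverSide);
    System.out.println(executorSide.loaded());    // the transient field did not travel
    System.out.println(executorSide.lazyTable()); // rebuilt on demand
  }
}
```

       With this shape, only callers that actually need the full table (such as a scan) pay the loading cost; the plain getters keep working from the serialized fields.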






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604321927



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +41,14 @@
   private SparkUtil() {
   }
 
+  public static Table toSerializedTable(Table table) {

Review comment:
       You mean like a `HasSerializableProxy` that exposes a `toSerializable()` method?
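
       A rough sketch of what such an interface could look like, purely as an illustration of the question. `HasSerializableProxy` and `toSerializable()` are the names floated in this comment; everything else (`CatalogTable`, `Snapshotted`, the static helper) is hypothetical and not part of Iceberg.

```java
import java.io.Serializable;

public class SerializableProxyDemo {

  // Hypothetical interface, named after the suggestion in the review comment.
  interface HasSerializableProxy<T extends Serializable> {
    T toSerializable();
  }

  // Hypothetical serializable snapshot of a table.
  static class Snapshotted implements Serializable {
    final String name;

    Snapshotted(String name) {
      this.name = name;
    }
  }

  // Hypothetical table that knows how to produce its own serializable form.
  static class CatalogTable implements HasSerializableProxy<Snapshotted> {
    private final String name;

    CatalogTable(String name) {
      this.name = name;
    }

    @Override
    public Snapshotted toSerializable() {
      return new Snapshotted(name);
    }
  }

  // Utility in the spirit of toSerializedTable: prefer the table's own proxy when it offers one.
  static Serializable toSerializable(Object table) {
    if (table instanceof HasSerializableProxy) {
      return ((HasSerializableProxy<?>) table).toSerializable();
    } else if (table instanceof Serializable) {
      return (Serializable) table;
    } else {
      throw new IllegalArgumentException("Cannot serialize " + table.getClass().getName());
    }
  }

  public static void main(String[] args) {
    Serializable proxy = toSerializable(new CatalogTable("db.events"));
    System.out.println(((Snapshotted) proxy).name);
  }
}
```

       The trade-off is that each `Table` implementation decides how it is shipped, at the cost of one more interface to implement.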






[GitHub] [iceberg] pvary commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600338069



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       How much better is Kryo serialization than Java serialization for the `Table` object? Any numbers at hand?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601063826



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?

Review comment:
       Changed. 






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615086184



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       Actually, I remember why I did not do this. If we were to expose writing data with a partition spec that is not current, then we would need to pass the correct partition spec here instead of consuming the default one from the table object. I think it should be alright to keep it as is for now.
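
       To make the concern concrete, here is a small sketch of how a writer could resolve a non-default spec by ID if that were ever exposed. The types below (`Spec`, `TableView`) and the `outputSpecId` parameter are hypothetical stand-ins; the PR itself keeps using the table's default spec.

```java
import java.util.HashMap;
import java.util.Map;

public class SpecLookupDemo {

  // Hypothetical stand-in for a partition spec.
  static class Spec {
    final int specId;

    Spec(int specId) {
      this.specId = specId;
    }
  }

  // Hypothetical stand-in for the table view available on executors.
  static class TableView {
    private final int defaultSpecId;
    private final Map<Integer, Spec> specs;

    TableView(int defaultSpecId, Map<Integer, Spec> specs) {
      this.defaultSpecId = defaultSpecId;
      this.specs = specs;
    }

    Spec spec() {
      return specs.get(defaultSpecId);
    }

    Map<Integer, Spec> specs() {
      return specs;
    }
  }

  // Uses the requested spec when one is given, otherwise falls back to the table default.
  static Spec specFor(TableView table, Integer outputSpecId) {
    if (outputSpecId == null) {
      return table.spec();
    }

    Spec spec = table.specs().get(outputSpecId);
    if (spec == null) {
      throw new IllegalArgumentException("Unknown spec ID: " + outputSpecId);
    }
    return spec;
  }

  public static void main(String[] args) {
    Map<Integer, Spec> specs = new HashMap<>();
    specs.put(0, new Spec(0));
    specs.put(1, new Spec(1));
    TableView table = new TableView(1, specs);

    System.out.println(specFor(table, null).specId); // default spec
    System.out.println(specFor(table, 0).specId);    // explicitly requested older spec
  }
}
```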






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600046991



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<Integer, PartitionSpec> specs;
+  private final SortOrder sortOrder;
+  private final Map<Integer, SortOrder> sortOrders;
+  private FileIO io;
+  private EncryptionManager encryption;
+  private LocationProvider locationProvider;
+
+  public StaticTable(Table table) {
+    this.name = table.name();
+    this.location = table.location();
+    this.properties = copy(table.properties());

Review comment:
       We have to copy maps as Guava maps are not supported by Kryo.
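
       As an illustration, a defensive copy along the lines of the `copy(...)` helper in this diff can be sketched with JDK collections only. The class name `MapCopyDemo` is hypothetical, and whether a particular serializer configuration accepts the resulting map types is an assumption to verify in your own setup.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MapCopyDemo {

  // Copies any Map implementation into a java.util.HashMap and exposes it read-only.
  static <K, V> Map<K, V> copy(Map<K, V> map) {
    Map<K, V> copy = new HashMap<>(map);
    return Collections.unmodifiableMap(copy);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("write.format.default", "parquet");

    Map<String, String> snapshot = copy(props);
    props.put("added.later", "true"); // does not affect the snapshot

    System.out.println(snapshot.size());
  }
}
```

       Besides sidestepping the original map implementation, the copy also freezes the contents at the time the object is built.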






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608059955



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -100,14 +100,16 @@
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
-  SparkWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-             LogicalWriteInfo writeInfo, String applicationId, String wapId,
+  // lazy variables
+  private JavaSparkContext lazySparkContext = null;

Review comment:
       This PR includes both read & write. Resolving.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-821528746


   Oh, that one will be replaced with a builder in a follow-up PR.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608853258



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -95,27 +79,29 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
 
-    RowDataReader dataReader = new RowDataReader(
-        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+    Table table = tableBroadcast.getValue();

Review comment:
       Fixed.






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604318831



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {

Review comment:
       Should we consider adding `metadataLocation` to `Table`?






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616224919



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -520,10 +500,9 @@ private InternalRowReaderFactory() {
     }
 
     @Override
-    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
-                                                    String nameMapping, FileIO io,
-                                                    EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Table table,

Review comment:
       Spacing on this and the method below seems odd. I think `expectedSchema` would fit on the previous line? I don't really mind, though usually this sort of thing bugs you :) Fine as is if you don't want a single dangling `caseSensitive` on its own line.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r616339228



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -520,10 +500,9 @@ private InternalRowReaderFactory() {
     }
 
     @Override
-    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
-                                                    String nameMapping, FileIO io,
-                                                    EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+    public InputPartitionReader<InternalRow> create(CombinedScanTask task, Table table,

Review comment:
       I am paranoid about such things :) It feels a bit unbalanced with only caseSensitive on a new line.
   
   <img width="867" alt="image" src="https://user-images.githubusercontent.com/6235869/115338554-e4ee9600-a157-11eb-8815-38e00842c83c.png">
   
   <img width="772" alt="image" src="https://user-images.githubusercontent.com/6235869/115338596-fafc5680-a157-11eb-94b5-c78180a4d6ad.png">
   






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600046317



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       I did not want another Table implementation but Kryo will ignore the Java serialization lifecycle. That's why the proxy pattern we currently have in `BaseTable` won't work. 
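
       For context, the "Java serialization lifecycle" here refers to hooks such as `writeReplace` and `readResolve`. The sketch below shows the proxy pattern with plain Java serialization; `LiveTable` and `Proxy` are hypothetical stand-ins rather than the actual `BaseTable` code. A serializer that copies fields directly and never calls these hooks would skip the proxy entirely, which is the problem described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ProxyLifecycleDemo {

  // Hypothetical table with live state that must be rebuilt rather than serialized.
  static class LiveTable implements Serializable {
    private final String name;
    private final transient Object connection;

    LiveTable(String name) {
      this.name = name;
      this.connection = new Object(); // placeholder for non-serializable state
    }

    String name() {
      return name;
    }

    boolean connected() {
      return connection != null;
    }

    // Java serialization hook: ObjectOutputStream writes the proxy instead of this object.
    private Object writeReplace() {
      return new Proxy(name);
    }
  }

  // Small serializable proxy carrying only what is needed to rebuild the table.
  static class Proxy implements Serializable {
    private final String name;

    Proxy(String name) {
      this.name = name;
    }

    // Java serialization hook: ObjectInputStream swaps the proxy for a rebuilt table.
    private Object readResolve() {
      return new LiveTable(name);
    }
  }

  // Round trip through plain Java serialization, which honors both hooks.
  static Object javaRoundTrip(Object obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LiveTable copy = (LiveTable) javaRoundTrip(new LiveTable("db.events"));
    System.out.println(copy.name() + " connected=" + copy.connected());
  }
}
```

       Because the rebuilt object comes from the constructor, its transient state is valid again after the round trip. Without the hooks, the transient field would simply be left unset.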






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601063287



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       I replaced the proxies. 






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603771672



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       I am resolving this thread as the current approach covers queries.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r601063442



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<Integer, PartitionSpec> specs;
+  private final SortOrder sortOrder;
+  private final Map<Integer, SortOrder> sortOrders;
+  private FileIO io;

Review comment:
       No longer applies. 






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615094235



##########
File path: core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
##########
@@ -59,6 +60,11 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider
     this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
   }
 
+  public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {

Review comment:
       I'll resolve this for now.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608852959



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -443,52 +448,37 @@ public String toString() {
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final CombinedScanTask task;
-    private final String tableSchemaString;
+    private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
-    private final String nameMappingString;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
     private final boolean localityPreferred;
     private final ReaderFactory<T> readerFactory;
 
-    private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+    private ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
                      boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
       this.task = task;
-      this.tableSchemaString = tableSchemaString;
+      this.tableBroadcast = tableBroadcast;
       this.expectedSchemaString = expectedSchemaString;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
-      this.nameMappingString = nameMappingString;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
-      return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
-          encryptionManager.value(), caseSensitive);
+      Table table = tableBroadcast.getValue();

Review comment:
       Fixed.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-805367992


   @pvary @rdblue @szehon-ho @RussellSpitzer, it would be great to hear your thoughts on this one.




[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600061935



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<Integer, PartitionSpec> specs;
+  private final SortOrder sortOrder;
+  private final Map<Integer, SortOrder> sortOrders;
+  private FileIO io;

Review comment:
       Makes sense for `FileIO`, but do we need to do this for `EncryptionManager` and `LocationProvider`?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608851528



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       I think so, but it would have to touch more than one query engine, so I stopped here for now. Sounds like a good follow-up item.
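
A minimal sketch of what such a follow-up could look like: a writer gains a constructor that takes the table and pulls the spec and IO from it, delegating to the existing constructor. `TableLike`, `SimpleTable`, and `WriterSketch` are hypothetical stand-ins and not Iceberg classes; plain strings stand in for `PartitionSpec` and `FileIO`.

```java
// Hypothetical sketch, not Iceberg code: strings stand in for PartitionSpec and FileIO.
class TableArgSketch {

  // Stand-in for the subset of Table that a writer needs.
  interface TableLike {
    String spec();

    String io();
  }

  static class SimpleTable implements TableLike {
    private final String spec;
    private final String io;

    SimpleTable(String spec, String io) {
      this.spec = spec;
      this.io = io;
    }

    @Override
    public String spec() {
      return spec;
    }

    @Override
    public String io() {
      return io;
    }
  }

  static class WriterSketch {
    final String spec;
    final String io;
    final long targetFileSize;

    // Existing style: every component is passed as a separate argument.
    WriterSketch(String spec, String io, long targetFileSize) {
      this.spec = spec;
      this.io = io;
      this.targetFileSize = targetFileSize;
    }

    // Table-based style: components come from the table, so passing one more of them
    // later (e.g. a sort order) would not change this signature.
    WriterSketch(TableLike table, long targetFileSize) {
      this(table.spec(), table.io(), targetFileSize);
    }
  }

  public static void main(String[] args) {
    TableLike table = new SimpleTable("unpartitioned", "hadoop-io");
    WriterSketch writer = new WriterSketch(table, 128L);
    System.out.println(writer.spec + " " + writer.io + " " + writer.targetFileSize);
  }
}
```

Keeping the old constructor and delegating to it means callers in other engines can migrate one at a time.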






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600049781



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       It may not make any difference. According to my preliminary tests, a serialized static table is around 75226 bytes, of which almost 72000 bytes are taken by `FileIO`. I still need to test it with tables that have a lot of columns and a number of specs and sort orders, but it may not be worth it.
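
For reference, a minimal sketch of how a size like this can be measured. It assumes plain Java serialization (the comment does not say which serializer was used, and Kryo would give different numbers). `SerializedSizeSketch` and `serializedSize` are hypothetical names, not Iceberg or Spark APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Iceberg: reports how many bytes Java serialization produces.
class SerializedSizeSketch {

  // Serializes the object into an in-memory buffer and returns the number of bytes written.
  static int serializedSize(Serializable obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The stream is closed (and therefore flushed) before the size is read.
    return bytes.size();
  }

  public static void main(String[] args) {
    // Byte arrays stand in for a small and a large serializable component.
    int small = serializedSize(new byte[0]);
    int large = serializedSize(new byte[1024]);
    System.out.println("empty payload: " + small + " bytes");
    System.out.println("1 KiB payload: " + large + " bytes");
  }
}
```

Measuring the whole table and an individual component such as its `FileIO` separately with a helper like this is one way to attribute the total to its parts.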






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608059723



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +41,14 @@
   private SparkUtil() {
   }
 
+  public static Table toSerializedTable(Table table) {

Review comment:
       No longer applies.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608852838



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -49,34 +46,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
+  private final Broadcast<Table> tableBroadcast;
   private final PartitionSpec spec;
-  private final Map<String, String> properties;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
-  private final LocationProvider locations;
-  private final String nameMapping;
   private final boolean caseSensitive;
 
-  public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
-                         Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean caseSensitive) {
+    this.tableBroadcast = tableBroadcast;
     this.spec = spec;
-    this.locations = table.locationProvider();
-    this.properties = table.properties();
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-
     this.caseSensitive = caseSensitive;
-    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
+    Table table = tableBroadcast.getValue();

Review comment:
       You are right, good catch.






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600760405



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       @aokolnychyi, it would load metadata each time a scan is built. So if you called `newScan` on executors, it would result in a lot of metadata loads. But we don't expect that to happen, so I think it should be okay.
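
A minimal sketch of one way to keep that cost off the common path: static fields are served directly, and the expensive load is deferred to a `transient volatile` field that is filled at most once per deserialized copy. This illustrates the general pattern only and is not the PR's implementation; `LazyTableSketch`, `LOADS`, and the string "metadata" are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Iceberg code.
class LazyTableSketch implements Serializable {
  // Counts simulated metadata loads in this JVM (for illustration only).
  static final AtomicInteger LOADS = new AtomicInteger();

  private final String metadataLocation;

  // Not serialized; every deserialized copy starts with null and loads on demand.
  private transient volatile String lazyMetadata = null;

  LazyTableSketch(String metadataLocation) {
    this.metadataLocation = metadataLocation;
  }

  // Cheap accessor: never triggers a load.
  String location() {
    return metadataLocation;
  }

  // Scan-like call: triggers the load once per copy (double-checked locking).
  String newScan() {
    if (lazyMetadata == null) {
      synchronized (this) {
        if (lazyMetadata == null) {
          LOADS.incrementAndGet();
          lazyMetadata = "metadata read from " + metadataLocation;
        }
      }
    }
    return "scan over " + lazyMetadata;
  }

  // Simulates shipping the object to another process via Java serialization.
  static LazyTableSketch roundTrip(LazyTableSketch table) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(table);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazyTableSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LazyTableSketch copy = roundTrip(new LazyTableSketch("s3://bucket/table/metadata.json"));
    System.out.println(copy.location());
    System.out.println(copy.newScan());
    System.out.println("loads so far: " + LOADS.get());
  }
}
```

With this shape, executors that only read static fields never pay for a load, and a copy that does build scans pays once rather than per scan.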






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608506448



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -49,34 +46,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
-
 public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Schema schema;
+  private final Broadcast<Table> tableBroadcast;
   private final PartitionSpec spec;
-  private final Map<String, String> properties;
   private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
-  private final LocationProvider locations;
-  private final String nameMapping;
   private final boolean caseSensitive;
 
-  public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
-                         Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
-    this.schema = table.schema();
+  public RowDataRewriter(Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean caseSensitive) {
+    this.tableBroadcast = tableBroadcast;
     this.spec = spec;
-    this.locations = table.locationProvider();
-    this.properties = table.properties();
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-
     this.caseSensitive = caseSensitive;
-    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
+    Table table = tableBroadcast.getValue();

Review comment:
       We should use value(), right?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -95,27 +79,29 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
 
-    RowDataReader dataReader = new RowDataReader(
-        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+    Table table = tableBroadcast.getValue();

Review comment:
       Same comment

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -513,17 +512,22 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();
+
+      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(table, writeSchema, dsSchema);
+
+      PartitionSpec spec = table.spec();
+      FileIO io = table.io();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);

Review comment:
       I guess in the future we can also simplify these writer constructors by passing the table?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -443,52 +448,37 @@ public String toString() {
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final CombinedScanTask task;
-    private final String tableSchemaString;
+    private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
-    private final String nameMappingString;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
     private final boolean localityPreferred;
     private final ReaderFactory<T> readerFactory;
 
-    private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+    private ReadTask(CombinedScanTask task, Broadcast<Table> tableBroadcast, String expectedSchemaString,
                      boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
       this.task = task;
-      this.tableSchemaString = tableSchemaString;
+      this.tableBroadcast = tableBroadcast;
       this.expectedSchemaString = expectedSchemaString;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
-      this.nameMappingString = nameMappingString;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
-      return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
-          encryptionManager.value(), caseSensitive);
+      Table table = tableBroadcast.getValue();

Review comment:
       Same

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       Out of curiosity, why do we need a lazy one?
   
   Also, what does the Java one provide?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -274,17 +271,21 @@ public String toString() {
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(
-          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
+      Table table = tableBroadcast.getValue();

Review comment:
       Same (value()?)






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603773021



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  // needed for serialization
+  public SerializedTable(String name, String location, String metadataFileLocation,
+                         Map<String, String> properties, Schema schema,
+                         int defaultSpecId, Map<Integer, PartitionSpec> specs,
+                         int defaultSortOrderId, Map<Integer, SortOrder> sortOrders,
+                         FileIO io, EncryptionManager encryption, LocationProvider locationProvider) {
+    this.name = name;
+    this.location = location;
+    this.metadataFileLocation = metadataFileLocation;
+    this.properties = properties;
+    this.schema = schema;
+    this.defaultSpecId = defaultSpecId;
+    this.specs = specs;
+    this.defaultSortOrderId = defaultSortOrderId;
+    this.sortOrders = sortOrders;
+    this.io = io;
+    this.encryption = encryption;
+    this.locationProvider = locationProvider;
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    throw new UnsupportedOperationException(errorMsg("currentSnapshot"));
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    throw new UnsupportedOperationException(errorMsg("snapshot"));
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    throw new UnsupportedOperationException(errorMsg("snapshots"));
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    throw new UnsupportedOperationException(errorMsg("history"));
+  }

Review comment:
       Went ahead and implemented it now. Good catch, @pvary. Resolving the thread.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603777787



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -100,14 +100,16 @@
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
-  SparkWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-             LogicalWriteInfo writeInfo, String applicationId, String wapId,
+  // lazy variables
+  private JavaSparkContext lazySparkContext = null;

Review comment:
       I can do similar refactoring for the read side in a separate PR.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604399037



##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -34,7 +34,7 @@
  * using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
  * deserialization.
  */
-abstract class BaseMetadataTable implements Table, Serializable {
+public abstract class BaseMetadataTable implements Table, Serializable {

Review comment:
       How can we move `SparkUtil.toSerializedTable` to core if it depends on `SparkUtil.serializableFileIO`?






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604320934



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    return sortOrders.get(defaultSortOrderId);
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return sortOrders;
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    return lazyTable().currentSnapshot();
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    return lazyTable().snapshot(snapshotId);
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    return lazyTable().snapshots();
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    return lazyTable().history();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    throw new UnsupportedOperationException(errorMsg("updateSchema"));

Review comment:
       Should we fall back to letting the `StaticTableOperations` throw the exception for this? I could go either way. If we do it here then it is clear that the problem is that the table was serialized, not just that the ops is static.
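To illustrate the trade-off raised here, the following is a minimal sketch of the "throw in the serialized table itself" option. The class name `SerializedCopySketch` and the message wording are hypothetical; the body of the PR's real `errorMsg` helper is not shown in this thread. The point is only that the exception names serialization as the cause, instead of surfacing a generic error from the static operations underneath.

```java
public class SerializedCopySketch {
  private final String name;

  public SerializedCopySketch(String name) {
    this.name = name;
  }

  public String name() {
    return name;
  }

  // Mutating calls fail fast here, before any delegate table is consulted.
  public void updateSchema() {
    throw new UnsupportedOperationException(errorMsg("updateSchema"));
  }

  // Hypothetical message format: it states that the table was serialized,
  // which is the detail a user needs in order to understand the failure.
  private String errorMsg(String operation) {
    return String.format(
        "Operation %s is not supported after the table %s is serialized", operation, name);
  }
}
```

Delegating to the static operations instead would need less code, but the resulting message would describe the ops rather than the serialized copy.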






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600970830



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size

Review comment:
       It works but Spark uses Java serialization for Hadoop configs.
   
   ```
   kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
   ```
   
   By reusing `SerializableConfiguration` from Spark, we don't have to register our own custom Kryo serializer.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600046476



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<Integer, PartitionSpec> specs;
+  private final SortOrder sortOrder;
+  private final Map<Integer, SortOrder> sortOrders;
+  private FileIO io;

Review comment:
       These are not final so that we can override them using setters.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-821518248


   > schema, partitionSpec, and partition Id part of table as well
   
   We used to derive this information from the table object and pass individual things as the table wasn't serializable. Each time we needed one more entity, we had to modify a lot of places. Hopefully, this won't be needed after this change.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r615093033



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       I went ahead and removed the laziness. I think it should be safe.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r603784062



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
##########
@@ -60,6 +61,11 @@
   private StructType eqDeleteSparkType = null;
   private StructType posDeleteSparkType = null;
 
+  // TODO: expose a builder like SparkAppenderFactory.forTable()

Review comment:
       @szehon-ho, I think you had a builder in your PR. Would you be interested in submitting a separate PR for that once this one is in?






[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604326469



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       I think the implementation here is okay, but we should have thorough tests for this because we don't want to break equality. Probably best to do this in a separate PR with the tests.
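As a rough idea of what such tests could check, here is a sketch of an equality-contract helper. Everything in it is an assumption for illustration: `Primitive` and `TypeId` are hypothetical stand-ins, not Iceberg's `Type.PrimitiveType`, and the `equals` body is a guess, since the hunk above cuts off before the real one.

```java
import java.util.Objects;

public class EqualityContractSketch {
  enum TypeId { INTEGER, STRING }

  // Hypothetical primitive type whose identity is fully described by its type ID.
  static final class Primitive {
    private final TypeId typeId;

    Primitive(TypeId typeId) {
      this.typeId = typeId;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(typeId);
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof Primitive)) {
        return false;
      }
      return typeId == ((Primitive) other).typeId;
    }
  }

  // Checks reflexivity, symmetry, hash consistency, and inequality with a
  // different value and with null.
  static boolean satisfiesContract(Object a, Object equalToA, Object different) {
    return a.equals(a)
        && a.equals(equalToA)
        && equalToA.equals(a)
        && a.hashCode() == equalToA.hashCode()
        && !a.equals(different)
        && !different.equals(a)
        && !a.equals(null);
  }
}
```

A real test suite would presumably also compare instances before and after a serialization round trip, since that is where separately constructed but equal instances appear.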






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608060121



##########
File path: api/src/main/java/org/apache/iceberg/types/Type.java
##########
@@ -111,6 +112,23 @@ public PrimitiveType asPrimitiveType() {
     Object writeReplace() throws ObjectStreamException {
       return new PrimitiveHolder(toString());
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(typeId());
+    }
+
+    @Override
+    public boolean equals(Object other) {

Review comment:
       Addressed in a separate PR.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r608509240



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -123,6 +123,13 @@
     this.partitionedFanoutEnabled = options.getBoolean(SparkWriteOptions.FANOUT_ENABLED, tablePartitionedFanoutEnabled);
   }
 
+  protected JavaSparkContext lazySparkContext() {

Review comment:
       Out of curiosity, why do we need a lazy one?
   
   While I am asking, what does the Java one (`JavaSparkContext`) provide?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600046317



##########
File path: core/src/main/java/org/apache/iceberg/StaticTable.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+// TODO: any ideas on naming?
+public class StaticTable implements Table, Serializable {

Review comment:
       I did not want another `Table` implementation, but Kryo ignores the Java serialization lifecycle hooks (`writeReplace`/`readResolve`), so the proxy pattern we currently have won't work with it.
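For readers unfamiliar with the proxy pattern mentioned here, the following is a minimal sketch using plain Java serialization. `HeavyTable`, `TableProxy`, and `PROXIES_WRITTEN` are hypothetical names chosen for illustration and are not Iceberg classes. The swap to the proxy happens only because `ObjectOutputStream` invokes `writeReplace`; a serializer that copies fields directly and skips these hooks would never create the proxy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class ProxyPatternSketch {
  // Counts how many times the writeReplace hook ran (illustration only).
  static final AtomicInteger PROXIES_WRITTEN = new AtomicInteger();

  // Hypothetical stand-in for a table whose full state should not be shipped.
  static class HeavyTable implements Serializable {
    private final String metadataLocation;

    HeavyTable(String metadataLocation) {
      this.metadataLocation = metadataLocation;
    }

    String metadataLocation() {
      return metadataLocation;
    }

    // Java serialization calls this hook and writes the proxy instead of this object.
    private Object writeReplace() {
      PROXIES_WRITTEN.incrementAndGet();
      return new TableProxy(metadataLocation);
    }
  }

  // Lightweight proxy that rebuilds the table on the receiving side.
  static class TableProxy implements Serializable {
    private final String metadataLocation;

    TableProxy(String metadataLocation) {
      this.metadataLocation = metadataLocation;
    }

    // Java deserialization calls this hook and returns the rebuilt table.
    private Object readResolve() {
      return new HeavyTable(metadataLocation);
    }
  }

  // Serializes and deserializes a value with plain Java serialization.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }
}
```

Calling `ProxyPatternSketch.roundTrip(new HeavyTable(...))` returns a new `HeavyTable` rebuilt through the proxy. A class whose fields are already the compact form, as in the approach proposed in this PR, does not depend on either hook.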






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r600053744



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -38,6 +39,13 @@
   private SparkUtil() {
   }
 
+  public static Table toStaticTable(Table table) {
+    // TODO: consider setting some fields as null to reduce the size
+    StaticTable staticTable = new StaticTable(table);
+    staticTable.setIO(serializableFileIO(table));

Review comment:
       The way we handle `FileIO` serializability is specific to Spark.






[GitHub] [iceberg] aokolnychyi commented on pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#issuecomment-805494536


   cc @yyanyy as well. I think this may affect how we can do the refactoring of writer factories. New APIs can accept the Table object.




[GitHub] [iceberg] rdblue commented on a change in pull request #2362: Spark: Pass Table object to executors

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2362:
URL: https://github.com/apache/iceberg/pull/2362#discussion_r604320352



##########
File path: core/src/main/java/org/apache/iceberg/SerializedTable.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class SerializedTable implements Table, Serializable {
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
+  private final int defaultSortOrderId;
+  private final Map<Integer, SortOrder> sortOrders;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  // lazy vars loaded on demand
+  private transient volatile Table lazyTable = null;
+
+  public SerializedTable(Table table) {
+    this(table, table.io());
+  }
+
+  public SerializedTable(Table table, FileIO io) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = copy(table.properties());
+    this.schema = table.schema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = copy(table.specs());
+    this.defaultSortOrderId = table.sortOrder().orderId();
+    this.sortOrders = copy(table.sortOrders());
+    this.io = io;
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).metadataLocation();
+
+    } else {
+      throw new IllegalArgumentException("Cannot determine metadata file location for table " + table.name());
+    }
+  }
+
+  private <K, V> Map<K, V> copy(Map<K, V> map) {
+    Map<K, V> copy = Maps.newHashMap(map);
+    return Collections.unmodifiableMap(copy);

Review comment:
       Aren't there problems with `unmodifiableMap` and Kryo? We just added `SerializableMap` to handle that elsewhere.
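One way to sidestep the concern is sketched below. It assumes the problem is that field-based serializers have trouble with the JDK's private unmodifiable wrapper classes. `CopiedMapSketch` is a hypothetical class written for this illustration and is not the `SerializableMap` referred to above. It keeps a plain `HashMap` as the only serialized state and rebuilds the unmodifiable view on demand.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopiedMapSketch<K, V> implements Serializable {
  // The only serialized state: a plain HashMap copy of the source map.
  private final HashMap<K, V> copy;

  // The unmodifiable wrapper is transient, so it is never serialized.
  private transient volatile Map<K, V> immutableView;

  public CopiedMapSketch(Map<K, V> source) {
    this.copy = new HashMap<>(source);
  }

  // Lazily wraps the copy; a race here only creates an extra, equivalent view.
  public Map<K, V> immutableMap() {
    Map<K, V> view = immutableView;
    if (view == null) {
      view = Collections.unmodifiableMap(copy);
      immutableView = view;
    }
    return view;
  }
}
```

Callers still get a read-only map, while the object graph that reaches the serializer contains only standard collection classes.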



