You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/08/17 20:18:55 UTC

[iceberg] branch master updated: MR: Use SerializableTable in IcebergSplit (#2988)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8181600  MR: Use SerializableTable in IcebergSplit (#2988)
8181600 is described below

commit 818160003328af8c1f0b0e3ed94acef68bab6a13
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Aug 17 10:18:19 2021 -1000

    MR: Use SerializableTable in IcebergSplit (#2988)
---
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  9 +++--
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  | 43 +++++++---------------
 2 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c526ede..3c64d59 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -135,13 +136,14 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+      Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(serializableTable, conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
@@ -204,8 +206,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
       CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
-      this.io = ((IcebergSplit) split).io();
-      this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+      Table table = ((IcebergSplit) split).table();
+      this.io = table.io();
+      this.encryptionManager = table.encryption();
       this.tasks = task.files().iterator();
       this.tableSchema = InputFormatConfig.tableSchema(conf);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 632224e..8bc332e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,9 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.util.SerializationUtil;
 
@@ -38,9 +37,8 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
 
   public static final String[] ANYWHERE = new String[]{"*"};
 
+  private Table table;
   private CombinedScanTask task;
-  private FileIO io;
-  private EncryptionManager encryptionManager;
 
   private transient String[] locations;
   private transient Configuration conf;
@@ -49,11 +47,10 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
   public IcebergSplit() {
   }
 
-  IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+  IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+    this.table = table;
     this.task = task;
     this.conf = conf;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
   }
 
   public CombinedScanTask task() {
@@ -86,39 +83,27 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
 
   @Override
   public void write(DataOutput out) throws IOException {
+    byte[] tableData = SerializationUtil.serializeToBytes(table);
+    out.writeInt(tableData.length);
+    out.write(tableData);
+
     byte[] data = SerializationUtil.serializeToBytes(this.task);
     out.writeInt(data.length);
     out.write(data);
-
-    byte[] ioData = SerializationUtil.serializeToBytes(io);
-    out.writeInt(ioData.length);
-    out.write(ioData);
-
-    byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
-    out.writeInt(encryptionManagerData.length);
-    out.write(encryptionManagerData);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    byte[] tableData = new byte[in.readInt()];
+    in.readFully(tableData);
+    this.table = SerializationUtil.deserializeFromBytes(tableData);
+
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
     this.task = SerializationUtil.deserializeFromBytes(data);
-
-    byte[] ioData = new byte[in.readInt()];
-    in.readFully(ioData);
-    this.io = SerializationUtil.deserializeFromBytes(ioData);
-
-    byte[] encryptionManagerData = new byte[in.readInt()];
-    in.readFully(encryptionManagerData);
-    this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData);
-  }
-
-  public FileIO io() {
-    return io;
   }
 
-  public EncryptionManager encryptionManager() {
-    return encryptionManager;
+  public Table table() {
+    return table;
   }
 }