You are viewing a plain-text version of this content. (The canonical link that appeared here in the original was not preserved in this plain-text copy.)
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/08/17 20:18:55 UTC
[iceberg] branch master updated: MR: Use SerializableTable in IcebergSplit (#2988)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8181600 MR: Use SerializableTable in IcebergSplit (#2988)
8181600 is described below
commit 818160003328af8c1f0b0e3ed94acef68bab6a13
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Aug 17 10:18:19 2021 -1000
MR: Use SerializableTable in IcebergSplit (#2988)
---
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 9 +++--
.../apache/iceberg/mr/mapreduce/IcebergSplit.java | 43 +++++++---------------
2 files changed, 20 insertions(+), 32 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c526ede..3c64d59 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -135,13 +136,14 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+ Table serializableTable = SerializableTable.copyOf(table);
tasksIterable.forEach(task -> {
if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
model == InputFormatConfig.InMemoryDataModel.PIG)) {
// TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
checkResiduals(task);
}
- splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+ splits.add(new IcebergSplit(serializableTable, conf, task));
});
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
@@ -204,8 +206,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
CombinedScanTask task = ((IcebergSplit) split).task();
this.context = newContext;
- this.io = ((IcebergSplit) split).io();
- this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+ Table table = ((IcebergSplit) split).table();
+ this.io = table.io();
+ this.encryptionManager = table.encryption();
this.tasks = task.files().iterator();
this.tableSchema = InputFormatConfig.tableSchema(conf);
this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 632224e..8bc332e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,9 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.util.SerializationUtil;
@@ -38,9 +37,8 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
public static final String[] ANYWHERE = new String[]{"*"};
+ private Table table;
private CombinedScanTask task;
- private FileIO io;
- private EncryptionManager encryptionManager;
private transient String[] locations;
private transient Configuration conf;
@@ -49,11 +47,10 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
public IcebergSplit() {
}
- IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+ IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+ this.table = table;
this.task = task;
this.conf = conf;
- this.io = io;
- this.encryptionManager = encryptionManager;
}
public CombinedScanTask task() {
@@ -86,39 +83,27 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
@Override
public void write(DataOutput out) throws IOException {
+ byte[] tableData = SerializationUtil.serializeToBytes(table);
+ out.writeInt(tableData.length);
+ out.write(tableData);
+
byte[] data = SerializationUtil.serializeToBytes(this.task);
out.writeInt(data.length);
out.write(data);
-
- byte[] ioData = SerializationUtil.serializeToBytes(io);
- out.writeInt(ioData.length);
- out.write(ioData);
-
- byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
- out.writeInt(encryptionManagerData.length);
- out.write(encryptionManagerData);
}
@Override
public void readFields(DataInput in) throws IOException {
+ byte[] tableData = new byte[in.readInt()];
+ in.readFully(tableData);
+ this.table = SerializationUtil.deserializeFromBytes(tableData);
+
byte[] data = new byte[in.readInt()];
in.readFully(data);
this.task = SerializationUtil.deserializeFromBytes(data);
-
- byte[] ioData = new byte[in.readInt()];
- in.readFully(ioData);
- this.io = SerializationUtil.deserializeFromBytes(ioData);
-
- byte[] encryptionManagerData = new byte[in.readInt()];
- in.readFully(encryptionManagerData);
- this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData);
- }
-
- public FileIO io() {
- return io;
}
- public EncryptionManager encryptionManager() {
- return encryptionManager;
+ public Table table() {
+ return table;
}
}