Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/05 21:46:18 UTC
[iceberg] branch master updated: Core: Add SerializableTable (#2403)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2843db8 Core: Add SerializableTable (#2403)
2843db8 is described below
commit 2843db8cbe2a2b6eba2209ec6f758d2530d5c94b
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 5 14:46:02 2021 -0700
Core: Add SerializableTable (#2403)
---
.../test/java/org/apache/iceberg/TestHelpers.java | 18 +
.../java/org/apache/iceberg/BaseMetadataTable.java | 33 +-
.../main/java/org/apache/iceberg/BaseTable.java | 26 +-
.../java/org/apache/iceberg/BaseTransaction.java | 7 +-
.../java/org/apache/iceberg/SerializableTable.java | 378 +++++++++++++++++++++
.../java/org/apache/iceberg/SortOrderParser.java | 21 ++
.../iceberg/hadoop/TestTableSerialization.java | 58 +++-
.../test/java/org/apache/iceberg/KryoHelpers.java | 52 +++
.../apache/iceberg/TestFileIOSerialization.java | 113 ++++++
.../org/apache/iceberg/TestTableSerialization.java | 102 ++++++
10 files changed, 748 insertions(+), 60 deletions(-)
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index aa0306e..fd4ea08 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -102,6 +102,24 @@ public class TestHelpers {
);
}
+ public static void assertSerializedMetadata(Table expected, Table actual) {
+ Assert.assertEquals("Name must match", expected.name(), actual.name());
+ Assert.assertEquals("Location must match", expected.location(), actual.location());
+ Assert.assertEquals("Props must match", expected.properties(), actual.properties());
+ Assert.assertEquals("Schema must match", expected.schema().asStruct(), actual.schema().asStruct());
+ Assert.assertEquals("Spec must match", expected.spec(), actual.spec());
+ Assert.assertEquals("Sort order must match", expected.sortOrder(), actual.sortOrder());
+ }
+
+ public static void assertSerializedAndLoadedMetadata(Table expected, Table actual) {
+ assertSerializedMetadata(expected, actual);
+ Assert.assertEquals("Specs must match", expected.specs(), actual.specs());
+ Assert.assertEquals("Sort orders must match", expected.sortOrders(), actual.sortOrders());
+ Assert.assertEquals("Current snapshot must match", expected.currentSnapshot(), actual.currentSnapshot());
+ Assert.assertEquals("Snapshots must match", expected.snapshots(), actual.snapshots());
+ Assert.assertEquals("History must match", expected.history(), actual.history());
+ }
+
private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
private final String message;
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 1800636..28a5a59 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -219,37 +219,6 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
}
final Object writeReplace() {
- String metadataLocation = ops.current().metadataFileLocation();
- return new TableProxy(io(), table().name(), name(), metadataLocation, metadataTableType(), locationProvider());
- }
-
- static class TableProxy implements Serializable {
- private FileIO io;
- private String baseTableName;
- private String metadataTableName;
- private String metadataLocation;
- private MetadataTableType type;
- private LocationProvider locationProvider;
-
- TableProxy(FileIO io, String baseTableName, String metadataTableName, String metadataLocation,
- MetadataTableType type, LocationProvider locationProvider) {
- this.io = io;
- this.baseTableName = baseTableName;
- this.metadataTableName = metadataTableName;
- this.metadataLocation = metadataLocation;
- this.type = type;
- this.locationProvider = locationProvider;
- }
-
- /**
- * Returns a table with {@link StaticTableOperations} so after deserialization no Catalog related calls are
- * needed for accessing the table snapshot data.
- * @return The metadata Table object for reading the table data at the time of the serialization of the original
- * object
- */
- private Object readResolve() {
- TableOperations ops = new StaticTableOperations(metadataLocation, io, locationProvider);
- return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, metadataTableName, type);
- }
+ return SerializableTable.copyOf(this);
}
}
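
For context, both the removed TableProxy above and the SerializableTable that replaces it
rely on the standard java.io serialization hooks: writeReplace substitutes a compact proxy
for the object being written, and readResolve swaps a rebuilt object back in on read. A
minimal sketch of that pattern with a hypothetical CachedPower class (not Iceberg code):

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.math.BigInteger;

// Hypothetical example of the serialization-proxy pattern used here.
class CachedPower implements Serializable {
  private final int exponent;
  private final BigInteger value; // derived state we avoid putting in the stream

  CachedPower(int exponent) {
    this.exponent = exponent;
    this.value = BigInteger.valueOf(2).pow(exponent);
  }

  // Java serialization writes the returned proxy to the stream instead of this object
  Object writeReplace() throws ObjectStreamException {
    return new Proxy(exponent);
  }

  private static class Proxy implements Serializable {
    private final int exponent;

    Proxy(int exponent) {
      this.exponent = exponent;
    }

    // after the proxy is read back, it replaces itself with a rebuilt instance
    Object readResolve() throws ObjectStreamException {
      return new CachedPower(exponent);
    }
  }
}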
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 51c056f..f7e7540 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -225,30 +225,6 @@ public class BaseTable implements Table, HasTableOperations, Serializable {
}
Object writeReplace() {
- return new TableProxy(this);
- }
-
- private static class TableProxy implements Serializable {
- private FileIO io;
- private String name;
- private String metadataLocation;
- private LocationProvider locationProvider;
-
- private TableProxy(BaseTable table) {
- io = table.io();
- name = table.name();
- metadataLocation = table.operations().current().metadataFileLocation();
- locationProvider = table.locationProvider();
- }
-
- /**
- * Returns a BaseTable with {@link StaticTableOperations} so after deserialization no Catalog related calls are
- * needed for accessing the table snapshot data.
- * @return The BaseTable object for reading the table data at the time of the serialization of the original
- * BaseTable object
- */
- private Object readResolve() {
- return new BaseTable(new StaticTableOperations(metadataLocation, io, locationProvider), name);
- }
+ return SerializableTable.copyOf(this);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 925f37f..36cf53b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -499,7 +500,7 @@ class BaseTransaction implements Transaction {
}
}
- public class TransactionTable implements Table, HasTableOperations {
+ public class TransactionTable implements Table, HasTableOperations, Serializable {
@Override
public TableOperations operations() {
@@ -679,6 +680,10 @@ class BaseTransaction implements Transaction {
public String toString() {
return name();
}
+
+ Object writeReplace() {
+ return SerializableTable.copyOf(this);
+ }
}
@VisibleForTesting
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
new file mode 100644
index 0000000..5b649f6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
+
+/**
+ * A read-only serializable table that can be sent to other nodes in a cluster.
+ * <p>
+ * An instance of this class represents an immutable serializable copy of a table state and
+ * will not reflect any subsequent changes made to the original table.
+ * <p>
+ * While this class captures the metadata file location that can be used to load the complete
+ * table metadata, it directly persists the current schema, spec, sort order, and table properties
+ * so that other nodes can access frequently needed metadata without reading the metadata file.
+ * <p>
+ * The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager},
+ * and {@link LocationProvider} are serializable. If the table is serialized with a custom
+ * serialization framework like Kryo, those instances must also be supported by that
+ * particular framework.
+ * <p>
+ * <em>Note:</em> loading the complete metadata from a large number of nodes can overwhelm the storage.
+ */
+public class SerializableTable implements Table, Serializable {
+
+ private final String name;
+ private final String location;
+ private final String metadataFileLocation;
+ private final Map<String, String> properties;
+ private final String schemaAsJson;
+ private final String specAsJson;
+ private final String sortOrderAsJson;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+ private final LocationProvider locationProvider;
+
+ private transient volatile Table lazyTable = null;
+ private transient volatile Schema lazySchema = null;
+ private transient volatile PartitionSpec lazySpec = null;
+ private transient volatile SortOrder lazySortOrder = null;
+
+ private SerializableTable(Table table) {
+ this.name = table.name();
+ this.location = table.location();
+ this.metadataFileLocation = metadataFileLocation(table);
+ this.properties = SerializableMap.copyOf(table.properties());
+ this.schemaAsJson = SchemaParser.toJson(table.schema());
+ this.specAsJson = PartitionSpecParser.toJson(table.spec());
+ this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+ this.io = fileIO(table);
+ this.encryption = table.encryption();
+ this.locationProvider = table.locationProvider();
+ }
+
+ /**
+ * Creates a read-only serializable table that can be sent to other nodes in a cluster.
+ *
+ * @param table the original table to copy the state from
+ * @return a read-only serializable table reflecting the current state of the original table
+ */
+ public static Table copyOf(Table table) {
+ if (table instanceof BaseMetadataTable) {
+ return new SerializableMetadataTable((BaseMetadataTable) table);
+ } else {
+ return new SerializableTable(table);
+ }
+ }
+
+ private String metadataFileLocation(Table table) {
+ if (table instanceof HasTableOperations) {
+ TableOperations ops = ((HasTableOperations) table).operations();
+ return ops.current().metadataFileLocation();
+ } else {
+ return null;
+ }
+ }
+
+ private FileIO fileIO(Table table) {
+ if (table.io() instanceof HadoopFileIO) {
+ HadoopFileIO hadoopFileIO = (HadoopFileIO) table.io();
+ SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopFileIO.getConf());
+ return new HadoopFileIO(serializedConf::get);
+ } else {
+ return table.io();
+ }
+ }
+
+ private Table lazyTable() {
+ if (lazyTable == null) {
+ synchronized (this) {
+ if (lazyTable == null) {
+ if (metadataFileLocation == null) {
+ throw new UnsupportedOperationException("Cannot load metadata: metadata file location is null");
+ }
+
+ TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+ this.lazyTable = newTable(ops, name);
+ }
+ }
+ }
+
+ return lazyTable;
+ }
+
+ protected Table newTable(TableOperations ops, String tableName) {
+ return new BaseTable(ops, tableName);
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
+ @Override
+ public Schema schema() {
+ if (lazySchema == null) {
+ synchronized (this) {
+ if (lazySchema == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ this.lazySchema = SchemaParser.fromJson(schemaAsJson);
+ } else if (lazySchema == null) {
+ this.lazySchema = lazyTable.schema();
+ }
+ }
+ }
+
+ return lazySchema;
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ if (lazySpec == null) {
+ synchronized (this) {
+ if (lazySpec == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ this.lazySpec = PartitionSpecParser.fromJson(schema(), specAsJson);
+ } else if (lazySpec == null) {
+ this.lazySpec = lazyTable.spec();
+ }
+ }
+ }
+
+ return lazySpec;
+ }
+
+ @Override
+ public Map<Integer, PartitionSpec> specs() {
+ return lazyTable().specs();
+ }
+
+ @Override
+ public SortOrder sortOrder() {
+ if (lazySortOrder == null) {
+ synchronized (this) {
+ if (lazySortOrder == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ this.lazySortOrder = SortOrderParser.fromJson(schema(), sortOrderAsJson);
+ } else if (lazySortOrder == null) {
+ this.lazySortOrder = lazyTable.sortOrder();
+ }
+ }
+ }
+
+ return lazySortOrder;
+ }
+
+ @Override
+ public Map<Integer, SortOrder> sortOrders() {
+ return lazyTable().sortOrders();
+ }
+
+ @Override
+ public FileIO io() {
+ return io;
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ return encryption;
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return locationProvider;
+ }
+
+ @Override
+ public void refresh() {
+ throw new UnsupportedOperationException(errorMsg("refresh"));
+ }
+
+ @Override
+ public TableScan newScan() {
+ return lazyTable().newScan();
+ }
+
+ @Override
+ public Snapshot currentSnapshot() {
+ return lazyTable().currentSnapshot();
+ }
+
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return lazyTable().snapshot(snapshotId);
+ }
+
+ @Override
+ public Iterable<Snapshot> snapshots() {
+ return lazyTable().snapshots();
+ }
+
+ @Override
+ public List<HistoryEntry> history() {
+ return lazyTable().history();
+ }
+
+ @Override
+ public UpdateSchema updateSchema() {
+ throw new UnsupportedOperationException(errorMsg("updateSchema"));
+ }
+
+ @Override
+ public UpdatePartitionSpec updateSpec() {
+ throw new UnsupportedOperationException(errorMsg("updateSpec"));
+ }
+
+ @Override
+ public UpdateProperties updateProperties() {
+ throw new UnsupportedOperationException(errorMsg("updateProperties"));
+ }
+
+ @Override
+ public ReplaceSortOrder replaceSortOrder() {
+ throw new UnsupportedOperationException(errorMsg("replaceSortOrder"));
+ }
+
+ @Override
+ public UpdateLocation updateLocation() {
+ throw new UnsupportedOperationException(errorMsg("updateLocation"));
+ }
+
+ @Override
+ public AppendFiles newAppend() {
+ throw new UnsupportedOperationException(errorMsg("newAppend"));
+ }
+
+ @Override
+ public RewriteFiles newRewrite() {
+ throw new UnsupportedOperationException(errorMsg("newRewrite"));
+ }
+
+ @Override
+ public RewriteManifests rewriteManifests() {
+ throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
+ }
+
+ @Override
+ public OverwriteFiles newOverwrite() {
+ throw new UnsupportedOperationException(errorMsg("newOverwrite"));
+ }
+
+ @Override
+ public RowDelta newRowDelta() {
+ throw new UnsupportedOperationException(errorMsg("newRowDelta"));
+ }
+
+ @Override
+ public ReplacePartitions newReplacePartitions() {
+ throw new UnsupportedOperationException(errorMsg("newReplacePartitions"));
+ }
+
+ @Override
+ public DeleteFiles newDelete() {
+ throw new UnsupportedOperationException(errorMsg("newDelete"));
+ }
+
+ @Override
+ public ExpireSnapshots expireSnapshots() {
+ throw new UnsupportedOperationException(errorMsg("expireSnapshots"));
+ }
+
+ @Override
+ public Rollback rollback() {
+ throw new UnsupportedOperationException(errorMsg("rollback"));
+ }
+
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ throw new UnsupportedOperationException(errorMsg("manageSnapshots"));
+ }
+
+ @Override
+ public Transaction newTransaction() {
+ throw new UnsupportedOperationException(errorMsg("newTransaction"));
+ }
+
+ private String errorMsg(String operation) {
+ return String.format("Operation %s is not supported after the table is serialized", operation);
+ }
+
+ private static class SerializableMetadataTable extends SerializableTable {
+ private final MetadataTableType type;
+ private final String baseTableName;
+
+ SerializableMetadataTable(BaseMetadataTable metadataTable) {
+ super(metadataTable);
+ this.type = metadataTable.metadataTableType();
+ this.baseTableName = metadataTable.table().name();
+ }
+
+ @Override
+ protected Table newTable(TableOperations ops, String tableName) {
+ return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type);
+ }
+ }
+
+ // captures the current state of a Hadoop configuration in a serializable manner
+ private static class SerializableConfiguration implements Serializable {
+
+ private final Map<String, String> confAsMap;
+ private transient volatile Configuration conf = null;
+
+ SerializableConfiguration(Configuration conf) {
+ this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
+ conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
+ }
+
+ public Configuration get() {
+ if (conf == null) {
+ synchronized (this) {
+ if (conf == null) {
+ Configuration newConf = new Configuration(false);
+ confAsMap.forEach(newConf::set);
+ this.conf = newConf;
+ }
+ }
+ }
+
+ return conf;
+ }
+ }
+}
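
A rough usage sketch for the new class (hypothetical helper, not part of this commit): copy
a table on the driver, Java-serialize it, and access metadata on the worker. Calls like
schema() are answered from the embedded JSON without file I/O, while snapshot-level calls
lazily load the metadata file through StaticTableOperations. Assumes the table's FileIO is
serializable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

// Hypothetical example class, not part of this commit.
class SerializableTableExample {
  static Table shipToWorker(Table table) throws IOException, ClassNotFoundException {
    Table copy = SerializableTable.copyOf(table);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(copy); // persists name, location, JSON metadata, FileIO, etc.
    }

    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      Table onWorker = (Table) in.readObject();
      onWorker.schema();          // served from the embedded schema JSON, no file I/O
      onWorker.currentSnapshot(); // lazily loads the metadata file via StaticTableOperations
      return onWorker;
    }
  }
}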
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 418486e..a351e3a 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
+import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Locale;
@@ -50,6 +51,26 @@ public class SortOrderParser {
generator.writeEndObject();
}
+ public static String toJson(SortOrder sortOrder) {
+ return toJson(sortOrder, false);
+ }
+
+ public static String toJson(SortOrder sortOrder, boolean pretty) {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+ if (pretty) {
+ generator.useDefaultPrettyPrinter();
+ }
+ toJson(sortOrder, generator);
+ generator.flush();
+ return writer.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private static String toJson(SortDirection direction) {
return direction.toString().toLowerCase(Locale.ROOT);
}
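
The new toJson overloads complement the existing fromJson, so a sort order can round-trip
through JSON. A small sketch (hypothetical example class; assumes fromJson(Schema, String)
is accessible to the caller):

import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

// Hypothetical example class, not part of this commit.
public class SortOrderJsonExample {
  public static void main(String[] args) {
    Schema schema = new Schema(required(1, "id", Types.LongType.get()));
    SortOrder order = SortOrder.builderFor(schema).asc("id").build();

    String json = SortOrderParser.toJson(order);         // compact form
    String pretty = SortOrderParser.toJson(order, true); // pretty-printed form

    SortOrder parsed = SortOrderParser.fromJson(schema, json);
    System.out.println(parsed.equals(order)); // expected: true
    System.out.println(pretty);
  }
}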
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
index 58bae17..1086ba0 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
@@ -31,16 +31,70 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
public class TestTableSerialization extends HadoopTableTestBase {
@Test
- public void testSerializeBaseTable() throws IOException {
+ public void testSerializableTable() throws IOException, ClassNotFoundException {
+ table.replaceSortOrder()
+ .asc("id")
+ .commit();
+
+ table.updateProperties()
+ .set("k1", "v1")
+ .set("k2", "v2")
+ .commit();
+
+ table.updateSchema()
+ .addColumn("new_col", Types.IntegerType.get())
+ .commit();
+
+ TestHelpers.assertSerializedAndLoadedMetadata(table, TestHelpers.roundTripSerialize(table));
+ }
+
+ @Test
+ public void testSerializableTxnTable() throws IOException, ClassNotFoundException {
+ table.replaceSortOrder()
+ .asc("id")
+ .commit();
+
+ table.updateProperties()
+ .set("k1", "v1")
+ .set("k2", "v2")
+ .commit();
+
+ table.updateSchema()
+ .addColumn("new_col", Types.IntegerType.get())
+ .commit();
+
+ Transaction txn = table.newTransaction();
+
+ txn.updateProperties()
+ .set("k3", "v3")
+ .commit();
+
+ // transaction tables have a null metadata file location, so we check only the serialized metadata
+ TestHelpers.assertSerializedMetadata(txn.table(), TestHelpers.roundTripSerialize(txn.table()));
+ }
+
+ @Test
+ public void testSerializableMetadataTable() throws IOException, ClassNotFoundException {
+ for (MetadataTableType type : MetadataTableType.values()) {
+ Table metadataTable = getMetaDataTable(table, type);
+ TestHelpers.assertSerializedAndLoadedMetadata(metadataTable, TestHelpers.roundTripSerialize(metadataTable));
+ }
+ }
+
+ @Test
+ public void testSerializableTablePlanning() throws IOException {
table.newAppend()
.appendFile(FILE_A)
.commit();
@@ -65,7 +119,7 @@ public class TestTableSerialization extends HadoopTableTestBase {
}
@Test
- public void testMetadataTables() throws IOException {
+ public void testSerializableMetadataTablesPlanning() throws IOException {
table.newAppend()
.appendFile(FILE_A)
.commit();
diff --git a/spark/src/test/java/org/apache/iceberg/KryoHelpers.java b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java
new file mode 100644
index 0000000..ee0f0a7
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+public class KryoHelpers {
+
+ private KryoHelpers() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> T roundTripSerialize(T obj) throws IOException {
+ Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+ try (Output out = new Output(new ObjectOutputStream(bytes))) {
+ kryo.writeClassAndObject(out, obj);
+ }
+
+ try (Input in = new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) {
+ return (T) kryo.readClassAndObject(in);
+ }
+ }
+}
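
A quick usage sketch for the helper (hypothetical, mirroring how the tests below call it);
the helper builds its Kryo instance from Spark's KryoSerializer so registrations match what
a Spark executor would use:

import java.io.IOException;
import org.apache.iceberg.KryoHelpers;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;

// Hypothetical example class; `table` is assumed to come from a catalog or HadoopTables.
class KryoRoundTripExample {
  static void verify(Table table) throws IOException {
    Table serializableTable = SerializableTable.copyOf(table);
    Table roundTripped = KryoHelpers.roundTripSerialize(serializableTable);
    TestHelpers.assertSerializedAndLoadedMetadata(table, roundTripped);
  }
}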
diff --git a/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
new file mode 100644
index 0000000..7c14485
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestFileIOSerialization {
+
+ private static final Configuration CONF = new Configuration();
+ private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()),
+ required(3, "date", Types.StringType.get()),
+ optional(4, "double", Types.DoubleType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec
+ .builderFor(SCHEMA)
+ .identity("date")
+ .build();
+
+ private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+ .asc("id")
+ .build();
+
+ static {
+ CONF.set("k1", "v1");
+ CONF.set("k2", "v2");
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private Table table;
+
+ @Before
+ public void initTable() throws IOException {
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+ File tableLocation = temp.newFolder();
+ Assert.assertTrue(tableLocation.delete());
+
+ this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+ }
+
+ @Test
+ public void testHadoopFileIOKryoSerialization() throws IOException {
+ FileIO io = table.io();
+ Configuration expectedConf = ((HadoopFileIO) io).conf();
+
+ Table serializableTable = SerializableTable.copyOf(table);
+ FileIO deserializedIO = KryoHelpers.roundTripSerialize(serializableTable.io());
+ Configuration actualConf = ((HadoopFileIO) deserializedIO).conf();
+
+ Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf));
+ Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1"));
+ Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2"));
+ }
+
+ @Test
+ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+ FileIO io = table.io();
+ Configuration expectedConf = ((HadoopFileIO) io).conf();
+
+ Table serializableTable = SerializableTable.copyOf(table);
+ FileIO deserializedIO = TestHelpers.roundTripSerialize(serializableTable.io());
+ Configuration actualConf = ((HadoopFileIO) deserializedIO).conf();
+
+ Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf));
+ Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1"));
+ Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2"));
+ }
+
+ private Map<String, String> toMap(Configuration conf) {
+ Map<String, String> map = Maps.newHashMapWithExpectedSize(conf.size());
+ conf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+ return map;
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
new file mode 100644
index 0000000..88e30c0
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+
+ private static final HadoopTables TABLES = new HadoopTables();
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()),
+ required(3, "date", Types.StringType.get()),
+ optional(4, "double", Types.DoubleType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec
+ .builderFor(SCHEMA)
+ .identity("date")
+ .build();
+
+ private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+ .asc("id")
+ .build();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private Table table;
+
+ @Before
+ public void initTable() throws IOException {
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+ File tableLocation = temp.newFolder();
+ Assert.assertTrue(tableLocation.delete());
+
+ this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+ }
+
+ @Test
+ public void testSerializableTableKryoSerialization() throws IOException {
+ Table serializableTable = SerializableTable.copyOf(table);
+ TestHelpers.assertSerializedAndLoadedMetadata(table, KryoHelpers.roundTripSerialize(serializableTable));
+ }
+
+ @Test
+ public void testSerializableMetadataTableKryoSerialization() throws IOException {
+ for (MetadataTableType type : MetadataTableType.values()) {
+ TableOperations ops = ((HasTableOperations) table).operations();
+ Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+ Table serializableMetadataTable = SerializableTable.copyOf(metadataTable);
+
+ TestHelpers.assertSerializedAndLoadedMetadata(
+ metadataTable,
+ KryoHelpers.roundTripSerialize(serializableMetadataTable));
+ }
+ }
+
+ @Test
+ public void testSerializableTransactionTableKryoSerialization() throws IOException {
+ Transaction txn = table.newTransaction();
+
+ txn.updateProperties()
+ .set("k1", "v1")
+ .commit();
+
+ Table txnTable = txn.table();
+ Table serializableTxnTable = SerializableTable.copyOf(txnTable);
+
+ TestHelpers.assertSerializedMetadata(txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable));
+ }
+}