Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/05 21:46:18 UTC

[iceberg] branch master updated: Core: Add SerializableTable (#2403)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2843db8  Core: Add SerializableTable (#2403)
2843db8 is described below

commit 2843db8cbe2a2b6eba2209ec6f758d2530d5c94b
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 5 14:46:02 2021 -0700

    Core: Add SerializableTable (#2403)
---
 .../test/java/org/apache/iceberg/TestHelpers.java  |  18 +
 .../java/org/apache/iceberg/BaseMetadataTable.java |  33 +-
 .../main/java/org/apache/iceberg/BaseTable.java    |  26 +-
 .../java/org/apache/iceberg/BaseTransaction.java   |   7 +-
 .../java/org/apache/iceberg/SerializableTable.java | 378 +++++++++++++++++++++
 .../java/org/apache/iceberg/SortOrderParser.java   |  21 ++
 .../iceberg/hadoop/TestTableSerialization.java     |  58 +++-
 .../test/java/org/apache/iceberg/KryoHelpers.java  |  52 +++
 .../apache/iceberg/TestFileIOSerialization.java    | 113 ++++++
 .../org/apache/iceberg/TestTableSerialization.java | 102 ++++++
 10 files changed, 748 insertions(+), 60 deletions(-)

diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index aa0306e..fd4ea08 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -102,6 +102,24 @@ public class TestHelpers {
     );
   }
 
+  public static void assertSerializedMetadata(Table expected, Table actual) {
+    Assert.assertEquals("Name must match", expected.name(), actual.name());
+    Assert.assertEquals("Location must match", expected.location(), actual.location());
+    Assert.assertEquals("Props must match", expected.properties(), actual.properties());
+    Assert.assertEquals("Schema must match", expected.schema().asStruct(), actual.schema().asStruct());
+    Assert.assertEquals("Spec must match", expected.spec(), actual.spec());
+    Assert.assertEquals("Sort order must match", expected.sortOrder(), actual.sortOrder());
+  }
+
+  public static void assertSerializedAndLoadedMetadata(Table expected, Table actual) {
+    assertSerializedMetadata(expected, actual);
+    Assert.assertEquals("Specs must match", expected.specs(), actual.specs());
+    Assert.assertEquals("Sort orders must match", expected.sortOrders(), actual.sortOrders());
+    Assert.assertEquals("Current snapshot must match", expected.currentSnapshot(), actual.currentSnapshot());
+    Assert.assertEquals("Snapshots must match", expected.snapshots(), actual.snapshots());
+    Assert.assertEquals("History must match", expected.history(), actual.history());
+  }
+
   private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
     private final String message;
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 1800636..28a5a59 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -219,37 +219,6 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
   }
 
   final Object writeReplace() {
-    String metadataLocation = ops.current().metadataFileLocation();
-    return new TableProxy(io(), table().name(), name(), metadataLocation, metadataTableType(), locationProvider());
-  }
-
-  static class TableProxy implements Serializable {
-    private FileIO io;
-    private String baseTableName;
-    private String metadataTableName;
-    private String metadataLocation;
-    private MetadataTableType type;
-    private LocationProvider locationProvider;
-
-    TableProxy(FileIO io, String baseTableName, String metadataTableName, String metadataLocation,
-               MetadataTableType type, LocationProvider locationProvider) {
-      this.io = io;
-      this.baseTableName = baseTableName;
-      this.metadataTableName = metadataTableName;
-      this.metadataLocation = metadataLocation;
-      this.type = type;
-      this.locationProvider = locationProvider;
-    }
-
-    /**
-     * Returns a table with {@link StaticTableOperations} so after deserialization no Catalog related calls are
-     * needed for accessing the table snapshot data.
-     * @return The metadata Table object for reading the table data at the time of the serialization of the original
-     *         object
-     */
-    private Object readResolve()  {
-      TableOperations ops = new StaticTableOperations(metadataLocation, io, locationProvider);
-      return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, metadataTableName, type);
-    }
+    return SerializableTable.copyOf(this);
   }
 }
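
The writeReplace() hook used here (and by BaseTable and TransactionTable below) is part of standard Java serialization: ObjectOutputStream invokes writeReplace() reflectively before writing an object and serializes the returned replacement instead. A minimal standalone sketch of the mechanism (class names here are hypothetical, not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class WriteReplaceDemo {

      // Stands in for BaseTable: holds live state that should not cross the wire.
      static class Live implements Serializable {
        transient Object liveResource = new Object();

        // Called by ObjectOutputStream; the frozen copy is written instead.
        Object writeReplace() {
          return new Frozen("state captured at serialization time");
        }
      }

      // Stands in for SerializableTable: an immutable, self-contained copy.
      static class Frozen implements Serializable {
        private final String state;

        Frozen(String state) {
          this.state = state;
        }
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(new Live());
        }

        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          // Prints "Frozen": the replacement came out, not Live.
          System.out.println(in.readObject().getClass().getSimpleName());
        }
      }
    }

Unlike the removed TableProxy, the replacement object here is itself the Table, so no readResolve() step is needed after deserialization.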
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 51c056f..f7e7540 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -225,30 +225,6 @@ public class BaseTable implements Table, HasTableOperations, Serializable {
   }
 
   Object writeReplace() {
-    return new TableProxy(this);
-  }
-
-  private static class TableProxy implements Serializable {
-    private FileIO io;
-    private String name;
-    private String metadataLocation;
-    private LocationProvider locationProvider;
-
-    private TableProxy(BaseTable table) {
-      io = table.io();
-      name = table.name();
-      metadataLocation = table.operations().current().metadataFileLocation();
-      locationProvider = table.locationProvider();
-    }
-
-    /**
-     * Returns a BaseTable with {@link StaticTableOperations} so after deserialization no Catalog related calls are
-     * needed for accessing the table snapshot data.
-     * @return The BaseTable object for reading the table data at the time of the serialization of the original
-     *         BaseTable object
-     */
-    private Object readResolve()  {
-      return new BaseTable(new StaticTableOperations(metadataLocation, io, locationProvider), name);
-    }
+    return SerializableTable.copyOf(this);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 925f37f..36cf53b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -499,7 +500,7 @@ class BaseTransaction implements Transaction {
     }
   }
 
-  public class TransactionTable implements Table, HasTableOperations {
+  public class TransactionTable implements Table, HasTableOperations, Serializable {
 
     @Override
     public TableOperations operations() {
@@ -679,6 +680,10 @@ class BaseTransaction implements Transaction {
     public String toString() {
       return name();
     }
+
+    Object writeReplace() {
+      return SerializableTable.copyOf(this);
+    }
   }
 
   @VisibleForTesting
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
new file mode 100644
index 0000000..5b649f6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
+
+/**
+ * A read-only serializable table that can be sent to other nodes in a cluster.
+ * <p>
+ * An instance of this class represents an immutable serializable copy of a table's state and
+ * will not reflect any subsequent changes made to the original table.
+ * <p>
+ * While this class captures the metadata file location that can be used to load the complete
+ * table metadata, it directly persists the current schema, spec, sort order, and table properties
+ * to avoid reading the metadata file on other nodes for frequently needed metadata.
+ * <p>
+ * The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} are serializable. If you are serializing the table using a custom
+ * serialization framework like Kryo, those instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} must be supported by that particular serialization framework.
+ * <p>
+ * <em>Note:</em> loading the complete metadata from a large number of nodes can overwhelm the storage.
+ */
+public class SerializableTable implements Table, Serializable {
+
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final String schemaAsJson;
+  private final String specAsJson;
+  private final String sortOrderAsJson;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+
+  private transient volatile Table lazyTable = null;
+  private transient volatile Schema lazySchema = null;
+  private transient volatile PartitionSpec lazySpec = null;
+  private transient volatile SortOrder lazySortOrder = null;
+
+  private SerializableTable(Table table) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = SerializableMap.copyOf(table.properties());
+    this.schemaAsJson = SchemaParser.toJson(table.schema());
+    this.specAsJson = PartitionSpecParser.toJson(table.spec());
+    this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+    this.io = fileIO(table);
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+  }
+
+  /**
+   * Creates a read-only serializable table that can be sent to other nodes in a cluster.
+   *
+   * @param table the original table to copy the state from
+   * @return a read-only serializable table reflecting the current state of the original table
+   */
+  public static Table copyOf(Table table) {
+    if (table instanceof BaseMetadataTable) {
+      return new SerializableMetadataTable((BaseMetadataTable) table);
+    } else {
+      return new SerializableTable(table);
+    }
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+    } else {
+      return null;
+    }
+  }
+
+  private FileIO fileIO(Table table) {
+    if (table.io() instanceof HadoopFileIO) {
+      HadoopFileIO hadoopFileIO = (HadoopFileIO) table.io();
+      SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopFileIO.getConf());
+      return new HadoopFileIO(serializedConf::get);
+    } else {
+      return table.io();
+    }
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          if (metadataFileLocation == null) {
+            throw new UnsupportedOperationException("Cannot load metadata: metadata file location is null");
+          }
+
+          TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    if (lazySchema == null) {
+      synchronized (this) {
+        if (lazySchema == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          this.lazySchema = SchemaParser.fromJson(schemaAsJson);
+        } else if (lazySchema == null) {
+          this.lazySchema = lazyTable.schema();
+        }
+      }
+    }
+
+    return lazySchema;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    if (lazySpec == null) {
+      synchronized (this) {
+        if (lazySpec == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          this.lazySpec = PartitionSpecParser.fromJson(schema(), specAsJson);
+        } else if (lazySpec == null) {
+          this.lazySpec = lazyTable.spec();
+        }
+      }
+    }
+
+    return lazySpec;
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return lazyTable().specs();
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    if (lazySortOrder == null) {
+      synchronized (this) {
+        if (lazySortOrder == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          this.lazySortOrder = SortOrderParser.fromJson(schema(), sortOrderAsJson);
+        } else if (lazySortOrder == null) {
+          this.lazySortOrder = lazyTable.sortOrder();
+        }
+      }
+    }
+
+    return lazySortOrder;
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return lazyTable().sortOrders();
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    return lazyTable().currentSnapshot();
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    return lazyTable().snapshot(snapshotId);
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    return lazyTable().snapshots();
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    return lazyTable().history();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    throw new UnsupportedOperationException(errorMsg("updateSchema"));
+  }
+
+  @Override
+  public UpdatePartitionSpec updateSpec() {
+    throw new UnsupportedOperationException(errorMsg("updateSpec"));
+  }
+
+  @Override
+  public UpdateProperties updateProperties() {
+    throw new UnsupportedOperationException(errorMsg("updateProperties"));
+  }
+
+  @Override
+  public ReplaceSortOrder replaceSortOrder() {
+    throw new UnsupportedOperationException(errorMsg("replaceSortOrder"));
+  }
+
+  @Override
+  public UpdateLocation updateLocation() {
+    throw new UnsupportedOperationException(errorMsg("updateLocation"));
+  }
+
+  @Override
+  public AppendFiles newAppend() {
+    throw new UnsupportedOperationException(errorMsg("newAppend"));
+  }
+
+  @Override
+  public RewriteFiles newRewrite() {
+    throw new UnsupportedOperationException(errorMsg("newRewrite"));
+  }
+
+  @Override
+  public RewriteManifests rewriteManifests() {
+    throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
+  }
+
+  @Override
+  public OverwriteFiles newOverwrite() {
+    throw new UnsupportedOperationException(errorMsg("newOverwrite"));
+  }
+
+  @Override
+  public RowDelta newRowDelta() {
+    throw new UnsupportedOperationException(errorMsg("newRowDelta"));
+  }
+
+  @Override
+  public ReplacePartitions newReplacePartitions() {
+    throw new UnsupportedOperationException(errorMsg("newReplacePartitions"));
+  }
+
+  @Override
+  public DeleteFiles newDelete() {
+    throw new UnsupportedOperationException(errorMsg("newDelete"));
+  }
+
+  @Override
+  public ExpireSnapshots expireSnapshots() {
+    throw new UnsupportedOperationException(errorMsg("expireSnapshots"));
+  }
+
+  @Override
+  public Rollback rollback() {
+    throw new UnsupportedOperationException(errorMsg("rollback"));
+  }
+
+  @Override
+  public ManageSnapshots manageSnapshots() {
+    throw new UnsupportedOperationException(errorMsg("manageSnapshots"));
+  }
+
+  @Override
+  public Transaction newTransaction() {
+    throw new UnsupportedOperationException(errorMsg("newTransaction"));
+  }
+
+  private String errorMsg(String operation) {
+    return String.format("Operation %s is not supported after the table is serialized", operation);
+  }
+
+  private static class SerializableMetadataTable extends SerializableTable {
+    private final MetadataTableType type;
+    private final String baseTableName;
+
+    SerializableMetadataTable(BaseMetadataTable metadataTable) {
+      super(metadataTable);
+      this.type = metadataTable.metadataTableType();
+      this.baseTableName = metadataTable.table().name();
+    }
+
+    @Override
+    protected Table newTable(TableOperations ops, String tableName) {
+      return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type);
+    }
+  }
+
+  // captures the current state of a Hadoop configuration in a serializable manner
+  private static class SerializableConfiguration implements Serializable {
+
+    private final Map<String, String> confAsMap;
+    private transient volatile Configuration conf = null;
+
+    SerializableConfiguration(Configuration conf) {
+      this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
+      conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
+    }
+
+    public Configuration get() {
+      if (conf == null) {
+        synchronized (this) {
+          if (conf == null) {
+            Configuration newConf = new Configuration(false);
+            confAsMap.forEach(newConf::set);
+            this.conf = newConf;
+          }
+        }
+      }
+
+      return conf;
+    }
+  }
+}
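
To make the intended flow concrete, here is a sketch of an end-to-end round trip with the new class through plain Java serialization; the HadoopTables warehouse path is an assumption, and in practice an engine would do the byte shuffling when shipping tasks to workers:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class ShipTableDemo {
      public static void main(String[] args) throws Exception {
        // Assumed table location; any existing Hadoop table path works.
        Table table = new HadoopTables().load("/tmp/warehouse/db/events");

        // Freeze the current table state into a read-only serializable copy.
        Table frozen = SerializableTable.copyOf(table);

        // Round-trip through Java serialization, as if shipping to a worker.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(frozen);
        }

        Table onWorker;
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          onWorker = (Table) in.readObject();
        }

        // Answered from the embedded JSON, no metadata file read:
        System.out.println(onWorker.schema());
        System.out.println(onWorker.spec());
        System.out.println(onWorker.sortOrder());

        // Triggers a lazy load of the metadata file via StaticTableOperations:
        onWorker.snapshots().forEach(System.out::println);

        // Mutations fail fast:
        // onWorker.newAppend(); // UnsupportedOperationException
      }
    }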
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 418486e..a351e3a 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.Locale;
@@ -50,6 +51,26 @@ public class SortOrderParser {
     generator.writeEndObject();
   }
 
+  public static String toJson(SortOrder sortOrder) {
+    return toJson(sortOrder, false);
+  }
+
+  public static String toJson(SortOrder sortOrder, boolean pretty) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      if (pretty) {
+        generator.useDefaultPrettyPrinter();
+      }
+      toJson(sortOrder, generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   private static String toJson(SortDirection direction) {
     return direction.toString().toLowerCase(Locale.ROOT);
   }
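
The new toJson overloads mirror the existing fromJson parsers, so a sort order can round-trip through a JSON string; a small sketch, assuming a schema with a sortable "id" column:

    // Serialize a sort order to JSON and parse it back against the schema.
    SortOrder order = SortOrder.builderFor(schema).asc("id").build();
    String json = SortOrderParser.toJson(order, true /* pretty */);
    SortOrder parsed = SortOrderParser.fromJson(schema, json);

This is exactly the path SerializableTable takes: sortOrderAsJson is produced with toJson(table.sortOrder()) at copy time and re-hydrated with fromJson(schema(), sortOrderAsJson) on first access.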
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
index 58bae17..1086ba0 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
@@ -31,16 +31,70 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestTableSerialization extends HadoopTableTestBase {
 
   @Test
-  public void testSerializeBaseTable() throws IOException {
+  public void testSerializableTable() throws IOException, ClassNotFoundException {
+    table.replaceSortOrder()
+        .asc("id")
+        .commit();
+
+    table.updateProperties()
+        .set("k1", "v1")
+        .set("k2", "v2")
+        .commit();
+
+    table.updateSchema()
+        .addColumn("new_col", Types.IntegerType.get())
+        .commit();
+
+    TestHelpers.assertSerializedAndLoadedMetadata(table, TestHelpers.roundTripSerialize(table));
+  }
+
+  @Test
+  public void testSerializableTxnTable() throws IOException, ClassNotFoundException {
+    table.replaceSortOrder()
+        .asc("id")
+        .commit();
+
+    table.updateProperties()
+        .set("k1", "v1")
+        .set("k2", "v2")
+        .commit();
+
+    table.updateSchema()
+        .addColumn("new_col", Types.IntegerType.get())
+        .commit();
+
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k3", "v3")
+        .commit();
+
+    // txn tables have null metadata locations, so we check only the serialized metadata
+    TestHelpers.assertSerializedMetadata(txn.table(), TestHelpers.roundTripSerialize(txn.table()));
+  }
+
+  @Test
+  public void testSerializableMetadataTable() throws IOException, ClassNotFoundException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      Table metadataTable = getMetaDataTable(table, type);
+      TestHelpers.assertSerializedAndLoadedMetadata(metadataTable, TestHelpers.roundTripSerialize(metadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTablePlanning() throws IOException {
     table.newAppend()
         .appendFile(FILE_A)
         .commit();
@@ -65,7 +119,7 @@ public class TestTableSerialization extends HadoopTableTestBase {
   }
 
   @Test
-  public void testMetadataTables() throws IOException {
+  public void testSerializableMetadataTablesPlanning() throws IOException {
     table.newAppend()
         .appendFile(FILE_A)
         .commit();
diff --git a/spark/src/test/java/org/apache/iceberg/KryoHelpers.java b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java
new file mode 100644
index 0000000..ee0f0a7
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+public class KryoHelpers {
+
+  private KryoHelpers() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T roundTripSerialize(T obj) throws IOException {
+    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+    try (Output out = new Output(new ObjectOutputStream(bytes))) {
+      kryo.writeClassAndObject(out, obj);
+    }
+
+    try (Input in = new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) {
+      return (T) kryo.readClassAndObject(in);
+    }
+  }
+}
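
The helper is meant to be used the same way as TestHelpers.roundTripSerialize, just through Spark's Kryo serializer; a typical assertion, mirroring the tests added in this commit, looks like:

    // assert a frozen table survives a Kryo round trip
    Table serializableTable = SerializableTable.copyOf(table);
    TestHelpers.assertSerializedAndLoadedMetadata(
        table, KryoHelpers.roundTripSerialize(serializableTable));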
diff --git a/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
new file mode 100644
index 0000000..7c14485
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestFileIOSerialization {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  static {
+    CONF.set("k1", "v1");
+    CONF.set("k2", "v2");
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testHadoopFileIOKryoSerialization() throws IOException {
+    FileIO io = table.io();
+    Configuration expectedConf = ((HadoopFileIO) io).conf();
+
+    Table serializableTable = SerializableTable.copyOf(table);
+    FileIO deserializedIO = KryoHelpers.roundTripSerialize(serializableTable.io());
+    Configuration actualConf = ((HadoopFileIO) deserializedIO).conf();
+
+    Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf));
+    Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1"));
+    Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2"));
+  }
+
+  @Test
+  public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO io = table.io();
+    Configuration expectedConf = ((HadoopFileIO) io).conf();
+
+    Table serializableTable = SerializableTable.copyOf(table);
+    FileIO deserializedIO = TestHelpers.roundTripSerialize(serializableTable.io());
+    Configuration actualConf = ((HadoopFileIO) deserializedIO).conf();
+
+    Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf));
+    Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1"));
+    Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2"));
+  }
+
+  private Map<String, String> toMap(Configuration conf) {
+    Map<String, String> map = Maps.newHashMapWithExpectedSize(conf.size());
+    conf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+    return map;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
new file mode 100644
index 0000000..88e30c0
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    Table serializableTable = SerializableTable.copyOf(table);
+    TestHelpers.assertSerializedAndLoadedMetadata(table, KryoHelpers.roundTripSerialize(serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      Table serializableMetadataTable = SerializableTable.copyOf(metadataTable);
+
+      TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          KryoHelpers.roundTripSerialize(serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    Table serializableTxnTable = SerializableTable.copyOf(txnTable);
+
+    TestHelpers.assertSerializedMetadata(txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable));
+  }
+}