You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/12/06 19:11:07 UTC

[iceberg] branch master updated: Core: Add table metadata builder (#3664)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d3b1f7  Core: Add table metadata builder (#3664)
6d3b1f7 is described below

commit 6d3b1f7996d4ceee5d5e28df7f378f6fe377950e
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon Dec 6 11:10:51 2021 -0800

    Core: Add table metadata builder (#3664)
---
 .../java/org/apache/iceberg/PartitionSpec.java     |    6 +-
 .../main/java/org/apache/iceberg/SortOrder.java    |   15 +-
 .../java/org/apache/iceberg/BaseTransaction.java   |   13 +-
 .../java/org/apache/iceberg/MetadataUpdate.java    |  203 ++++
 .../java/org/apache/iceberg/TableMetadata.java     | 1002 ++++++++++++--------
 .../org/apache/iceberg/TableMetadataParser.java    |    2 +-
 .../java/org/apache/iceberg/TestTableMetadata.java |   30 +-
 .../test/java/org/apache/iceberg/TestTables.java   |    5 +-
 .../iceberg/nessie/NessieTableOperations.java      |   15 +-
 9 files changed, 860 insertions(+), 431 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 51bc851..6ad90fd 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -499,10 +499,14 @@ public class PartitionSpec implements Serializable {
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
+      PartitionSpec spec = buildUnchecked();
       checkCompatibility(spec, schema);
       return spec;
     }
+
+    PartitionSpec buildUnchecked() {
+      return new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
+    }
   }
 
   static void checkCompatibility(PartitionSpec spec, Schema schema) {
diff --git a/api/src/main/java/org/apache/iceberg/SortOrder.java b/api/src/main/java/org/apache/iceberg/SortOrder.java
index a217e39..595af6d 100644
--- a/api/src/main/java/org/apache/iceberg/SortOrder.java
+++ b/api/src/main/java/org/apache/iceberg/SortOrder.java
@@ -263,7 +263,18 @@ public class SortOrder implements Serializable {
       return this;
     }
 
+    Builder addSortField(Transform<?, ?> transform, int sourceId, SortDirection direction, NullOrder nullOrder) {
+      fields.add(new SortField(transform, sourceId, direction, nullOrder));
+      return this;
+    }
+
     public SortOrder build() {
+      SortOrder sortOrder = buildUnchecked();
+      checkCompatibility(sortOrder, schema);
+      return sortOrder;
+    }
+
+    SortOrder buildUnchecked() {
       if (fields.isEmpty()) {
         if (orderId != null && orderId != 0) {
           throw new IllegalArgumentException("Unsorted order ID must be 0");
@@ -277,9 +288,7 @@ public class SortOrder implements Serializable {
 
       // default ID to 1 as 0 is reserved for unsorted order
       int actualOrderId = orderId != null ? orderId : 1;
-      SortOrder sortOrder = new SortOrder(schema, actualOrderId, fields);
-      checkCompatibility(sortOrder, schema);
-      return sortOrder;
+      return new SortOrder(schema, actualOrderId, fields);
     }
 
     private Transform<?, ?> toTransform(BoundTerm<?> term) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 3d0b31e..7e779bc 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -240,13 +240,10 @@ class BaseTransaction implements Transaction {
   }
 
   private void commitCreateTransaction() {
-    // fix up the snapshot log, which should not contain intermediate snapshots
-    TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
     // this operation creates the table. if the commit fails, this cannot retry because another
     // process has created the same table.
     try {
-      ops.commit(null, createMetadata);
+      ops.commit(null, current);
 
     } catch (RuntimeException e) {
       // the commit failed and no files were committed. clean up each update.
@@ -271,9 +268,7 @@ class BaseTransaction implements Transaction {
   }
 
   private void commitReplaceTransaction(boolean orCreate) {
-    // fix up the snapshot log, which should not contain intermediate snapshots
-    TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-    Map<String, String> props = base != null ? base.properties() : replaceMetadata.properties();
+    Map<String, String> props = base != null ? base.properties() : current.properties();
 
     try {
       Tasks.foreach(ops)
@@ -300,7 +295,7 @@ class BaseTransaction implements Transaction {
               this.base = underlyingOps.current(); // just refreshed
             }
 
-            underlyingOps.commit(base, replaceMetadata);
+            underlyingOps.commit(base, current);
           });
 
     } catch (RuntimeException e) {
@@ -358,7 +353,7 @@ class BaseTransaction implements Transaction {
             }
 
             // fix up the snapshot log, which should not contain intermediate snapshots
-            underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+            underlyingOps.commit(base, current);
           });
 
     } catch (RuntimeException e) {
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
new file mode 100644
index 0000000..25d015a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Represents a change to table metadata.
+ */
+public interface MetadataUpdate extends Serializable {
+  class AssignUUID implements MetadataUpdate {
+    private final String uuid;
+
+    public AssignUUID(String uuid) {
+      this.uuid = uuid;
+    }
+
+    public String uuid() {
+      return uuid;
+    }
+  }
+
+  class UpgradeFormatVersion implements MetadataUpdate {
+    private final int formatVersion;
+
+    public UpgradeFormatVersion(int formatVersion) {
+      this.formatVersion = formatVersion;
+    }
+
+    public int formatVersion() {
+      return formatVersion;
+    }
+  }
+
+  class AddSchema implements MetadataUpdate {
+    private final Schema schema;
+    private final int lastColumnId;
+
+    public AddSchema(Schema schema, int lastColumnId) {
+      this.schema = schema;
+      this.lastColumnId = lastColumnId;
+    }
+
+    public Schema schema() {
+      return schema;
+    }
+
+    public int lastColumnId() {
+      return lastColumnId;
+    }
+  }
+
+  class SetCurrentSchema implements MetadataUpdate {
+    private final int schemaId;
+
+    public SetCurrentSchema(int schemaId) {
+      this.schemaId = schemaId;
+    }
+
+    public int schemaId() {
+      return schemaId;
+    }
+  }
+
+  class AddPartitionSpec implements MetadataUpdate {
+    private final PartitionSpec spec;
+
+    public AddPartitionSpec(PartitionSpec spec) {
+      this.spec = spec;
+    }
+
+    public PartitionSpec spec() {
+      return spec;
+    }
+  }
+
+  class SetDefaultPartitionSpec implements MetadataUpdate {
+    private final int specId;
+
+    public SetDefaultPartitionSpec(int schemaId) {
+      this.specId = schemaId;
+    }
+
+    public int specId() {
+      return specId;
+    }
+  }
+
+  class AddSortOrder implements MetadataUpdate {
+    private final SortOrder sortOrder;
+
+    public AddSortOrder(SortOrder sortOrder) {
+      this.sortOrder = sortOrder;
+    }
+
+    public SortOrder spec() {
+      return sortOrder;
+    }
+  }
+
+  class SetDefaultSortOrder implements MetadataUpdate {
+    private final int sortOrderId;
+
+    public SetDefaultSortOrder(int sortOrderId) {
+      this.sortOrderId = sortOrderId;
+    }
+
+    public int sortOrderId() {
+      return sortOrderId;
+    }
+  }
+
+  class AddSnapshot implements MetadataUpdate {
+    private final Snapshot snapshot;
+
+    public AddSnapshot(Snapshot snapshot) {
+      this.snapshot = snapshot;
+    }
+
+    public Snapshot snapshot() {
+      return snapshot;
+    }
+  }
+
+  class RemoveSnapshot implements MetadataUpdate {
+    private final long snapshotId;
+
+    public RemoveSnapshot(long snapshotId) {
+      this.snapshotId = snapshotId;
+    }
+
+    public long snapshotId() {
+      return snapshotId;
+    }
+  }
+
+  class SetCurrentSnapshot implements MetadataUpdate {
+    private final Long snapshotId;
+
+    public SetCurrentSnapshot(Long snapshotId) {
+      this.snapshotId = snapshotId;
+    }
+
+    public Long snapshotId() {
+      return snapshotId;
+    }
+  }
+
+  class SetProperties implements MetadataUpdate {
+    private final Map<String, String> updated;
+
+    public SetProperties(Map<String, String> updated) {
+      this.updated = updated;
+    }
+
+    public Map<String, String> updated() {
+      return updated;
+    }
+  }
+
+  class RemoveProperties implements MetadataUpdate {
+    private final Set<String> removed;
+
+    public RemoveProperties(Set<String> removed) {
+      this.removed = removed;
+    }
+
+    public Set<String> removed() {
+      return removed;
+    }
+  }
+
+  class SetLocation implements MetadataUpdate {
+    private final String location;
+
+    public SetLocation(String location) {
+      this.location = location;
+    }
+
+    public String location() {
+      return location;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 59c71d7..164e295 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -22,8 +22,6 @@ package org.apache.iceberg;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -126,7 +124,7 @@ public class TableMetadata implements Serializable {
         freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
         freshSortOrderId, ImmutableList.of(freshSortOrder),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
-        ImmutableList.of(), ImmutableList.of());
+        ImmutableList.of(), ImmutableList.of(), ImmutableList.of());
   }
 
   public static class SnapshotLogEntry implements HistoryEntry {
@@ -240,6 +238,7 @@ public class TableMetadata implements Serializable {
   private final Map<Integer, SortOrder> sortOrdersById;
   private final List<HistoryEntry> snapshotLog;
   private final List<MetadataLogEntry> previousFiles;
+  private final List<MetadataUpdate> changes;
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   TableMetadata(String metadataFileLocation,
@@ -260,7 +259,8 @@ public class TableMetadata implements Serializable {
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
                 List<HistoryEntry> snapshotLog,
-                List<MetadataLogEntry> previousFiles) {
+                List<MetadataLogEntry> previousFiles,
+                List<MetadataUpdate> changes) {
     Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
     Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(), "Sort orders cannot be null or empty");
     Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
@@ -269,6 +269,8 @@ public class TableMetadata implements Serializable {
         "UUID is required in format v%s", formatVersion);
     Preconditions.checkArgument(formatVersion > 1 || lastSequenceNumber == 0,
         "Sequence number must be 0 in v1: %s", lastSequenceNumber);
+    Preconditions.checkArgument(metadataFileLocation == null || changes.isEmpty(),
+        "Cannot create TableMetadata with a metadata location and changes");
 
     this.metadataFileLocation = metadataFileLocation;
     this.formatVersion = formatVersion;
@@ -290,6 +292,9 @@ public class TableMetadata implements Serializable {
     this.snapshotLog = snapshotLog;
     this.previousFiles = previousFiles;
 
+    // changes are carried through until metadata is read from a file
+    this.changes = changes;
+
     this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
     this.schemasById = indexSchemas();
     this.specsById = indexSpecs(specs);
@@ -467,308 +472,61 @@ public class TableMetadata implements Serializable {
     return previousFiles;
   }
 
+  public List<MetadataUpdate> changes() {
+    return changes;
+  }
+
   public TableMetadata withUUID() {
-    if (uuid != null) {
-      return this;
-    } else {
-      return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
-          lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-          lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties,
-          currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-    }
+    return new Builder(this).assignUUID().build();
   }
 
   public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
-    PartitionSpec.checkCompatibility(spec(), newSchema);
-    SortOrder.checkCompatibility(sortOrder(), newSchema);
-    // rebuild all of the partition specs and sort orders for the new current schema
-    List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));
-    List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
-
-    int newSchemaId = reuseOrCreateNewSchemaId(newSchema);
-    if (currentSchemaId == newSchemaId && newLastColumnId == lastColumnId) {
-      // the new spec and last column Id is already current and no change is needed
-      return this;
-    }
-
-    ImmutableList.Builder<Schema> builder = ImmutableList.<Schema>builder().addAll(schemas);
-    if (!schemasById.containsKey(newSchemaId)) {
-      builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds()));
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), newLastColumnId,
-        newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId,
-        defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setCurrentSchema(newSchema, newLastColumnId).build();
   }
 
   // The caller is responsible to pass a newPartitionSpec with correct partition field IDs
   public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
-    Schema schema = schema();
-
-    PartitionSpec.checkCompatibility(newPartitionSpec, schema);
-    ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(newPartitionSpec),
-        "Spec does not use sequential IDs that are required in v1: %s", newPartitionSpec);
-
-    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
-    int newDefaultSpecId = INITIAL_SPEC_ID;
-    for (PartitionSpec spec : specs) {
-      if (newPartitionSpec.compatibleWith(spec)) {
-        newDefaultSpecId = spec.specId();
-        break;
-      } else if (newDefaultSpecId <= spec.specId()) {
-        newDefaultSpecId = spec.specId() + 1;
-      }
-    }
-
-    if (defaultSpecId == newDefaultSpecId) {
-      // the new spec is already current and no change is needed
-      return this;
-    }
-
-    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
-        .addAll(specs);
-    if (!specsById.containsKey(newDefaultSpecId)) {
-      // get a fresh spec to ensure the spec ID is set to the new default
-      builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId,
-        builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()),
-        defaultSortOrderId, sortOrders, properties,
-        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build();
   }
 
   public TableMetadata replaceSortOrder(SortOrder newOrder) {
-    Schema schema = schema();
-    SortOrder.checkCompatibility(newOrder, schema);
-
-    // determine the next order id
-    int newOrderId = INITIAL_SORT_ORDER_ID;
-    for (SortOrder order : sortOrders) {
-      if (order.sameOrder(newOrder)) {
-        newOrderId = order.orderId();
-        break;
-      } else if (newOrderId <= order.orderId()) {
-        newOrderId = order.orderId() + 1;
-      }
-    }
-
-    if (newOrderId == defaultSortOrderId) {
-      return this;
-    }
-
-    ImmutableList.Builder<SortOrder> builder = ImmutableList.builder();
-    builder.addAll(sortOrders);
-
-    if (!sortOrdersById.containsKey(newOrderId)) {
-      if (newOrder.isUnsorted()) {
-        newOrderId = SortOrder.unsorted().orderId();
-        builder.add(SortOrder.unsorted());
-      } else {
-        // rebuild the sort order using new column ids
-        builder.add(freshSortOrder(newOrderId, schema, newOrder));
-      }
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setDefaultSortOrder(newOrder).build();
   }
 
   public TableMetadata addStagedSnapshot(Snapshot snapshot) {
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
-        "Cannot add snapshot with sequence number %s older than last sequence number %s",
-        snapshot.sequenceNumber(), lastSequenceNumber);
-
-    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
-        .addAll(snapshots)
-        .add(snapshot)
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
-        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
-        defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).addSnapshot(snapshot).build();
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
-    // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
-    if (snapshotsById.containsKey(snapshot.snapshotId())) {
-      return setCurrentSnapshotTo(snapshot);
-    }
-
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
-        "Cannot add snapshot with sequence number %s older than last sequence number %s",
-        snapshot.sequenceNumber(), lastSequenceNumber);
-
-    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
-        .addAll(snapshots)
-        .add(snapshot)
-        .build();
-    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
-        .addAll(snapshotLog)
-        .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
-        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
-        defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setCurrentSnapshot(snapshot).build();
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
-    List<Snapshot> filtered = Lists.newArrayListWithExpectedSize(snapshots.size());
-    for (Snapshot snapshot : snapshots) {
-      // keep the current snapshot and any snapshots that do not match the removeIf condition
-      if (snapshot.snapshotId() == currentSnapshotId || !removeIf.test(snapshot)) {
-        filtered.add(snapshot);
-      }
-    }
-
-    // update the snapshot log
-    Set<Long> validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId));
-    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
-    for (HistoryEntry logEntry : snapshotLog) {
-      if (validIds.contains(logEntry.snapshotId())) {
-        // copy the log entries that are still valid
-        newSnapshotLog.add(logEntry);
-      } else {
-        // any invalid entry causes the history before it to be removed. otherwise, there could be
-        // history gaps that cause time-travel queries to produce incorrect results. for example,
-        // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
-        // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
-        // and t3 when in fact s2 was the current snapshot.
-        newSnapshotLog.clear();
-      }
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
-        ImmutableList.copyOf(newSnapshotLog), addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
-    ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
-        "Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
-        "Last sequence number %s is less than existing snapshot sequence number %s",
-        lastSequenceNumber, snapshot.sequenceNumber());
-
-    if (currentSnapshotId == snapshot.snapshotId()) {
-      // change is a noop
-      return this;
-    }
-
-    long nowMillis = System.currentTimeMillis();
-    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
-        .addAll(snapshotLog)
-        .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    List<Snapshot> toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList());
+    return new Builder(this).removeSnapshots(toRemove).build();
   }
 
   public TableMetadata replaceProperties(Map<String, String> rawProperties) {
     ValidationException.check(rawProperties != null, "Cannot set properties to null");
     Map<String, String> newProperties = unreservedProperties(rawProperties);
-    TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
-        snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties));
 
-    int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
-    if (formatVersion != newFormatVersion) {
-      metadata = metadata.upgradeToFormatVersion(newFormatVersion);
-    }
-
-    return metadata;
-  }
-
-  public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
-    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
-    for (HistoryEntry logEntry : snapshotLog) {
-      if (!snapshotIds.contains(logEntry.snapshotId())) {
-        // copy the log entries that are still valid
-        newSnapshotLog.add(logEntry);
+    Set<String> removed = Sets.newHashSet(properties.keySet());
+    Map<String, String> updated = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+      removed.remove(entry.getKey());
+      String current = properties.get(entry.getKey());
+      if (current == null || !current.equals(entry.getValue())) {
+        updated.put(entry.getKey(), entry.getValue());
       }
     }
 
-    ValidationException.check(currentSnapshotId < 0 || // not set
-            Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
-        "Cannot set invalid snapshot log: latest entry is not the current snapshot");
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  /**
-   * Returns an updated {@link TableMetadata} with the current-snapshot-ID set to the given
-   * snapshot-ID and the snapshot-log reset to contain only the snapshot with the given snapshot-ID.
-   *
-   * @param snapshotId ID of a snapshot that must exist, or {@code -1L} to remove the current snapshot
-   *                   and return an empty snapshot log.
-   * @return {@link TableMetadata} with updated {@link #currentSnapshotId} and {@link #snapshotLog}
-   */
-  public TableMetadata withCurrentSnapshotOnly(long snapshotId) {
-    if ((currentSnapshotId == -1L && snapshotId == -1L && snapshots.isEmpty()) ||
-        (currentSnapshotId == snapshotId && snapshots.size() == 1)) {
-      return this;
-    }
-    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
-    if (snapshotId != -1L) {
-      Snapshot snapshot = snapshotsById.get(snapshotId);
-      Preconditions.checkArgument(snapshot != null, "Non-existent snapshot");
-      newSnapshotLog.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshotId));
-    }
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshotId,
-        snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  public TableMetadata withCurrentSchema(int schemaId) {
-    if (currentSchemaId == schemaId) {
-      return this;
-    }
-    Preconditions.checkArgument(schemasById.containsKey(schemaId), "Non-existent schema");
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  public TableMetadata withDefaultSortOrder(int sortOrderId) {
-    if (defaultSortOrderId == sortOrderId) {
-      return this;
-    }
-    Preconditions.checkArgument(sortOrdersById.containsKey(sortOrderId), "Non-existent sort-order");
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, sortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
+    int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
 
-  public TableMetadata withDefaultSpec(int specId) {
-    if (defaultSpecId == specId) {
-      return this;
-    }
-    Preconditions.checkArgument(specsById.containsKey(specId), "Non-existent partition spec");
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, specId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this)
+        .setProperties(updated)
+        .removeProperties(removed)
+        .upgradeFormatVersion(newFormatVersion)
+        .build();
   }
 
   private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
@@ -835,146 +593,62 @@ public class TableMetadata implements Serializable {
     AtomicInteger newLastColumnId = new AtomicInteger(lastColumnId);
     Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, schema(), newLastColumnId::incrementAndGet);
 
-    // determine the next spec id
-    OptionalInt maxSpecId = specs.stream().mapToInt(PartitionSpec::specId).max();
-    int nextSpecId = maxSpecId.orElse(TableMetadata.INITIAL_SPEC_ID) + 1;
-
-    // rebuild the partition spec using the new column ids
-    PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
-
-    // reassign partition field ids with existing partition specs in the table
-    AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
-    PartitionSpec newSpec = reassignPartitionIds(freshSpec, lastPartitionId::incrementAndGet);
-
-    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
-    int specId = specs.stream()
-        .filter(newSpec::compatibleWith)
-        .findFirst()
-        .map(PartitionSpec::specId)
-        .orElse(nextSpecId);
-
-    ImmutableList.Builder<PartitionSpec> specListBuilder = ImmutableList.<PartitionSpec>builder().addAll(specs);
-    if (!specsById.containsKey(specId)) {
-      specListBuilder.add(newSpec);
-    }
-
-    // determine the next order id
-    OptionalInt maxOrderId = sortOrders.stream().mapToInt(SortOrder::orderId).max();
-    int nextOrderId = maxOrderId.isPresent() ? maxOrderId.getAsInt() + 1 : INITIAL_SORT_ORDER_ID;
+    // rebuild the partition spec using the new column ids and reassign partition field ids to align with existing
+    // partition specs in the table
+    PartitionSpec freshSpec = reassignPartitionIds(
+        freshSpec(INITIAL_SPEC_ID, freshSchema, updatedPartitionSpec),
+        new AtomicInteger(lastAssignedPartitionId)::incrementAndGet);
 
     // rebuild the sort order using new column ids
-    int freshSortOrderId = updatedSortOrder.isUnsorted() ? updatedSortOrder.orderId() : nextOrderId;
-    SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, updatedSortOrder);
-
-    // if the order already exists, use the same ID. otherwise, use the fresh order ID
-    Optional<SortOrder> sameSortOrder = sortOrders.stream()
-        .filter(sortOrder -> sortOrder.sameOrder(freshSortOrder))
-        .findAny();
-    int orderId = sameSortOrder.map(SortOrder::orderId).orElse(freshSortOrderId);
-
-    ImmutableList.Builder<SortOrder> sortOrdersBuilder = ImmutableList.<SortOrder>builder().addAll(sortOrders);
-    if (!sortOrdersById.containsKey(orderId)) {
-      sortOrdersBuilder.add(freshSortOrder);
-    }
-
-    Map<String, String> newProperties = Maps.newHashMap();
-    newProperties.putAll(this.properties);
-    newProperties.putAll(unreservedProperties(updatedProperties));
+    SortOrder freshSortOrder = freshSortOrder(INITIAL_SORT_ORDER_ID, freshSchema, updatedSortOrder);
 
     // check if there is format version override
     int newFormatVersion = PropertyUtil.propertyAsInt(updatedProperties, TableProperties.FORMAT_VERSION, formatVersion);
 
-    // determine the next schema id
-    int freshSchemaId = reuseOrCreateNewSchemaId(freshSchema);
-    ImmutableList.Builder<Schema> schemasBuilder = ImmutableList.<Schema>builder().addAll(schemas);
-
-    if (!schemasById.containsKey(freshSchemaId)) {
-      schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds()));
-    }
-
-    TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation,
-        lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
-        specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()),
-        orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
-        -1, snapshots, ImmutableList.of(), addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties));
-
-    if (formatVersion != newFormatVersion) {
-      metadata = metadata.upgradeToFormatVersion(newFormatVersion);
-    }
-
-    return metadata;
+    return new Builder(this)
+        .upgradeFormatVersion(newFormatVersion)
+        .setCurrentSnapshot(null)
+        .setCurrentSchema(freshSchema, newLastColumnId.get())
+        .setDefaultPartitionSpec(freshSpec)
+        .setDefaultSortOrder(freshSortOrder)
+        .setLocation(newLocation)
+        .setProperties(unreservedProperties(updatedProperties))
+        .build();
   }
 
   public TableMetadata updateLocation(String newLocation) {
-    return new TableMetadata(null, formatVersion, uuid, newLocation,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setLocation(newLocation).build();
   }
 
   public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
-    Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
-        "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
-        newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
-    Preconditions.checkArgument(newFormatVersion >= formatVersion,
-        "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
-
-    if (newFormatVersion == formatVersion) {
-      return this;
-    }
-
-    return new TableMetadata(null, newFormatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
-        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  private List<MetadataLogEntry> addPreviousFile(String previousFileLocation, long timestampMillis) {
-    return addPreviousFile(previousFileLocation, timestampMillis, properties);
-  }
-
-  private List<MetadataLogEntry> addPreviousFile(String previousFileLocation, long timestampMillis,
-                                                 Map<String, String> updatedProperties) {
-    if (previousFileLocation == null) {
-      return previousFiles;
-    }
-
-    int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
-            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
-
-    List<MetadataLogEntry> newMetadataLog;
-    if (previousFiles.size() >= maxSize) {
-      int removeIndex = previousFiles.size() - maxSize + 1;
-      newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
-    } else {
-      newMetadataLog = Lists.newArrayList(previousFiles);
-    }
-    newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation));
-
-    return newMetadataLog;
+    return new Builder(this).upgradeFormatVersion(newFormatVersion).build();
   }
 
   private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
         .withSpecId(partitionSpec.specId());
 
-    // add all of the fields to the builder. IDs should not change.
+    // add all the fields to the builder. IDs should not change.
     for (PartitionField field : partitionSpec.fields()) {
       specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
     }
 
-    return specBuilder.build();
+    // build without validation because the schema may have changed in a way that makes this spec invalid. the spec
+    // should still be preserved so that older metadata can be interpreted.
+    return specBuilder.buildUnchecked();
   }
 
   private static SortOrder updateSortOrderSchema(Schema schema, SortOrder sortOrder) {
     SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(sortOrder.orderId());
 
-    // add all of the fields to the builder. IDs should not change.
+    // add all the fields to the builder. IDs should not change.
     for (SortField field : sortOrder.fields()) {
-      builder.addSortField(field.transform().toString(), field.sourceId(), field.direction(), field.nullOrder());
+      builder.addSortField(field.transform(), field.sourceId(), field.direction(), field.nullOrder());
     }
 
-    return builder.build();
+    // build without validation because the schema may have changed in a way that makes this order invalid. the order
+    // should still be preserved so that older metadata can be interpreted.
+    return builder.buildUnchecked();
   }
 
   private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
@@ -995,7 +669,11 @@ public class TableMetadata implements Serializable {
   }
 
   private static SortOrder freshSortOrder(int orderId, Schema schema, SortOrder sortOrder) {
-    SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(orderId);
+    SortOrder.Builder builder = SortOrder.builderFor(schema);
+
+    if (sortOrder.isSorted()) {
+      builder.withOrderId(orderId);
+    }
 
     for (SortField field : sortOrder.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
@@ -1047,17 +725,547 @@ public class TableMetadata implements Serializable {
     return builder.build();
   }
 
-  private int reuseOrCreateNewSchemaId(Schema newSchema) {
-    // if the schema already exists, use its id; otherwise use the highest id + 1
-    int newSchemaId = currentSchemaId;
-    for (Schema schema : schemas) {
-      if (schema.sameSchema(newSchema)) {
-        newSchemaId = schema.schemaId();
-        break;
-      } else if (schema.schemaId() >= newSchemaId) {
-        newSchemaId = schema.schemaId() + 1;
+  public static Builder buildFrom(TableMetadata base) {
+    return new Builder(base);
+  }
+
+  public static class Builder {
+    private final TableMetadata base;
+    private int formatVersion;
+    private String uuid;
+    private Long lastUpdatedMillis;
+    private String location;
+    private long lastSequenceNumber;
+    private int lastColumnId;
+    private int currentSchemaId;
+    private final List<Schema> schemas;
+    private int defaultSpecId;
+    private List<PartitionSpec> specs;
+    private int lastAssignedPartitionId;
+    private int defaultSortOrderId;
+    private List<SortOrder> sortOrders;
+    private final Map<String, String> properties;
+    private long currentSnapshotId;
+    private List<Snapshot> snapshots;
+
+    // change tracking
+    private final List<MetadataUpdate> changes;
+    private final int startingChangeCount;
+    private boolean discardChanges = false;
+
+    // handled in build
+    private final List<HistoryEntry> snapshotLog;
+    private final String previousFileLocation;
+    private final List<MetadataLogEntry> previousFiles;
+
+    // indexes for convenience
+    private final Map<Long, Snapshot> snapshotsById;
+    private final Map<Integer, Schema> schemasById;
+    private final Map<Integer, PartitionSpec> specsById;
+    private final Map<Integer, SortOrder> sortOrdersById;
+
+    private Builder(TableMetadata base) {
+      this.base = base;
+      this.formatVersion = base.formatVersion;
+      this.uuid = base.uuid;
+      this.lastUpdatedMillis = null;
+      this.location = base.location;
+      this.lastSequenceNumber = base.lastSequenceNumber;
+      this.lastColumnId = base.lastColumnId;
+      this.currentSchemaId = base.currentSchemaId;
+      this.schemas = Lists.newArrayList(base.schemas);
+      this.defaultSpecId = base.defaultSpecId;
+      this.specs = Lists.newArrayList(base.specs);
+      this.lastAssignedPartitionId = base.lastAssignedPartitionId;
+      this.defaultSortOrderId = base.defaultSortOrderId;
+      this.sortOrders = Lists.newArrayList(base.sortOrders);
+      this.properties = Maps.newHashMap(base.properties);
+      this.currentSnapshotId = base.currentSnapshotId;
+      this.snapshots = Lists.newArrayList(base.snapshots);
+      this.changes = Lists.newArrayList(base.changes);
+      this.startingChangeCount = changes.size();
+
+      this.snapshotLog = Lists.newArrayList(base.snapshotLog);
+      this.previousFileLocation = base.metadataFileLocation;
+      this.previousFiles = base.previousFiles;
+
+      this.snapshotsById = Maps.newHashMap(base.snapshotsById);
+      this.schemasById = Maps.newHashMap(base.schemasById);
+      this.specsById = Maps.newHashMap(base.specsById);
+      this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);
+    }
+
+    public Builder assignUUID() {
+      if (uuid == null) {
+        this.uuid = UUID.randomUUID().toString();
+        changes.add(new MetadataUpdate.AssignUUID(uuid));
       }
+
+      return this;
+    }
+
+    public Builder upgradeFormatVersion(int newFormatVersion) {
+      Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+          "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
+          newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
+      Preconditions.checkArgument(newFormatVersion >= formatVersion,
+          "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
+
+      if (newFormatVersion == formatVersion) {
+        return this;
+      }
+
+      this.formatVersion = newFormatVersion;
+      changes.add(new MetadataUpdate.UpgradeFormatVersion(newFormatVersion));
+
+      return this;
+    }
+
+    public Builder setCurrentSchema(Schema newSchema, int newLastColumnId) {
+      setCurrentSchema(addSchemaInternal(newSchema, newLastColumnId));
+      return this;
+    }
+
+    public Builder setCurrentSchema(int schemaId) {
+      if (currentSchemaId == schemaId) {
+        return this;
+      }
+
+      Schema schema = schemasById.get(schemaId);
+      Preconditions.checkArgument(schema != null, "Cannot set current schema to unknown schema: %s", schemaId);
+
+      // rebuild all the partition specs and sort orders for the new current schema
+      this.specs = Lists.newArrayList(Iterables.transform(specs,
+          spec -> updateSpecSchema(schema, spec)));
+      specsById.clear();
+      specsById.putAll(indexSpecs(specs));
+
+      this.sortOrders = Lists.newArrayList(Iterables.transform(sortOrders,
+          order -> updateSortOrderSchema(schema, order)));
+      sortOrdersById.clear();
+      sortOrdersById.putAll(indexSortOrders(sortOrders));
+
+      this.currentSchemaId = schemaId;
+
+      changes.add(new MetadataUpdate.SetCurrentSchema(schemaId));
+
+      return this;
+    }
+
+    public Builder addSchema(Schema schema, int newLastColumnId) {
+      addSchemaInternal(schema, newLastColumnId);
+      return this;
+    }
+
+    public Builder setDefaultPartitionSpec(PartitionSpec spec) {
+      setDefaultPartitionSpec(addPartitionSpecInternal(spec));
+      return this;
+    }
+
+    public Builder setDefaultPartitionSpec(int specId) {
+      if (defaultSpecId == specId) {
+        // the new spec is already current and no change is needed
+        return this;
+      }
+
+      this.defaultSpecId = specId;
+      changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId));
+
+      return this;
+    }
+
+    public Builder addPartitionSpec(PartitionSpec spec) {
+      addPartitionSpecInternal(spec);
+      return this;
+    }
+
+    public Builder setDefaultSortOrder(SortOrder order) {
+      setDefaultSortOrder(addSortOrderInternal(order));
+      return this;
+    }
+
+    public Builder setDefaultSortOrder(int sortOrderId) {
+      if (sortOrderId == defaultSortOrderId) {
+        return this;
+      }
+
+      this.defaultSortOrderId = sortOrderId;
+      changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId));
+
+      return this;
+    }
+
+    public Builder addSortOrder(SortOrder order) {
+      addSortOrderInternal(order);
+      return this;
+    }
+
+    public Builder addSnapshot(Snapshot snapshot) {
+      if (snapshot == null || snapshotsById.containsKey(snapshot.snapshotId())) {
+        // change is a noop
+        return this;
+      }
+
+      ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+          "Cannot add snapshot with sequence number %s older than last sequence number %s",
+          snapshot.sequenceNumber(), lastSequenceNumber);
+
+      this.lastUpdatedMillis = snapshot.timestampMillis();
+      this.lastSequenceNumber = snapshot.sequenceNumber();
+      snapshots.add(snapshot);
+      snapshotsById.put(snapshot.snapshotId(), snapshot);
+      changes.add(new MetadataUpdate.AddSnapshot(snapshot));
+
+      return this;
+    }
+
+    public Builder setCurrentSnapshot(Snapshot snapshot) {
+      addSnapshot(snapshot);
+      setCurrentSnapshot(snapshot, null);
+      return this;
+    }
+
+    public Builder setCurrentSnapshot(long snapshotId) {
+      if (currentSnapshotId == snapshotId) {
+        // change is a noop
+        return this;
+      }
+
+      Snapshot snapshot = snapshotsById.get(snapshotId);
+      ValidationException.check(snapshot != null,
+          "Cannot set current snapshot to unknown: %s", snapshotId);
+
+      setCurrentSnapshot(snapshot, System.currentTimeMillis());
+
+      return this;
+    }
+
+    public Builder removeSnapshots(List<Snapshot> snapshotsToRemove) {
+      Set<Long> idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+      List<Snapshot> retainedSnapshots = Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size());
+      for (Snapshot snapshot : snapshots) {
+        long snapshotId = snapshot.snapshotId();
+        if (idsToRemove.contains(snapshotId)) {
+          snapshotsById.remove(snapshotId);
+          changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
+        } else {
+          retainedSnapshots.add(snapshot);
+        }
+      }
+
+      this.snapshots = retainedSnapshots;
+      if (!snapshotsById.containsKey(currentSnapshotId)) {
+        setCurrentSnapshot(null, System.currentTimeMillis());
+      }
+
+      return this;
+    }
+
+    public Builder setProperties(Map<String, String> updated) {
+      if (updated.isEmpty()) {
+        return this;
+      }
+
+      properties.putAll(updated);
+      changes.add(new MetadataUpdate.SetProperties(updated));
+
+      return this;
+    }
+
+    public Builder removeProperties(Set<String> removed) {
+      if (removed.isEmpty()) {
+        return this;
+      }
+
+      removed.forEach(properties::remove);
+      changes.add(new MetadataUpdate.RemoveProperties(removed));
+
+      return this;
+    }
+
+    public Builder setLocation(String newLocation) {
+      if (location != null && location.equals(newLocation)) {
+        return this;
+      }
+
+      this.location = newLocation;
+      changes.add(new MetadataUpdate.SetLocation(newLocation));
+
+      return this;
+    }
+
+    public Builder discardChanges() {
+      this.discardChanges = true;
+      return this;
+    }
+
+    public TableMetadata build() {
+      if (changes.size() == startingChangeCount && !(discardChanges && changes.size() > 0)) {
+        return base;
+      }
+
+      if (lastUpdatedMillis == null) {
+        this.lastUpdatedMillis = System.currentTimeMillis();
+      }
+
+      Schema schema = schemasById.get(currentSchemaId);
+      PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema);
+      SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema);
+
+      List<MetadataLogEntry> metadataHistory = addPreviousFile(
+          previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+      List<HistoryEntry> newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes);
+
+      return new TableMetadata(
+          null,
+          formatVersion,
+          uuid,
+          location,
+          lastSequenceNumber,
+          lastUpdatedMillis,
+          lastColumnId,
+          currentSchemaId,
+          ImmutableList.copyOf(schemas),
+          defaultSpecId,
+          ImmutableList.copyOf(specs),
+          lastAssignedPartitionId,
+          defaultSortOrderId,
+          ImmutableList.copyOf(sortOrders),
+          ImmutableMap.copyOf(properties),
+          currentSnapshotId,
+          ImmutableList.copyOf(snapshots),
+          ImmutableList.copyOf(newSnapshotLog),
+          ImmutableList.copyOf(metadataHistory),
+          discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)
+      );
+    }
+
+    private int addSchemaInternal(Schema schema, int newLastColumnId) {
+      Preconditions.checkArgument(newLastColumnId >= lastColumnId,
+          "Invalid last column ID: %s < %s (previous last column ID)", newLastColumnId, lastColumnId);
+
+      int newSchemaId = reuseOrCreateNewSchemaId(schema);
+      boolean schemaFound = schemasById.containsKey(newSchemaId);
+      if (schemaFound && newLastColumnId == lastColumnId) {
+        // the new spec and last column id is already current and no change is needed
+        return newSchemaId;
+      }
+
+      this.lastColumnId = newLastColumnId;
+
+      Schema newSchema;
+      if (newSchemaId != schema.schemaId()) {
+        newSchema = new Schema(newSchemaId, schema.columns(), schema.identifierFieldIds());
+      } else {
+        newSchema = schema;
+      }
+
+      if (!schemaFound) {
+        schemas.add(newSchema);
+        schemasById.put(newSchema.schemaId(), newSchema);
+      }
+
+      changes.add(new MetadataUpdate.AddSchema(newSchema, lastColumnId));
+
+      return newSchemaId;
+    }
+
+    private int reuseOrCreateNewSchemaId(Schema newSchema) {
+      // if the schema already exists, use its id; otherwise use the highest id + 1
+      int newSchemaId = currentSchemaId;
+      for (Schema schema : schemas) {
+        if (schema.sameSchema(newSchema)) {
+          return schema.schemaId();
+        } else if (schema.schemaId() >= newSchemaId) {
+          newSchemaId = schema.schemaId() + 1;
+        }
+      }
+      return newSchemaId;
+    }
+
+    private int addPartitionSpecInternal(PartitionSpec spec) {
+      int newSpecId = reuseOrCreateNewSpecId(spec);
+      if (specsById.containsKey(newSpecId)) {
+        return newSpecId;
+      }
+
+      Schema schema = schemasById.get(currentSchemaId);
+      PartitionSpec.checkCompatibility(spec, schema);
+      ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(spec),
+          "Spec does not use sequential IDs that are required in v1: %s", spec);
+
+      PartitionSpec newSpec = freshSpec(newSpecId, schema, spec);
+      this.lastAssignedPartitionId = Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId());
+      specs.add(newSpec);
+      specsById.put(newSpecId, newSpec);
+
+      changes.add(new MetadataUpdate.AddPartitionSpec(newSpec));
+
+      return newSpecId;
+    }
+
+    private int reuseOrCreateNewSpecId(PartitionSpec newSpec) {
+      // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+      int newSpecId = INITIAL_SPEC_ID;
+      for (PartitionSpec spec : specs) {
+        if (newSpec.compatibleWith(spec)) {
+          return spec.specId();
+        } else if (newSpecId <= spec.specId()) {
+          newSpecId = spec.specId() + 1;
+        }
+      }
+
+      return newSpecId;
+    }
+
+    private int addSortOrderInternal(SortOrder order) {
+      int newOrderId = reuseOrCreateNewSortOrderId(order);
+      if (sortOrdersById.containsKey(newOrderId)) {
+        return newOrderId;
+      }
+
+      Schema schema = schemasById.get(currentSchemaId);
+      SortOrder.checkCompatibility(order, schema);
+
+      SortOrder newOrder;
+      if (order.isUnsorted()) {
+        newOrder = SortOrder.unsorted();
+      } else {
+        // rebuild the sort order using new column ids
+        newOrder = freshSortOrder(newOrderId, schema, order);
+      }
+
+      sortOrders.add(newOrder);
+      sortOrdersById.put(newOrderId, newOrder);
+
+      changes.add(new MetadataUpdate.AddSortOrder(newOrder));
+
+      return newOrderId;
+    }
+
+    private int reuseOrCreateNewSortOrderId(SortOrder newOrder) {
+      if (newOrder.isUnsorted()) {
+        return SortOrder.unsorted().orderId();
+      }
+
+      // determine the next order id
+      int newOrderId = INITIAL_SORT_ORDER_ID;
+      for (SortOrder order : sortOrders) {
+        if (order.sameOrder(newOrder)) {
+          return order.orderId();
+        } else if (newOrderId <= order.orderId()) {
+          newOrderId = order.orderId() + 1;
+        }
+      }
+
+      return newOrderId;
+    }
+
+    private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) {
+      if (snapshot == null) {
+        this.currentSnapshotId = -1;
+        snapshotLog.clear();
+        changes.add(new MetadataUpdate.SetCurrentSnapshot(null));
+        return;
+      }
+
+      if (currentSnapshotId == snapshot.snapshotId()) {
+        return;
+      }
+
+      ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
+          "Last sequence number %s is less than existing snapshot sequence number %s",
+          lastSequenceNumber, snapshot.sequenceNumber());
+
+      this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis();
+      this.currentSnapshotId = snapshot.snapshotId();
+      snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, snapshot.snapshotId()));
+      changes.add(new MetadataUpdate.SetCurrentSnapshot(snapshot.snapshotId()));
+    }
+
+    private static List<MetadataLogEntry> addPreviousFile(
+        List<MetadataLogEntry> previousFiles, String previousFileLocation, long timestampMillis,
+        Map<String, String> properties) {
+      if (previousFileLocation == null) {
+        return previousFiles;
+      }
+
+      int maxSize = Math.max(1, PropertyUtil.propertyAsInt(properties,
+          TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
+
+      List<MetadataLogEntry> newMetadataLog;
+      if (previousFiles.size() >= maxSize) {
+        int removeIndex = previousFiles.size() - maxSize + 1;
+        newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
+      } else {
+        newMetadataLog = Lists.newArrayList(previousFiles);
+      }
+      newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation));
+
+      return newMetadataLog;
+    }
+
+    /**
+     * Finds intermediate snapshots that have not been committed as the current snapshot.
+     *
+     * @return a set of snapshot ids for all added snapshots that were later replaced as the current snapshot in changes
+     */
+    private static Set<Long> intermediateSnapshotIdSet(List<MetadataUpdate> changes, long currentSnapshotId) {
+      Set<Long> addedSnapshotIds = Sets.newHashSet();
+      Set<Long> intermediateSnapshotIds = Sets.newHashSet();
+      for (MetadataUpdate update : changes) {
+        if (update instanceof MetadataUpdate.AddSnapshot) {
+          // adds must always come before set current snapshot
+          MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) update;
+          addedSnapshotIds.add(addSnapshot.snapshot().snapshotId());
+        } else if (update instanceof MetadataUpdate.SetCurrentSnapshot) {
+          Long snapshotId = ((MetadataUpdate.SetCurrentSnapshot) update).snapshotId();
+          if (snapshotId != null && addedSnapshotIds.contains(snapshotId) && snapshotId != currentSnapshotId) {
+            intermediateSnapshotIds.add(snapshotId);
+          }
+        }
+      }
+
+      return intermediateSnapshotIds;
+    }
+
+    private static List<HistoryEntry> updateSnapshotLog(
+        List<HistoryEntry> snapshotLog, Map<Long, Snapshot> snapshotsById, long currentSnapshotId,
+        List<MetadataUpdate> changes) {
+      // find intermediate snapshots to suppress incorrect entries in the snapshot log.
+      //
+      // transactions can create snapshots that are never the current snapshot because several changes are combined
+      // by the transaction into one table metadata update. when each intermediate snapshot is added to table metadata,
+      // it is added to the snapshot log, assuming that it will be the current snapshot. when there are multiple
+      // snapshot updates, the log must be corrected by suppressing the intermediate snapshot entries.
+      //
+      // a snapshot is an intermediate snapshot if it was added but is not the current snapshot.
+      Set<Long> intermediateSnapshotIds = intermediateSnapshotIdSet(changes, currentSnapshotId);
+
+      // update the snapshot log
+      List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+      for (HistoryEntry logEntry : snapshotLog) {
+        long snapshotId = logEntry.snapshotId();
+        if (snapshotsById.containsKey(snapshotId) && !intermediateSnapshotIds.contains(snapshotId)) {
+          // copy the log entries that are still valid
+          newSnapshotLog.add(logEntry);
+        } else {
+          // any invalid entry causes the history before it to be removed. otherwise, there could be
+          // history gaps that cause time-travel queries to produce incorrect results. for example,
+          // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
+          // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
+          // and t3 when in fact s2 was the current snapshot.
+          newSnapshotLog.clear();
+        }
+      }
+
+      if (snapshotsById.get(currentSnapshotId) != null) {
+        ValidationException.check(Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
+            "Cannot set invalid snapshot log: latest entry is not the current snapshot");
+      }
+
+      return newSnapshotLog;
     }
-    return newSchemaId;
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 1e26fbb..8810fc1 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -433,6 +433,6 @@ public class TableMetadataParser {
     return new TableMetadata(metadataLocation, formatVersion, uuid, location,
         lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs,
         lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
-        snapshots, entries.build(), metadataEntries.build());
+        snapshots, entries.build(), metadataEntries.build(), ImmutableList.of() /* no changes from the file */);
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 6ddbeab..97f2d98 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -108,7 +108,7 @@ public class TestTableMetadata {
         7, ImmutableList.of(TEST_SCHEMA, schema),
         5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
         3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
-        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
+        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of(), ImmutableList.of());
 
     String asJson = TableMetadataParser.toJson(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -180,7 +180,8 @@ public class TestTableMetadata {
         0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID,
         ImmutableList.of(schema), 6, ImmutableList.of(spec), spec.lastAssignedFieldId(),
         TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
+        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of(),
+        ImmutableList.of());
 
     String asJson = toJsonWithoutSpecAndSchemaList(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -302,7 +303,7 @@ public class TestTableMetadata {
         7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
         3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
-        ImmutableList.copyOf(previousMetadataLog));
+        ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
 
     String asJson = TableMetadataParser.toJson(base);
     TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -322,6 +323,8 @@ public class TestTableMetadata {
           new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+    reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+    reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
     long currentTimestamp = System.currentTimeMillis();
     List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
     previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -337,7 +340,7 @@ public class TestTableMetadata {
         7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
         3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
-        ImmutableList.copyOf(previousMetadataLog));
+        ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
 
     previousMetadataLog.add(latestPreviousMetadata);
 
@@ -362,6 +365,8 @@ public class TestTableMetadata {
           new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+    reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+    reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
     long currentTimestamp = System.currentTimeMillis();
     List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
     previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -384,7 +389,7 @@ public class TestTableMetadata {
         ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
-        ImmutableList.copyOf(previousMetadataLog));
+        ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
 
     previousMetadataLog.add(latestPreviousMetadata);
 
@@ -414,6 +419,8 @@ public class TestTableMetadata {
           new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+    reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+    reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
     long currentTimestamp = System.currentTimeMillis();
     List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
     previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -431,12 +438,12 @@ public class TestTableMetadata {
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
 
     TableMetadata base = new TableMetadata(latestPreviousMetadata.file(), 1, UUID.randomUUID().toString(),
-        TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), 2,
+        TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(),
         ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
-        TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(SortOrder.unsorted()),
+        SortOrder.unsorted().orderId(), ImmutableList.of(SortOrder.unsorted()),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
-        ImmutableList.copyOf(previousMetadataLog));
+        ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
 
     previousMetadataLog.add(latestPreviousMetadata);
 
@@ -462,7 +469,7 @@ public class TestTableMetadata {
             LAST_ASSIGNED_COLUMN_ID, 7, ImmutableList.of(TEST_SCHEMA),
             SPEC_5.specId(), ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
             3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
-            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+            ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
     );
   }
 
@@ -475,7 +482,7 @@ public class TestTableMetadata {
             System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
             7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(), ImmutableList.of(SPEC_5),
             SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
-            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+            ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
     );
   }
 
@@ -581,7 +588,8 @@ public class TestTableMetadata {
         .add(1, 1005, "x_partition", "bucket[4]")
         .build();
     String location = "file://tmp/db/table";
-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of());
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        schema, PartitionSpec.unpartitioned(), location, ImmutableMap.of());
 
     AssertHelpers.assertThrows("Should fail to update an invalid partition spec",
         ValidationException.class, "Spec does not use sequential IDs that are required in v1",
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 2e0ecd6..a24f94f 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -190,9 +190,10 @@ public class TestTables {
             throw new CommitFailedException("Injected failure");
           }
           Integer version = VERSIONS.get(tableName);
+          // remove changes from the committed metadata
+          this.current = TableMetadata.buildFrom(updatedMetadata).discardChanges().build();
           VERSIONS.put(tableName, version == null ? 0 : version + 1);
-          METADATA.put(tableName, updatedMetadata);
-          this.current = updatedMetadata;
+          METADATA.put(tableName, current);
         } else {
           throw new CommitFailedException(
               "Commit failed: table was updated at %d", current.lastUpdatedMillis());
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
index 106cad4..6b40662 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
@@ -79,18 +79,19 @@ public class NessieTableOperations extends BaseMetastoreTableOperations {
   }
 
   @Override
-  protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
-      int numRetries) {
+  protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
     super.refreshFromMetadataLocation(newLocation, shouldRetry, numRetries, this::loadTableMetadata);
   }
 
   private TableMetadata loadTableMetadata(String metadataLocation) {
     // Update the TableMetadata with the Content of NessieTableState.
-    return TableMetadataParser.read(io(), metadataLocation)
-        .withCurrentSnapshotOnly(table.getSnapshotId())
-        .withCurrentSchema(table.getSchemaId())
-        .withDefaultSortOrder(table.getSortOrderId())
-        .withDefaultSpec(table.getSpecId());
+    return TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation))
+        .setCurrentSnapshot(table.getSnapshotId())
+        .setCurrentSchema(table.getSchemaId())
+        .setDefaultSortOrder(table.getSortOrderId())
+        .setDefaultPartitionSpec(table.getSpecId())
+        .discardChanges()
+        .build();
   }
 
   @Override