You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/09 19:14:41 UTC

[incubator-iceberg] branch master updated: Prepare metadata writers for format v2 (#903)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c094da5  Prepare metadata writers for format v2 (#903)
c094da5 is described below

commit c094da57db072187b2d930ae6c5590218012ec3f
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 9 12:14:34 2020 -0700

    Prepare metadata writers for format v2 (#903)
    
    * Track format version in TableMetadata.
    * Pass formatVersion when creating a manifest writer.
    * Pass formatVersion when creating a manifest list writer.
    * Add v2 change section to spec.
---
 .../org/apache/iceberg/BaseRewriteManifests.java   |  12 +-
 .../main/java/org/apache/iceberg/FastAppend.java   |   8 +-
 .../org/apache/iceberg/ManifestListWriter.java     |  18 +-
 .../java/org/apache/iceberg/ManifestWriter.java    |  22 ++-
 .../apache/iceberg/MergingSnapshotProducer.java    |  15 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |  13 +-
 .../java/org/apache/iceberg/TableMetadata.java     |  68 ++++---
 .../org/apache/iceberg/TableMetadataParser.java    |  12 +-
 .../org/apache/iceberg/TestFormatVersions.java     |  65 ++++++
 .../java/org/apache/iceberg/TestSnapshotJson.java  |   3 +-
 .../java/org/apache/iceberg/TestTableMetadata.java | 220 +++++++++++++--------
 .../test/java/org/apache/iceberg/TestTables.java   |   4 +-
 site/docs/spec.md                                  |  14 ++
 .../apache/iceberg/spark/source/TestTables.java    |   3 +-
 14 files changed, 319 insertions(+), 158 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 5a9469c..0eee04b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -74,7 +73,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
   private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
 
-  private final AtomicInteger manifestSuffix = new AtomicInteger(0);
   private final AtomicLong entryCount = new AtomicLong(0);
 
   private Function<DataFile, Object> clusterByFunc;
@@ -157,7 +155,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
 
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
-      OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
+      OutputFile newFile = newManifestOutput();
       return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -336,18 +334,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
 
     synchronized void addEntry(ManifestEntry entry) {
       if (writer == null) {
-        writer = newWriter();
+        writer = newManifestWriter(spec);
       } else if (writer.length() >= getManifestTargetSizeBytes()) {
         close();
-        writer = newWriter();
+        writer = newManifestWriter(spec);
       }
       writer.existing(entry);
     }
 
-    private ManifestWriter newWriter() {
-      return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
-    }
-
     synchronized void close() {
       if (writer != null) {
         try {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 246e10f..3338831 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.OutputFile;
@@ -48,7 +47,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private ManifestFile newManifest = null;
-  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private boolean hasNewFiles = false;
 
   FastAppend(TableOperations ops) {
@@ -110,7 +108,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
-      OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+      OutputFile newManifestPath = newManifestOutput();
       return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -165,9 +163,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
     }
 
     if (newManifest == null && newFiles.size() > 0) {
-      OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      ManifestWriter writer = newManifestWriter(spec);
       try {
         writer.addAll(newFiles);
       } finally {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 0271695..e81d496 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -28,10 +28,18 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
 
-class ManifestListWriter implements FileAppender<ManifestFile> {
+abstract class ManifestListWriter implements FileAppender<ManifestFile> {
+  static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
+                                  long snapshotId, Long parentSnapshotId) {
+    if (formatVersion == 1) {
+      return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
+  }
+
   private final FileAppender<ManifestFile> writer;
 
-  ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+  private ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
     this.writer = newAppender(snapshotFile, ImmutableMap.of(
         "snapshot-id", String.valueOf(snapshotId),
         "parent-snapshot-id", String.valueOf(parentSnapshotId)));
@@ -80,4 +88,10 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
       throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
     }
   }
+
+  static class V1Writer extends ManifestListWriter {
+    private V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+      super(snapshotFile, snapshotId, parentSnapshotId);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index b7ed80d..d1a9fa1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Writer for manifest files.
  */
-public class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
 
   static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
@@ -44,7 +44,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
   static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
                                    SnapshotSummary.Builder summaryBuilder,
                                    Set<ManifestEntry.Status> allowedEntryStatuses) {
-    ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
+    ManifestWriter writer = new V1Writer(reader.spec(), outputFile, snapshotId);
     boolean threw = true;
     try {
       for (ManifestEntry entry : reader.entries()) {
@@ -93,7 +93,15 @@ public class ManifestWriter implements FileAppender<DataFile> {
    * @return a manifest writer
    */
   public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
-    return new ManifestWriter(spec, outputFile, null);
+    // always use a v1 writer for appended manifests because sequence number must be inherited
+    return write(1, spec, outputFile, null);
+  }
+
+  static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    if (formatVersion == 1) {
+      return new V1Writer(spec, outputFile, snapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }
 
   private final OutputFile file;
@@ -111,7 +119,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
   private int deletedFiles = 0;
   private long deletedRows = 0L;
 
-  ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
     this.file = file;
     this.specId = spec.specId();
     this.writer = newAppender(FileFormat.AVRO, spec, file);
@@ -229,4 +237,10 @@ public class ManifestWriter implements FileAppender<DataFile> {
       throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
     }
   }
+
+  private static class V1Writer extends ManifestWriter {
+    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+      super(spec, file, snapshotId);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e08ae49..4537c23 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -87,7 +86,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final boolean snapshotIdInheritanceEnabled;
 
   // update data
-  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
@@ -230,7 +228,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
-      OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+      OutputFile newManifestPath = newManifestOutput();
       return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -542,8 +540,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     // manifest. produce a copy of the manifest with all deleted files removed.
     List<DataFile> deletedFiles = Lists.newArrayList();
     Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
-    OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
-    ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+    ManifestWriter writer = newManifestWriter(reader.spec());
     try {
       reader.entries().forEach(entry -> {
         DataFile file = entry.file();
@@ -655,9 +652,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
       return mergeManifests.get(bin);
     }
 
-    OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-    ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+    ManifestWriter writer = newManifestWriter(ops.current().spec());
     try {
       for (ManifestFile manifest : bin) {
         try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
@@ -697,9 +692,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     }
 
     if (cachedNewManifest == null) {
-      OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      ManifestWriter writer = newManifestWriter(spec);
       try {
         writer.addAll(newFiles);
       } finally {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 4cf4600..0d3239f 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -75,6 +75,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
 
   private final TableOperations ops;
   private final String commitUUID = UUID.randomUUID().toString();
+  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
   private volatile Long snapshotId = null;
@@ -148,8 +149,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
     if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
       OutputFile manifestList = manifestListPath();
 
-      try (ManifestListWriter writer = new ManifestListWriter(
-          manifestList, snapshotId(), parentSnapshotId)) {
+      try (ManifestListWriter writer = ManifestListWriter.write(
+          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {
 
         // keep track of the manifest lists created
         manifestLists.add(manifestList.location());
@@ -310,9 +311,13 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
   }
 
-  protected OutputFile manifestPath(int manifestNumber) {
+  protected OutputFile newManifestOutput() {
     return ops.io().newOutputFile(
-        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestNumber)));
+        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
+  }
+
+  protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+    return ManifestWriter.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
   }
 
   protected long snapshotId() {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index e44e002..4d12cf5 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,17 +43,12 @@ import org.apache.iceberg.util.PropertyUtil;
  * Metadata for a table.
  */
 public class TableMetadata {
-  static final int TABLE_FORMAT_VERSION = 1;
+  static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
+  static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
   static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(Schema schema,
                                                PartitionSpec spec,
-                                               String location) {
-    return newTableMetadata(schema, spec, location, ImmutableMap.of());
-  }
-
-  public static TableMetadata newTableMetadata(Schema schema,
-                                               PartitionSpec spec,
                                                String location,
                                                Map<String, String> properties) {
     // reassign all column ids to ensure consistency
@@ -73,7 +68,7 @@ public class TableMetadata {
     }
     PartitionSpec freshSpec = specBuilder.build();
 
-    return new TableMetadata(null, UUID.randomUUID().toString(), location,
+    return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
         System.currentTimeMillis(),
         lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
@@ -170,6 +165,7 @@ public class TableMetadata {
   private final InputFile file;
 
   // stored metadata
+  private final int formatVersion;
   private final String uuid;
   private final String location;
   private final long lastUpdatedMillis;
@@ -186,6 +182,7 @@ public class TableMetadata {
   private final List<MetadataLogEntry> previousFiles;
 
   TableMetadata(InputFile file,
+                int formatVersion,
                 String uuid,
                 String location,
                 long lastUpdatedMillis,
@@ -198,6 +195,13 @@ public class TableMetadata {
                 List<Snapshot> snapshots,
                 List<HistoryEntry> snapshotLog,
                 List<MetadataLogEntry> previousFiles) {
+    Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+        "Unsupported format version: v%s", formatVersion);
+    if (formatVersion > 1) {
+      Preconditions.checkArgument(uuid != null, "UUID is required in format v%s", formatVersion);
+    }
+
+    this.formatVersion = formatVersion;
     this.file = file;
     this.uuid = uuid;
     this.location = location;
@@ -240,6 +244,10 @@ public class TableMetadata {
         "Invalid table metadata: Cannot find current version");
   }
 
+  public int formatVersion() {
+    return formatVersion;
+  }
+
   public InputFile file() {
     return file;
   }
@@ -328,24 +336,18 @@ public class TableMetadata {
     if (uuid != null) {
       return this;
     } else {
-      return new TableMetadata(null, UUID.randomUUID().toString(), location,
+      return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
           lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
           currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
     }
   }
 
-  public TableMetadata updateTableLocation(String newLocation) {
-    return new TableMetadata(null, uuid, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
-        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
-  }
-
   public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
     PartitionSpec.checkCompatibility(spec(), newSchema);
     // rebuild all of the partition specs for the new current schema
     List<PartitionSpec> updatedSpecs = Lists.transform(specs,
         spec -> updateSpecSchema(newSchema, spec));
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -374,7 +376,7 @@ public class TableMetadata {
       builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
     }
 
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
         builder.build(), properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
@@ -385,7 +387,7 @@ public class TableMetadata {
         .addAll(snapshots)
         .add(snapshot)
         .build();
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -404,7 +406,7 @@ public class TableMetadata {
         .addAll(snapshotLog)
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -435,7 +437,7 @@ public class TableMetadata {
       }
     }
 
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
         addPreviousFile(file, lastUpdatedMillis));
@@ -456,14 +458,14 @@ public class TableMetadata {
         .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
         .build();
 
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
@@ -480,7 +482,7 @@ public class TableMetadata {
     ValidationException.check(currentSnapshotId < 0 || // not set
             Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -519,14 +521,30 @@ public class TableMetadata {
     newProperties.putAll(this.properties);
     newProperties.putAll(updatedProperties);
 
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
         specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
 
   public TableMetadata updateLocation(String newLocation) {
-    return new TableMetadata(null, uuid, newLocation,
+    return new TableMetadata(null, formatVersion, uuid, newLocation,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+  }
+
+  public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
+    Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+        "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
+        newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
+    Preconditions.checkArgument(newFormatVersion >= formatVersion,
+        "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
+
+    if (newFormatVersion == formatVersion) {
+      return this;
+    }
+
+    return new TableMetadata(null, newFormatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index fd68338..6fca73b 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -151,7 +151,7 @@ public class TableMetadataParser {
   private static void toJson(TableMetadata metadata, JsonGenerator generator) throws IOException {
     generator.writeStartObject();
 
-    generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+    generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
     generator.writeStringField(TABLE_UUID, metadata.uuid());
     generator.writeStringField(LOCATION, metadata.location());
     generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
@@ -161,8 +161,10 @@ public class TableMetadataParser {
     SchemaParser.toJson(metadata.schema(), generator);
 
     // for older readers, continue writing the default spec as "partition-spec"
-    generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+    if (metadata.formatVersion() == 1) {
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+    }
 
     // write the default spec ID and spec list
     generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
@@ -226,7 +228,7 @@ public class TableMetadataParser {
         "Cannot parse metadata from a non-object: %s", node);
 
     int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
-    Preconditions.checkArgument(formatVersion == TableMetadata.TABLE_FORMAT_VERSION,
+    Preconditions.checkArgument(formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
         "Cannot read unsupported version %s", formatVersion);
 
     String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
@@ -295,7 +297,7 @@ public class TableMetadataParser {
       }
     }
 
-    return new TableMetadata(file, uuid, location,
+    return new TableMetadata(file, formatVersion, uuid, location,
         lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
         currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
         ImmutableList.copyOf(metadataEntries.iterator()));
diff --git a/core/src/test/java/org/apache/iceberg/TestFormatVersions.java b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
new file mode 100644
index 0000000..f58d70b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFormatVersions extends TableTestBase {
+  @Test
+  public void testDefaultFormatVersion() {
+    Assert.assertEquals("Should default to v1", 1, table.ops().current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionUpgrade() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionDowngrade() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+
+    AssertHelpers.assertThrows("Should reject a version downgrade",
+        IllegalArgumentException.class, "Cannot downgrade",
+        () -> ops.current().upgradeToFormatVersion(1));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionUpgradeNotSupported() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    AssertHelpers.assertThrows("Should reject an unsupported version upgrade",
+        IllegalArgumentException.class, "Cannot upgrade table to unsupported format version",
+        () -> ops.commit(base, base.upgradeToFormatVersion(TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1)));
+
+    Assert.assertEquals("Should report v1", 1, ops.current().formatVersion());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 16ad4ee..5e55d66 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -95,8 +95,7 @@ public class TestSnapshotJson {
     Assert.assertTrue(manifestList.delete());
     manifestList.deleteOnExit();
 
-    try (ManifestListWriter writer = new ManifestListWriter(
-        Files.localOutput(manifestList), id, parentId)) {
+    try (ManifestListWriter writer = ManifestListWriter.write(1, Files.localOutput(manifestList), id, parentId)) {
       writer.addAll(manifests);
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index c2603f2..a77530b 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -55,8 +55,21 @@ import static org.apache.iceberg.TableMetadataParser.PARTITION_SPEC;
 import static org.apache.iceberg.TableMetadataParser.PROPERTIES;
 import static org.apache.iceberg.TableMetadataParser.SCHEMA;
 import static org.apache.iceberg.TableMetadataParser.SNAPSHOTS;
+import static org.apache.iceberg.TableMetadataParser.TABLE_UUID;
 
 public class TestTableMetadata {
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA = new Schema(
+      Types.NestedField.required(1, "x", Types.LongType.get()),
+      Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+      Types.NestedField.required(3, "z", Types.LongType.get())
+  );
+
+  private static final int LAST_ASSIGNED_COLUMN_ID = 3;
+
+  private static final PartitionSpec SPEC_5 = PartitionSpec.builderFor(TEST_SCHEMA).withSpecId(5).build();
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
@@ -64,30 +77,22 @@ public class TestTableMetadata {
 
   @Test
   public void testJsonConversion() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
         .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();
 
-    TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
 
@@ -95,6 +100,8 @@ public class TestTableMetadata {
     TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
+    Assert.assertEquals("Format version should match",
+        expected.formatVersion(), metadata.formatVersion());
     Assert.assertEquals("Table UUID should match",
         expected.uuid(), metadata.uuid());
     Assert.assertEquals("Table location should match",
@@ -128,27 +135,19 @@ public class TestTableMetadata {
 
   @Test
   public void testFromJsonSortsSnapshotLog() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
 
-    TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
 
@@ -173,13 +172,7 @@ public class TestTableMetadata {
 
   @Test
   public void testBackwardCompat() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+    PartitionSpec spec = PartitionSpec.builderFor(TEST_SCHEMA).identity("x").withSpecId(6).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -190,8 +183,8 @@ public class TestTableMetadata {
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
 
@@ -199,6 +192,8 @@ public class TestTableMetadata {
     TableMetadata metadata = TableMetadataParser
         .fromJson(ops.io(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
+    Assert.assertEquals("Format version should match",
+        expected.formatVersion(), metadata.formatVersion());
     Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
     Assert.assertEquals("Table location should match",
         expected.location(), metadata.location());
@@ -242,7 +237,7 @@ public class TestTableMetadata {
 
       generator.writeStartObject(); // start table metadata object
 
-      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeNumberField(FORMAT_VERSION, 1);
       generator.writeStringField(LOCATION, metadata.location());
       generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
       generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
@@ -281,22 +276,14 @@ public class TestTableMetadata {
 
   @Test
   public void testJsonWithPreviousMetadataLog() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -304,8 +291,8 @@ public class TestTableMetadata {
     previousMetadataLog.add(new MetadataLogEntry(currentTimestamp,
         "/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));
 
-    TableMetadata base = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -318,23 +305,15 @@ public class TestTableMetadata {
   }
 
   @Test
-  public void testAddPreviousMetadataRemoveNone() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveNone() {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -347,8 +326,8 @@ public class TestTableMetadata {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 80,
         "/tmp/000003-" + UUID.randomUUID().toString() + ".metadata.json");
 
-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 80, 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -365,23 +344,15 @@ public class TestTableMetadata {
   }
 
   @Test
-  public void testAddPreviousMetadataRemoveOne() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveOne() {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -400,9 +371,9 @@ public class TestTableMetadata {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
 
-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 5,
-        ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
+        ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
 
@@ -423,23 +394,15 @@ public class TestTableMetadata {
   }
 
   @Test
-  public void testAddPreviousMetadataRemoveMultiple() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveMultiple() {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
 
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -458,9 +421,9 @@ public class TestTableMetadata {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
 
-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 2,
-        ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
+        ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
 
@@ -480,4 +443,87 @@ public class TestTableMetadata {
         ImmutableList.copyOf(removedPreviousMetadata));
   }
 
+  @Test
+  public void testV2UUIDValidation() {
+    AssertHelpers.assertThrows("Should reject v2 metadata without a UUID",
+        IllegalArgumentException.class, "UUID is required in format v2",
+        () -> new TableMetadata(null, 2, null, TEST_LOCATION, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
+            TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L, ImmutableList.of(),
+            ImmutableList.of(), ImmutableList.of())
+    );
+  }
+
+  @Test
+  public void testVersionValidation() {
+    int unsupportedVersion = TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1;
+    AssertHelpers.assertThrows("Should reject unsupported metadata",
+        IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
+        () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, System.currentTimeMillis(),
+            LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
+            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+    );
+  }
+
+  @Test
+  public void testParserVersionValidation() throws Exception {
+    String supportedVersion = toJsonWithVersion(
+        TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+        TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION);
+    TableMetadata parsed = TableMetadataParser.fromJson(
+        ops.io(), null, JsonUtil.mapper().readValue(supportedVersion, JsonNode.class));
+    Assert.assertNotNull("Should successfully read supported metadata version", parsed);
+
+    String unsupportedVersion = toJsonWithVersion(
+        TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+        TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1);
+    AssertHelpers.assertThrows("Should not read unsupported metadata",
+        IllegalArgumentException.class, "Cannot read unsupported version",
+        () -> TableMetadataParser.fromJson(
+            ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)));
+  }
+
+  public static String toJsonWithVersion(TableMetadata metadata, int version) {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, version);
+      generator.writeStringField(TABLE_UUID, metadata.uuid());
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+
+      generator.flush();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 20d2f3e..41974e3 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -43,7 +43,7 @@ public class TestTables {
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }
-    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
     return new TestTable(ops, name);
   }
 
@@ -53,7 +53,7 @@ public class TestTables {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }
 
-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString());
+    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of());
 
     return Transactions.createTableTransaction(ops, metadata);
   }
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 921c100..332e247 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -677,3 +677,17 @@ This serialization scheme is for storing single values as individual binary valu
 | **`map`**                    | Not supported                                                                                                |
 
 
+## Format version changes
+
+### Version 2
+
+Writing metadata:
+* Table metadata field `sequence-number` is required.
+* Table metadata field `table-uuid` is required.
+* Table metadata field `partition-specs` is required.
+* Table metadata field `default-spec-id` is required.
+* Table metadata field `partition-spec` is no longer required and may be omitted.
+* Snapshot field `manifest-list` is required.
+* Snapshot field `manifests` is not allowed.
+
+Note that these requirements apply when writing data to a v2 table. Tables that are upgraded from v1 may contain metadata that does not follow these requirements. Implementations should remain backward-compatible with v1 metadata requirements.
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
index 879aa16..b9d6c9c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.File;
 import java.util.Map;
@@ -49,7 +50,7 @@ class TestTables {
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }
-    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
     return new TestTable(ops, name);
   }