You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/04/10 23:02:43 UTC

[incubator-iceberg] branch master updated: Update Snapshot and TableMetadata with sequence numbers (#910)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 720b240  Update Snapshot and TableMetadata with sequence numbers (#910)
720b240 is described below

commit 720b240a3e6a32b6f01173ebcc448c2c6e32e13a
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Apr 10 16:02:33 2020 -0700

    Update Snapshot and TableMetadata with sequence numbers (#910)
---
 api/src/main/java/org/apache/iceberg/Snapshot.java |  9 +++
 .../main/java/org/apache/iceberg/BaseSnapshot.java | 12 +++-
 .../org/apache/iceberg/ManifestListWriter.java     |  9 +++
 .../java/org/apache/iceberg/SnapshotParser.java    | 14 +++-
 .../java/org/apache/iceberg/SnapshotProducer.java  |  9 +--
 .../java/org/apache/iceberg/TableMetadata.java     | 74 +++++++++++++++-------
 .../org/apache/iceberg/TableMetadataParser.java    | 12 +++-
 .../java/org/apache/iceberg/util/JsonUtil.java     |  2 +-
 .../java/org/apache/iceberg/TestSnapshotJson.java  |  6 +-
 .../java/org/apache/iceberg/TestTableMetadata.java | 36 +++++++----
 site/docs/spec.md                                  | 55 +++++++++-------
 .../spark/source/TestForwardCompatibility.java     |  5 ++
 12 files changed, 174 insertions(+), 69 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java
index 25822c9..1a7376f 100644
--- a/api/src/main/java/org/apache/iceberg/Snapshot.java
+++ b/api/src/main/java/org/apache/iceberg/Snapshot.java
@@ -32,6 +32,15 @@ import java.util.Map;
  */
 public interface Snapshot {
   /**
+   * Return this snapshot's sequence number.
+   * <p>
+   * Sequence numbers are assigned when a snapshot is committed.
+   *
+   * @return a long sequence number
+   */
+  long sequenceNumber();
+
+  /**
    * Return this snapshot's ID.
    *
    * @return a long ID
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 2c82d8b..6054d6f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -35,9 +35,12 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 
 class BaseSnapshot implements Snapshot {
+  private static final long INITIAL_SEQUENCE_NUMBER = 0;
+
   private final FileIO io;
   private final long snapshotId;
   private final Long parentId;
+  private final long sequenceNumber;
   private final long timestampMillis;
   private final InputFile manifestList;
   private final String operation;
@@ -60,6 +63,7 @@ class BaseSnapshot implements Snapshot {
   }
 
   BaseSnapshot(FileIO io,
+               long sequenceNumber,
                long snapshotId,
                Long parentId,
                long timestampMillis,
@@ -67,6 +71,7 @@ class BaseSnapshot implements Snapshot {
                Map<String, String> summary,
                InputFile manifestList) {
     this.io = io;
+    this.sequenceNumber = sequenceNumber;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
     this.timestampMillis = timestampMillis;
@@ -82,11 +87,16 @@ class BaseSnapshot implements Snapshot {
                String operation,
                Map<String, String> summary,
                List<ManifestFile> manifests) {
-    this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
+    this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
     this.manifests = manifests;
   }
 
   @Override
+  public long sequenceNumber() {
+    return sequenceNumber;
+  }
+
+  @Override
   public long snapshotId() {
     return snapshotId;
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index e81d496..6c999b8 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Iterator;
@@ -31,7 +32,15 @@ import org.apache.iceberg.io.OutputFile;
 abstract class ManifestListWriter implements FileAppender<ManifestFile> {
   static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
                                   long snapshotId, Long parentSnapshotId) {
+    Preconditions.checkArgument(formatVersion == 1, "Sequence number is required for format v%s", formatVersion);
+    return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
+  }
+
+  static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
+                                  long snapshotId, Long parentSnapshotId, long sequenceNumber) {
     if (formatVersion == 1) {
+      Preconditions.checkArgument(sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
+          "Invalid sequence number for v1 manifest list: %s", sequenceNumber);
       return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
     }
     throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index 17b8083..db81cde 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -37,6 +37,7 @@ public class SnapshotParser {
 
   private SnapshotParser() {}
 
+  private static final String SEQUENCE_NUMBER = "sequence-number";
   private static final String SNAPSHOT_ID = "snapshot-id";
   private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
   private static final String TIMESTAMP_MS = "timestamp-ms";
@@ -48,6 +49,9 @@ public class SnapshotParser {
   static void toJson(Snapshot snapshot, JsonGenerator generator)
       throws IOException {
     generator.writeStartObject();
+    if (snapshot.sequenceNumber() > TableMetadata.INITIAL_SEQUENCE_NUMBER) {
+      generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber());
+    }
     generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
     if (snapshot.parentId() != null) {
       generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
@@ -103,7 +107,11 @@ public class SnapshotParser {
     Preconditions.checkArgument(node.isObject(),
         "Cannot parse table version from a non-object: %s", node);
 
-    long versionId = JsonUtil.getLong(SNAPSHOT_ID, node);
+    long sequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
+    if (node.has(SEQUENCE_NUMBER)) {
+      sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, node);
+    }
+    long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
     Long parentId = null;
     if (node.has(PARENT_SNAPSHOT_ID)) {
       parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
@@ -134,7 +142,7 @@ public class SnapshotParser {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
       return new BaseSnapshot(
-          io, versionId, parentId, timestamp, operation, summary,
+          io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary,
           io.newInputFile(manifestList));
 
     } else {
@@ -142,7 +150,7 @@ public class SnapshotParser {
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
           location -> new GenericManifestFile(io.newInputFile(location), 0));
-      return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests);
+      return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0d3239f..fb9509e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,7 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
   private volatile Long snapshotId = null;
-  private TableMetadata base = null;
+  private TableMetadata base;
   private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
 
@@ -143,14 +143,15 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
     this.base = refresh();
     Long parentSnapshotId = base.currentSnapshot() != null ?
         base.currentSnapshot().snapshotId() : null;
+    long sequenceNumber = base.nextSequenceNumber();
 
     List<ManifestFile> manifests = apply(base);
 
-    if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+    if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
       OutputFile manifestList = manifestListPath();
 
       try (ManifestListWriter writer = ManifestListWriter.write(
-          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {
+          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {
 
         // keep track of the manifest lists created
         manifestLists.add(manifestList.location());
@@ -170,7 +171,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       }
 
       return new BaseSnapshot(ops.io(),
-          snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
+          sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
           ops.io().newInputFile(manifestList.location()));
 
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 4d12cf5..f3bd7b2 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.util.PropertyUtil;
  * Metadata for a table.
  */
 public class TableMetadata {
+  static final long INITIAL_SEQUENCE_NUMBER = 0;
   static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
   static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
   static final int INITIAL_SPEC_ID = 0;
@@ -69,7 +70,7 @@ public class TableMetadata {
     PartitionSpec freshSpec = specBuilder.build();
 
     return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
-        System.currentTimeMillis(),
+        INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
         lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
         ImmutableList.of(), ImmutableList.of());
@@ -168,6 +169,7 @@ public class TableMetadata {
   private final int formatVersion;
   private final String uuid;
   private final String location;
+  private final long lastSequenceNumber;
   private final long lastUpdatedMillis;
   private final int lastColumnId;
   private final Schema schema;
@@ -185,6 +187,7 @@ public class TableMetadata {
                 int formatVersion,
                 String uuid,
                 String location,
+                long lastSequenceNumber,
                 long lastUpdatedMillis,
                 int lastColumnId,
                 Schema schema,
@@ -197,14 +200,16 @@ public class TableMetadata {
                 List<MetadataLogEntry> previousFiles) {
     Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
         "Unsupported format version: v%s", formatVersion);
-    if (formatVersion > 1) {
-      Preconditions.checkArgument(uuid != null, "UUID is required in format v%s", formatVersion);
-    }
+    Preconditions.checkArgument(formatVersion == 1 || uuid != null,
+        "UUID is required in format v%s", formatVersion);
+    Preconditions.checkArgument(formatVersion > 1 || lastSequenceNumber == 0,
+        "Sequence number must be 0 in v1: %s", lastSequenceNumber);
 
     this.formatVersion = formatVersion;
     this.file = file;
     this.uuid = uuid;
     this.location = location;
+    this.lastSequenceNumber = lastSequenceNumber;
     this.lastUpdatedMillis = lastUpdatedMillis;
     this.lastColumnId = lastColumnId;
     this.schema = schema;
@@ -216,7 +221,7 @@ public class TableMetadata {
     this.snapshotLog = snapshotLog;
     this.previousFiles = previousFiles;
 
-    this.snapshotsById = indexSnapshots(snapshots);
+    this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
     this.specsById = indexSpecs(specs);
 
     HistoryEntry last = null;
@@ -256,6 +261,14 @@ public class TableMetadata {
     return uuid;
   }
 
+  public long lastSequenceNumber() {
+    return lastSequenceNumber;
+  }
+
+  public long nextSequenceNumber() {
+    return formatVersion > 1 ? lastSequenceNumber + 1 : INITIAL_SEQUENCE_NUMBER;
+  }
+
   public long lastUpdatedMillis() {
     return lastUpdatedMillis;
   }
@@ -337,7 +350,7 @@ public class TableMetadata {
       return this;
     } else {
       return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
-          lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+          lastSequenceNumber, lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
           currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
     }
   }
@@ -348,8 +361,8 @@ public class TableMetadata {
     List<PartitionSpec> updatedSpecs = Lists.transform(specs,
         spec -> updateSpecSchema(newSchema, spec));
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties,
-        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+        lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs,
+        properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
   public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
@@ -377,18 +390,23 @@ public class TableMetadata {
     }
 
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
         builder.build(), properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
   public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+        "Cannot add snapshot with sequence number %s older than last sequence number %s",
+        snapshot.sequenceNumber(), lastSequenceNumber);
+
     List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
         .addAll(snapshots)
         .add(snapshot)
         .build();
+
     return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
@@ -398,6 +416,10 @@ public class TableMetadata {
       return setCurrentSnapshotTo(snapshot);
     }
 
+    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+        "Cannot add snapshot with sequence number %s older than last sequence number %s",
+        snapshot.sequenceNumber(), lastSequenceNumber);
+
     List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
         .addAll(snapshots)
         .add(snapshot)
@@ -406,8 +428,9 @@ public class TableMetadata {
         .addAll(snapshotLog)
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
+
     return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
@@ -438,7 +461,7 @@ public class TableMetadata {
     }
 
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
         addPreviousFile(file, lastUpdatedMillis));
   }
@@ -446,6 +469,9 @@ public class TableMetadata {
   private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
     ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
         "Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
+    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
+        "Last sequence number %s is less than existing snapshot sequence number %s",
+        lastSequenceNumber, snapshot.sequenceNumber());
 
     if (currentSnapshotId == snapshot.snapshotId()) {
       // change is a noop
@@ -459,14 +485,14 @@ public class TableMetadata {
         .build();
 
     return new TableMetadata(null, formatVersion, uuid, location,
-        nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
 
@@ -482,8 +508,9 @@ public class TableMetadata {
     ValidationException.check(currentSnapshotId < 0 || // not set
             Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
+
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
@@ -522,14 +549,14 @@ public class TableMetadata {
     newProperties.putAll(updatedProperties);
 
     return new TableMetadata(null, formatVersion, uuid, location,
-        System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
+        lastSequenceNumber, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
         specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
 
   public TableMetadata updateLocation(String newLocation) {
     return new TableMetadata(null, formatVersion, uuid, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
@@ -545,7 +572,7 @@ public class TableMetadata {
     }
 
     return new TableMetadata(null, newFormatVersion, uuid, location,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
@@ -562,7 +589,7 @@ public class TableMetadata {
     int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
             TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
 
-    List<MetadataLogEntry> newMetadataLog = null;
+    List<MetadataLogEntry> newMetadataLog;
     if (previousFiles.size() >= maxSize) {
       int removeIndex = previousFiles.size() - maxSize + 1;
       newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
@@ -602,10 +629,13 @@ public class TableMetadata {
     return specBuilder.build();
   }
 
-  private static Map<Long, Snapshot> indexSnapshots(List<Snapshot> snapshots) {
+  private static Map<Long, Snapshot> indexAndValidateSnapshots(List<Snapshot> snapshots, long lastSequenceNumber) {
     ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
-    for (Snapshot version : snapshots) {
-      builder.put(version.snapshotId(), version);
+    for (Snapshot snap : snapshots) {
+      ValidationException.check(snap.sequenceNumber() <= lastSequenceNumber,
+          "Invalid snapshot with sequence number %s greater than last sequence number %s",
+          snap.sequenceNumber(), lastSequenceNumber);
+      builder.put(snap.snapshotId(), snap);
     }
     return builder.build();
   }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 6fca73b..f746860 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -86,6 +86,7 @@ public class TableMetadataParser {
   static final String FORMAT_VERSION = "format-version";
   static final String TABLE_UUID = "table-uuid";
   static final String LOCATION = "location";
+  static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
   static final String LAST_UPDATED_MILLIS = "last-updated-ms";
   static final String LAST_COLUMN_ID = "last-column-id";
   static final String SCHEMA = "schema";
@@ -154,6 +155,9 @@ public class TableMetadataParser {
     generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
     generator.writeStringField(TABLE_UUID, metadata.uuid());
     generator.writeStringField(LOCATION, metadata.location());
+    if (metadata.formatVersion() > 1) {
+      generator.writeNumberField(LAST_SEQUENCE_NUMBER, metadata.lastSequenceNumber());
+    }
     generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
     generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
 
@@ -233,6 +237,12 @@ public class TableMetadataParser {
 
     String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
     String location = JsonUtil.getString(LOCATION, node);
+    long lastSequenceNumber;
+    if (formatVersion > 1) {
+      lastSequenceNumber = JsonUtil.getLong(LAST_SEQUENCE_NUMBER, node);
+    } else {
+      lastSequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
+    }
     int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
 
@@ -298,7 +308,7 @@ public class TableMetadataParser {
     }
 
     return new TableMetadata(file, formatVersion, uuid, location,
-        lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+        lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
         currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
         ImmutableList.copyOf(metadataEntries.iterator()));
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 9d46584..e70351f 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -63,7 +63,7 @@ public class JsonUtil {
   }
 
   public static long getLong(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing int %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing long %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber(),
         "Cannot parse %s from non-numeric value: %s", property, pNode);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 5e55d66..67c1c04 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -67,6 +67,8 @@ public class TestSnapshotJson {
     String json = SnapshotParser.toJson(expected);
     Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
 
+    Assert.assertEquals("Sequence number should default to 0 for v1",
+        0, snapshot.sequenceNumber());
     Assert.assertEquals("Snapshot ID should match",
         expected.snapshotId(), snapshot.snapshotId());
     Assert.assertEquals("Timestamp should match",
@@ -100,7 +102,7 @@ public class TestSnapshotJson {
     }
 
     Snapshot expected = new BaseSnapshot(
-        ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
+        ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
     Snapshot inMemory = new BaseSnapshot(
         ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests);
 
@@ -110,6 +112,8 @@ public class TestSnapshotJson {
     String json = SnapshotParser.toJson(expected);
     Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
 
+    Assert.assertEquals("Sequence number should default to 0",
+        expected.sequenceNumber(), snapshot.sequenceNumber());
     Assert.assertEquals("Snapshot ID should match",
         expected.snapshotId(), snapshot.snapshotId());
     Assert.assertEquals("Timestamp should match",
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index a77530b..52e7f68 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -66,6 +66,7 @@ public class TestTableMetadata {
       Types.NestedField.required(3, "z", Types.LongType.get())
   );
 
+  private static final long SEQ_NO = 34;
   private static final int LAST_ASSIGNED_COLUMN_ID = 3;
 
   private static final PartitionSpec SPEC_5 = PartitionSpec.builderFor(TEST_SCHEMA).withSpecId(5).build();
@@ -91,8 +92,8 @@ public class TestTableMetadata {
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();
 
-    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
-        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+    TableMetadata expected = new TableMetadata(null, 2, UUID.randomUUID().toString(), TEST_LOCATION,
+        SEQ_NO, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
 
@@ -106,6 +107,8 @@ public class TestTableMetadata {
         expected.uuid(), metadata.uuid());
     Assert.assertEquals("Table location should match",
         expected.location(), metadata.location());
+    Assert.assertEquals("Last sequence number should match",
+        expected.lastSequenceNumber(), metadata.lastSequenceNumber());
     Assert.assertEquals("Last column ID should match",
         expected.lastColumnId(), metadata.lastColumnId());
     Assert.assertEquals("Schema should match",
@@ -147,7 +150,7 @@ public class TestTableMetadata {
     List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
-        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+        0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
 
@@ -184,7 +187,7 @@ public class TestTableMetadata {
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
-        System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
+        0, System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
 
@@ -197,6 +200,8 @@ public class TestTableMetadata {
     Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
     Assert.assertEquals("Table location should match",
         expected.location(), metadata.location());
+    Assert.assertEquals("Last sequence number should default to 0",
+        expected.lastSequenceNumber(), metadata.lastSequenceNumber());
     Assert.assertEquals("Last column ID should match",
         expected.lastColumnId(), metadata.lastColumnId());
     Assert.assertEquals("Schema should match",
@@ -292,7 +297,7 @@ public class TestTableMetadata {
         "/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));
 
     TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
-        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+        0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -327,7 +332,7 @@ public class TestTableMetadata {
         "/tmp/000003-" + UUID.randomUUID().toString() + ".metadata.json");
 
     TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
-        TEST_LOCATION, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+        TEST_LOCATION, 0, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -372,7 +377,7 @@ public class TestTableMetadata {
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
 
     TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
-        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
+        TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
         ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -422,7 +427,7 @@ public class TestTableMetadata {
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
 
     TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
-        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
+        TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
         ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -447,9 +452,9 @@ public class TestTableMetadata {
   public void testV2UUIDValidation() {
     AssertHelpers.assertThrows("Should reject v2 metadata without a UUID",
         IllegalArgumentException.class, "UUID is required in format v2",
-        () -> new TableMetadata(null, 2, null, TEST_LOCATION, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
-            TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L, ImmutableList.of(),
-            ImmutableList.of(), ImmutableList.of())
+        () -> new TableMetadata(null, 2, null, TEST_LOCATION, SEQ_NO, System.currentTimeMillis(),
+            LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
+            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
     );
   }
 
@@ -458,9 +463,9 @@ public class TestTableMetadata {
     int unsupportedVersion = TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1;
     AssertHelpers.assertThrows("Should reject unsupported metadata",
         IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
-        () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, System.currentTimeMillis(),
-            LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
-            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+        () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, SEQ_NO,
+            System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5),
+            ImmutableMap.of(), -1L, ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
     );
   }
 
@@ -493,6 +498,9 @@ public class TestTableMetadata {
       generator.writeStringField(TABLE_UUID, metadata.uuid());
       generator.writeStringField(LOCATION, metadata.location());
       generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      if (version > 1) {
+        generator.writeNumberField(TableMetadataParser.LAST_SEQUENCE_NUMBER, metadata.lastSequenceNumber());
+      }
       generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
 
       generator.writeFieldName(SCHEMA);
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 332e247..f05f2ee 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -260,6 +260,7 @@ A snapshot consists of the following fields:
 
 *   **`snapshot-id`** -- A unique long ID.
 *   **`parent-snapshot-id`** -- (Optional) The snapshot ID of the snapshot’s parent. This field is not present for snapshots that have no parent snapshot, such as snapshots created before this field was added or the first snapshot of a table.
+*   **`sequence-number`** -- A monotonically increasing long that tracks the order of snapshots in a table. (**v2 only**)
 *   **`timestamp-ms`** -- A timestamp when the snapshot was created. This is used when garbage collecting snapshots.
 *   **`manifests`** -- A list of manifest file locations. The data files in a snapshot are the union of all data files listed in these manifests. (Deprecated in favor of `manifest-list`)
 *   **`manifest-list`** -- (Optional) The location of a manifest list file for this snapshot, which contains a list of manifest files with additional metadata. If present, the manifests field must be omitted.
@@ -369,21 +370,22 @@ When two commits happen at the same time and are based on the same version, only
 
 Table metadata consists of the following fields:
 
-| Field | Description |
-| ----- | ----------- |
-| **`format-version`** | An integer version number for the format. Currently, this is always 1. Implementations must throw an exception if a table's version is higher than the supported version. |
-| **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. |
-| **`location`**| The table’s base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. |
-| **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. |
-| **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. |
-| **`schema`**| The table’s current schema. |
-| **`partition-spec`**| The table’s current partition spec, stored as only fields. Note that this is used by writers to partition data, but is not used when reading because reads use the specs stored in manifest files. (**Deprecated**: use `partition-specs` and `default-spec-id`instead ) |
-| **`partition-specs`**| A list of partition specs, stored as full partition spec objects. |
-| **`default-spec-id`**| ID of the “current” spec that writers should use by default. |
-| **`properties`**| A string to string map of table properties. This is used to control settings that affect reading and writing and is not intended to be used for arbitrary metadata. For example, `commit.retry.num-retries` is used to control the number of commit retries. |
-| **`current-snapshot-id`**| `long` ID of the current table snapshot. |
-| **`snapshots`**| A list of valid snapshots. Valid snapshots are snapshots for which all data files exist in the file system. A data file must not be deleted from the file system until the last snapshot in which it was listed is garbage collected. |
-| **`snapshot-log`**| A list (optional) of timestamp and snapshot ID pairs that encodes changes to the current snapshot for the table. Each time the current-snapshot-id is changed, a new entry should be added with the last-updated-ms and the new current-snapshot-id. When snapshots are expired from the list of valid snapshots, all entries before a snapshot that has expired should be removed. |
+| Format v1  | Format v2  | Field | Description |
+| ---------- | ---------- | ----- | ----------- |
+| _required_ | _required_ | **`format-version`** | An integer version number for the format. Currently, this is always 1. Implementations must throw an exception if a table's version is higher than the supported version. |
+| _optional_ | _required_ | **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. |
+| _required_ | _required_ | **`location`**| The table's base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. |
+| _omitted_  | _required_ | **`sequence-number`**| The table's highest assigned sequence number, a monotonically increasing long that tracks the order of snapshots in a table. |
+| _required_ | _required_ | **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. |
+| _required_ | _required_ | **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. |
+| _required_ | _required_ | **`schema`**| The table’s current schema. |
+| _required_ | _omitted_  | **`partition-spec`**| The table’s current partition spec, stored as only fields. Note that this is used by writers to partition data, but is not used when reading because reads use the specs stored in manifest files. (**Deprecated**: use `partition-specs` and `default-spec-id`instead ) |
+| _optional_ | _required_ | **`partition-specs`**| A list of partition specs, stored as full partition spec objects. |
+| _optional_ | _required_ | **`default-spec-id`**| ID of the “current” spec that writers should use by default. |
+| _optional_ | _optional_ | **`properties`**| A string to string map of table properties. This is used to control settings that affect reading and writing and is not intended to be used for arbitrary metadata. For example, `commit.retry.num-retries` is used to control the number of commit retries. |
+| _optional_ | _optional_ | **`current-snapshot-id`**| `long` ID of the current table snapshot. |
+| _optional_ | _optional_ | **`snapshots`**| A list of valid snapshots. Valid snapshots are snapshots for which all data files exist in the file system. A data file must not be deleted from the file system until the last snapshot in which it was listed is garbage collected. |
+| _optional_ | _optional_ | **`snapshot-log`**| A list (optional) of timestamp and snapshot ID pairs that encodes changes to the current snapshot for the table. Each time the current-snapshot-id is changed, a new entry should be added with the last-updated-ms and the new current-snapshot-id. When snapshots are expired from the list of valid snapshots, all entries before a snapshot that has expired should be removed. |
 
 For serialization details, see Appendix C.
 
@@ -681,13 +683,22 @@ This serialization scheme is for storing single values as individual binary valu
 
 ### Version 2
 
-Writing metadata:
-* Table metadata field `sequence-number` is required.
-* Table metadata field `table-uuid` is required.
-* Table metadata field `partition-specs` is required.
-* Table metadata field `default-spec-id` is required.
+Writing v1 metadata:
+* Table metadata field `last-sequence-number` should not be written.
+* Snapshot field `sequence-number` should not be written.
+
+Reading v1 metadata:
+* Table metadata field `last-sequence-number` must default to 0.
+* Snapshot field `sequence-number` must default to 0.
+
+Writing v2 metadata:
+* Table metadata added required field `last-sequence-number`.
+* Table metadata now requires field `table-uuid`.
+* Table metadata now requires field `partition-specs`.
+* Table metadata now requires field `default-spec-id`.
 * Table metadata field `partition-spec` is no longer required and may be omitted.
-* Snapshot field `manifest-list` is required.
-* Snapshot field `manifests` is not allowed.
+* Snapshot added required field field `sequence-number`.
+* Snapshot now requires field `manifest-list`.
+* Snapshot field `manifests` is no longer allowed.
 
 Note that these requirements apply when writing data to a v2 table. Tables that are upgraded from v1 may contain metadata that does not follow these requirements. Implementations should remain backward-compatible with v1 metadata requirements.
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index 5c6640e..23d6dcb 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -214,6 +214,11 @@ public class TestForwardCompatibility {
     }
 
     @Override
+    public long sequenceNumber() {
+      return 0;
+    }
+
+    @Override
     public long snapshotId() {
       return 1;
     }