Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/05 19:53:10 UTC

[incubator-iceberg] branch master updated: Store multiple partition specs in table metadata. (#3)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new fd8a162  Store multiple partition specs in table metadata. (#3)
fd8a162 is described below

commit fd8a162e4d39aaeaca8da61a96ed62d4f391dfb9
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Dec 5 11:53:06 2018 -0800

    Store multiple partition specs in table metadata. (#3)
    
    The purpose of this change is to enable future partition spec changes
    and to assign each spec an ID that can be easily encoded in an Avro
    file that tracks a snapshot's manifests.
    
    This updates TableMetadata and the metadata parser to support multiple
    partition specs. This change is forward-compatible for older readers
    because the "partition-spec" field in table metadata is still set to the
    default spec.
    
    Multiple specs are now stored in an array in table metadata called
    "partition-specs". Each entry in the array is an object with two
    fields: a "spec-id" field with an integer ID value, and a "fields"
    field holding the spec's partition fields as an array. This also
    adds a "default-spec-id" field that identifies the spec that should
    be used when writing.
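    
    As a sketch of the resulting layout (unrelated metadata fields and
    the partition field objects elided), table metadata now contains:
    
        {
          "partition-spec": [ ... fields of the default spec ... ],
          "default-spec-id": 1,
          "partition-specs": [
            {"spec-id": 0, "fields": [ ... ]},
            {"spec-id": 1, "fields": [ ... ]}
          ]
        }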
---
 .../java/com/netflix/iceberg/PartitionSpec.java    |  31 +++-
 .../java/com/netflix/iceberg/ManifestReader.java   |   7 +-
 .../java/com/netflix/iceberg/ManifestWriter.java   |   3 +-
 .../com/netflix/iceberg/PartitionSpecParser.java   |  94 ++++++++----
 .../java/com/netflix/iceberg/TableMetadata.java    | 164 ++++++++++++++++-----
 .../com/netflix/iceberg/TableMetadataParser.java   |  70 ++++++---
 .../java/com/netflix/iceberg/TestMergeAppend.java  |  17 +--
 .../com/netflix/iceberg/TestTableMetadataJson.java | 137 ++++++++++++++++-
 8 files changed, 422 insertions(+), 101 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index da13f8c..b17e25b 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -50,14 +50,16 @@ public class PartitionSpec implements Serializable {
   private final Schema schema;
 
   // this is ordered so that DataFile has a consistent schema
+  private final int specId;
   private final PartitionField[] fields;
   private transient Map<Integer, PartitionField> fieldsBySourceId = null;
   private transient Map<String, PartitionField> fieldsByName = null;
   private transient Class<?>[] javaClasses = null;
   private transient List<PartitionField> fieldList = null;
 
-  private PartitionSpec(Schema schema, List<PartitionField> fields) {
+  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
     this.schema = schema;
+    this.specId = specId;
     this.fields = new PartitionField[fields.size()];
     for (int i = 0; i < this.fields.length; i += 1) {
       this.fields[i] = fields.get(i);
@@ -72,6 +74,13 @@ public class PartitionSpec implements Serializable {
   }
 
   /**
+   * @return the ID of this spec
+   */
+  public int specId() {
+    return specId;
+  }
+
+  /**
    * @return the list of {@link PartitionField partition fields} for this spec.
    */
   public List<PartitionField> fields() {
@@ -146,6 +155,13 @@ public class PartitionSpec implements Serializable {
     return sb.toString();
   }
 
+  /**
+   * Returns true if this spec is equivalent to the other, ignoring field names. That is, returns
+   * true if both specs have fields with the same order, source columns, and transforms.
+   *
+   * @param other another PartitionSpec
+   * @return true if the specs have the same fields, source columns, and transforms.
+   */
   public boolean compatibleWith(PartitionSpec other) {
     if (equals(other)) {
       return true;
@@ -177,6 +193,9 @@ public class PartitionSpec implements Serializable {
     }
 
     PartitionSpec that = (PartitionSpec) other;
+    if (this.specId != that.specId) {
+      return false;
+    }
     return Arrays.equals(fields, that.fields);
   }
 
@@ -250,7 +269,7 @@ public class PartitionSpec implements Serializable {
   }
 
   private static final PartitionSpec UNPARTITIONED_SPEC =
-      new PartitionSpec(new Schema(), ImmutableList.of());
+      new PartitionSpec(new Schema(), 0, ImmutableList.of());
 
   /**
    * Returns a spec for unpartitioned tables.
@@ -280,6 +299,7 @@ public class PartitionSpec implements Serializable {
     private final Schema schema;
     private final List<PartitionField> fields = Lists.newArrayList();
     private final Set<String> partitionNames = Sets.newHashSet();
+    private int specId = 0;
 
     private Builder(Schema schema) {
       this.schema = schema;
@@ -293,6 +313,11 @@ public class PartitionSpec implements Serializable {
       partitionNames.add(name);
     }
 
+    public Builder withSpecId(int specId) {
+      this.specId = specId;
+      return this;
+    }
+
     private Types.NestedField findSourceColumn(String sourceName) {
       Types.NestedField sourceColumn = schema.findField(sourceName);
       Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
@@ -371,7 +396,7 @@ public class PartitionSpec implements Serializable {
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, fields);
+      PartitionSpec spec = new PartitionSpec(schema, specId, fields);
       checkCompatibility(spec, schema);
       return spec;
     }
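
A minimal usage sketch of the new builder hook (a one-column schema for
illustration; withSpecId defaults to 0 when omitted):

    Schema schema = new Schema(
        Types.NestedField.required(1, "x", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .withSpecId(1)      // explicit spec ID
        .identity("x")
        .build();
    // spec.specId() == 1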
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
index fa6ffbf..065210e 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
@@ -99,7 +99,12 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
       throw new RuntimeIOException(e);
     }
     this.schema = SchemaParser.fromJson(metadata.get("schema"));
-    this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
+    int specId = TableMetadata.INITIAL_SPEC_ID;
+    String specProperty = metadata.get("partition-spec-id");
+    if (specProperty != null) {
+      specId = Integer.parseInt(specProperty);
+    }
+    this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
     this.entries = null;
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index e787b68..28ba831 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -108,7 +108,8 @@ class ManifestWriter implements FileAppender<DataFile> {
               .schema(manifestSchema)
               .named("manifest_entry")
               .meta("schema", SchemaParser.toJson(spec.schema()))
-              .meta("partition-spec", PartitionSpecParser.toJson(spec))
+              .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+              .meta("partition-spec-id", String.valueOf(spec.specId()))
               .build();
         default:
           throw new IllegalArgumentException("Unsupported format: " + format);
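
With this change, a manifest's Avro key/value metadata carries three
entries (values sketched; the spec ID is stored as a string):

    "schema"            -> SchemaParser.toJson(spec.schema())
    "partition-spec"    -> PartitionSpecParser.toJsonFields(spec)
    "partition-spec-id" -> String.valueOf(spec.specId()), e.g. "0"

Older manifests have no "partition-spec-id" entry, so ManifestReader
falls back to TableMetadata.INITIAL_SPEC_ID when it is missing.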
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
index fe20f65..4df7c55 100644
--- a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
@@ -37,20 +37,18 @@ public class PartitionSpecParser {
   private PartitionSpecParser() {
   }
 
+  private static final String SPEC_ID = "spec-id";
+  private static final String FIELDS = "fields";
   private static final String SOURCE_ID = "source-id";
   private static final String TRANSFORM = "transform";
   private static final String NAME = "name";
 
   public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
-    generator.writeStartArray();
-    for (PartitionField field : spec.fields()) {
-      generator.writeStartObject();
-      generator.writeStringField(NAME, field.name());
-      generator.writeStringField(TRANSFORM, field.transform().toString());
-      generator.writeNumberField(SOURCE_ID, field.sourceId());
-      generator.writeEndObject();
-    }
-    generator.writeEndArray();
+    generator.writeStartObject();
+    generator.writeNumberField(SPEC_ID, spec.specId());
+    generator.writeFieldName(FIELDS);
+    toJsonFields(spec, generator);
+    generator.writeEndObject();
   }
 
   public static String toJson(PartitionSpec spec) {
@@ -74,23 +72,10 @@ public class PartitionSpecParser {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    Preconditions.checkArgument(json.isArray(),
-        "Cannot parse partition spec, not an array: %s", json);
-
-    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    Iterator<JsonNode> elements = json.elements();
-    while (elements.hasNext()) {
-      JsonNode element = elements.next();
-      Preconditions.checkArgument(element.isObject(),
-          "Cannot parse partition field, not an object: %s", element);
-
-      String name = JsonUtil.getString(NAME, element);
-      String transform = JsonUtil.getString(TRANSFORM, element);
-      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
-
-      builder.add(sourceId, name, transform);
-    }
-
+    Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
+    int specId = JsonUtil.getInt(SPEC_ID, json);
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json.get(FIELDS));
     return builder.build();
   }
 
@@ -113,4 +98,61 @@ public class PartitionSpecParser {
       }
     }
   }
+
+  static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
+    generator.writeStartArray();
+    for (PartitionField field : spec.fields()) {
+      generator.writeStartObject();
+      generator.writeStringField(NAME, field.name());
+      generator.writeStringField(TRANSFORM, field.transform().toString());
+      generator.writeNumberField(SOURCE_ID, field.sourceId());
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+  }
+
+  static String toJsonFields(PartitionSpec spec) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      toJsonFields(spec, generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json);
+    return builder.build();
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
+    try {
+      return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
+    }
+  }
+
+  private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
+    Preconditions.checkArgument(json.isArray(),
+        "Cannot parse partition spec fields, not an array: %s", json);
+
+    Iterator<JsonNode> elements = json.elements();
+    while (elements.hasNext()) {
+      JsonNode element = elements.next();
+      Preconditions.checkArgument(element.isObject(),
+          "Cannot parse partition field, not an object: %s", element);
+
+      String name = JsonUtil.getString(NAME, element);
+      String transform = JsonUtil.getString(TRANSFORM, element);
+      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
+
+      builder.add(sourceId, name, transform);
+    }
+  }
 }
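
The parser now distinguishes two JSON shapes. For a spec with a single
identity field, a sketch of each (transform names illustrative):

    // toJson: a full spec object carrying its ID
    {"spec-id": 1, "fields": [
        {"name": "x", "transform": "identity", "source-id": 1}]}

    // toJsonFields: the bare field array, still used for the legacy
    // "partition-spec" table metadata field and for manifest metadata
    [{"name": "x", "transform": "identity", "source-id": 1}]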
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 38fda2e..05c3392 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.ValidationException;
 import com.netflix.iceberg.io.InputFile;
@@ -40,6 +41,7 @@ import java.util.function.Predicate;
  */
 public class TableMetadata {
   static final int TABLE_FORMAT_VERSION = 1;
+  static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(TableOperations ops,
                                                Schema schema,
@@ -58,7 +60,8 @@ public class TableMetadata {
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema)
+        .withSpecId(INITIAL_SPEC_ID);
     for (PartitionField field : spec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
       String sourceName = schema.findColumnName(field.sourceId());
@@ -71,7 +74,7 @@ public class TableMetadata {
 
     return new TableMetadata(ops, null, location,
         System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema, freshSpec,
+        lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
@@ -126,11 +129,13 @@ public class TableMetadata {
   private final long lastUpdatedMillis;
   private final int lastColumnId;
   private final Schema schema;
-  private final PartitionSpec spec;
+  private final int defaultSpecId;
+  private final List<PartitionSpec> specs;
   private final Map<String, String> properties;
   private final long currentSnapshotId;
   private final List<Snapshot> snapshots;
   private final Map<Long, Snapshot> snapshotsById;
+  private final Map<Integer, PartitionSpec> specsById;
   private final List<SnapshotLogEntry> snapshotLog;
 
   TableMetadata(TableOperations ops,
@@ -139,7 +144,8 @@ public class TableMetadata {
                 long lastUpdatedMillis,
                 int lastColumnId,
                 Schema schema,
-                PartitionSpec spec,
+                int defaultSpecId,
+                List<PartitionSpec> specs,
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
@@ -150,17 +156,15 @@ public class TableMetadata {
     this.lastUpdatedMillis = lastUpdatedMillis;
     this.lastColumnId = lastColumnId;
     this.schema = schema;
-    this.spec = spec;
+    this.specs = specs;
+    this.defaultSpecId = defaultSpecId;
     this.properties = properties;
     this.currentSnapshotId = currentSnapshotId;
     this.snapshots = snapshots;
     this.snapshotLog = snapshotLog;
 
-    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
-    for (Snapshot version : snapshots) {
-      builder.put(version.snapshotId(), version);
-    }
-    this.snapshotsById = builder.build();
+    this.snapshotsById = indexSnapshots(snapshots);
+    this.specsById = indexSpecs(specs);
 
     SnapshotLogEntry last = null;
     for (SnapshotLogEntry logEntry : snapshotLog) {
@@ -194,7 +198,19 @@ public class TableMetadata {
   }
 
   public PartitionSpec spec() {
-    return spec;
+    return specsById.get(defaultSpecId);
+  }
+
+  public int defaultSpecId() {
+    return defaultSpecId;
+  }
+
+  public PartitionSpec spec(int id) {
+    return specsById.get(id);
+  }
+
+  public List<PartitionSpec> specs() {
+    return specs;
   }
 
   public String location() {
@@ -239,15 +255,45 @@ public class TableMetadata {
 
   public TableMetadata updateTableLocation(String newLocation) {
     return new TableMetadata(ops, null, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata updateSchema(Schema schema, int lastColumnId) {
-    PartitionSpec.checkCompatibility(spec, schema);
+    PartitionSpec.checkCompatibility(spec(), schema);
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
+  public TableMetadata updatePartitionSpec(PartitionSpec partitionSpec) {
+    PartitionSpec.checkCompatibility(partitionSpec, schema);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int newDefaultSpecId = INITIAL_SPEC_ID;
+    for (PartitionSpec spec : specs) {
+      if (partitionSpec.compatibleWith(spec)) {
+        newDefaultSpecId = spec.specId();
+        break;
+      } else if (newDefaultSpecId <= spec.specId()) {
+        newDefaultSpecId = spec.specId() + 1;
+      }
+    }
+
+    Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
+        "Cannot set default partition spec to the current default");
+
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(newDefaultSpecId)) {
+      // get a fresh spec to ensure the spec ID is set to the new default
+      builder.add(freshSpec(newDefaultSpecId, schema, partitionSpec));
+    }
+
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots,snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+        builder.build(), properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -260,8 +306,8 @@ public class TableMetadata {
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
     return new TableMetadata(ops, null, location,
-        snapshot.timestampMillis(), lastColumnId, schema, spec, properties, snapshot.snapshotId(),
-        newSnapshots, newSnapshotLog);
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), newSnapshots, newSnapshotLog);
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -291,8 +337,8 @@ public class TableMetadata {
     }
 
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        filtered, ImmutableList.copyOf(newSnapshotLog));
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
   }
 
   public TableMetadata rollbackTo(Snapshot snapshot) {
@@ -306,15 +352,15 @@ public class TableMetadata {
         .build();
 
     return new TableMetadata(ops, null, location,
-        nowMillis, lastColumnId, schema, spec, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog);
+        nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), snapshots, newSnapshotLog);
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, newProperties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
@@ -330,8 +376,8 @@ public class TableMetadata {
             Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, newSnapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, newSnapshotLog);
   }
 
   public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec,
@@ -339,24 +385,70 @@ public class TableMetadata {
     AtomicInteger lastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
+    int nextSpecId = TableMetadata.INITIAL_SPEC_ID;
+    for (Integer specId : specsById.keySet()) {
+      if (nextSpecId <= specId) {
+        nextSpecId = specId + 1;
+      }
+    }
+
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, partitionSpec);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int specId = nextSpecId;
+    for (PartitionSpec spec : specs) {
+      if (freshSpec.compatibleWith(spec)) {
+        specId = spec.specId();
+        break;
+      }
+    }
+
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(specId)) {
+      builder.add(freshSpec);
+    }
+
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.putAll(this.properties);
+    newProperties.putAll(properties);
+
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId.get(), freshSchema,
+        specId, builder.build(), ImmutableMap.copyOf(newProperties),
+        -1, snapshots, ImmutableList.of());
+  }
+
+  private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+        .withSpecId(specId);
+
     for (PartitionField field : partitionSpec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
-      String sourceName = schema.findColumnName(field.sourceId());
+      String sourceName = partitionSpec.schema().findColumnName(field.sourceId());
       specBuilder.add(
-          freshSchema.findField(sourceName).fieldId(),
+          schema.findField(sourceName).fieldId(),
           field.name(),
           field.transform().toString());
     }
-    PartitionSpec freshSpec = specBuilder.build();
 
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    builder.putAll(this.properties);
-    builder.putAll(properties);
+    return specBuilder.build();
+  }
 
-    return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId.get(), freshSchema, freshSpec, properties, -1,
-        snapshots, ImmutableList.of());
+  private static Map<Long, Snapshot> indexSnapshots(List<Snapshot> snapshots) {
+    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
+    for (Snapshot version : snapshots) {
+      builder.put(version.snapshotId(), version);
+    }
+    return builder.build();
+  }
+
+  private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+    for (PartitionSpec spec : specs) {
+      builder.put(spec.specId(), spec);
+    }
+    return builder.build();
   }
 }
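
A sketch of how updatePartitionSpec assigns IDs, assuming base is the
current TableMetadata, its only spec has ID 0, and that spec is not
already bucketed by "data":

    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
        .bucket("data", 16)
        .build();
    TableMetadata updated = base.updatePartitionSpec(newSpec);
    // no existing spec is compatible, so the new spec gets ID 0 + 1;
    // updated.defaultSpecId() == 1 and updated.specs() has two entries,
    // while updated.spec(0) still resolves the spec used by old manifests

Re-submitting a spec that is compatible with a stored one reuses that
spec's ID; setting the current default again fails the precondition
check.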
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index f0508b3..0961d8c 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -46,18 +46,21 @@ import java.util.SortedSet;
 
 public class TableMetadataParser {
 
-  private static final String FORMAT_VERSION = "format-version";
-  private static final String LOCATION = "location";
-  private static final String LAST_UPDATED_MILLIS = "last-updated-ms";
-  private static final String LAST_COLUMN_ID = "last-column-id";
-  private static final String SCHEMA = "schema";
-  private static final String PARTITION_SPEC = "partition-spec";
-  private static final String PROPERTIES = "properties";
-  private static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
-  private static final String SNAPSHOTS = "snapshots";
-  private static final String SNAPSHOT_ID = "snapshot-id";
-  private static final String TIMESTAMP_MS = "timestamp-ms";
-  private static final String SNAPSHOT_LOG = "snapshot-log";
+  // visible for testing
+  static final String FORMAT_VERSION = "format-version";
+  static final String LOCATION = "location";
+  static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+  static final String LAST_COLUMN_ID = "last-column-id";
+  static final String SCHEMA = "schema";
+  static final String PARTITION_SPEC = "partition-spec";
+  static final String PARTITION_SPECS = "partition-specs";
+  static final String DEFAULT_SPEC_ID = "default-spec-id";
+  static final String PROPERTIES = "properties";
+  static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+  static final String SNAPSHOTS = "snapshots";
+  static final String SNAPSHOT_ID = "snapshot-id";
+  static final String TIMESTAMP_MS = "timestamp-ms";
+  static final String SNAPSHOT_LOG = "snapshot-log";
 
   public static String toJson(TableMetadata metadata) {
     StringWriter writer = new StringWriter();
@@ -100,8 +103,17 @@ public class TableMetadataParser {
     generator.writeFieldName(SCHEMA);
     SchemaParser.toJson(metadata.schema(), generator);
 
+    // for older readers, continue writing the default spec as "partition-spec"
     generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJson(metadata.spec(), generator);
+    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+    // write the default spec ID and spec list
+    generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
+    generator.writeArrayFieldStart(PARTITION_SPECS);
+    for (PartitionSpec spec : metadata.specs()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+    generator.writeEndArray();
 
     generator.writeObjectFieldStart(PROPERTIES);
     for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
@@ -150,7 +162,32 @@ public class TableMetadataParser {
     String location = JsonUtil.getString(LOCATION, node);
     int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
-    PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+
+    JsonNode specArray = node.get(PARTITION_SPECS);
+    List<PartitionSpec> specs;
+    int defaultSpecId;
+    if (specArray != null) {
+      Preconditions.checkArgument(specArray.isArray(),
+          "Cannot parse partition specs from non-array: %s", specArray);
+      // default spec ID is required when the spec array is present
+      defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
+
+      // parse the spec array
+      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
+      for (JsonNode spec : specArray) {
+        builder.add(PartitionSpecParser.fromJson(schema, spec));
+      }
+      specs = builder.build();
+
+    } else {
+      // the partition spec is required for older readers, but is always set to the default if the
+      // spec array is present. it is only used when the spec array is missing, indicating that the
+      // table metadata was written by an older writer.
+      defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
+      specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
+          schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
+    }
+
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
     long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
     long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);
@@ -177,8 +214,7 @@ public class TableMetadataParser {
     }
 
     return new TableMetadata(ops, file, location,
-        lastUpdatedMillis, lastAssignedColumnId, schema, spec, properties, currentVersionId,
-        snapshots, ImmutableList.copyOf(entries.iterator()));
+        lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+        currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
   }
-
 }
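
For metadata written before this change, "partition-specs" is absent and
the parser falls back to the legacy field. A sketch of such a file, with
the other required fields elided:

    {
      "format-version": 1,
      "partition-spec": [
        {"name": "x", "transform": "identity", "source-id": 1}
      ]
    }

This parses to a single spec with ID TableMetadata.INITIAL_SPEC_ID (0),
which also becomes the default spec ID.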
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index dd7a3cf..a1e28bc 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.ManifestEntry.Status;
@@ -236,16 +235,14 @@ public class TestMergeAppend extends TableTestBase {
         1, base.currentSnapshot().manifests().size());
     String initialManifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
@@ -284,16 +281,14 @@ public class TestMergeAppend extends TableTestBase {
         2, base.currentSnapshot().manifests().size());
     String manifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 53df5db..21acdbd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -19,19 +19,34 @@
 
 package com.netflix.iceberg;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
+import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
+import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
+import static com.netflix.iceberg.TableMetadataParser.LAST_UPDATED_MILLIS;
+import static com.netflix.iceberg.TableMetadataParser.LOCATION;
+import static com.netflix.iceberg.TableMetadataParser.PARTITION_SPEC;
+import static com.netflix.iceberg.TableMetadataParser.PROPERTIES;
+import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
+import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
+
 public class TestTableMetadataJson {
   @Test
   public void testJsonConversion() throws Exception {
@@ -41,7 +56,7 @@ public class TestTableMetadataJson {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -56,8 +71,9 @@ public class TestTableMetadataJson {
         .build();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(null, null,
@@ -71,6 +87,10 @@ public class TestTableMetadataJson {
         expected.schema().asStruct(), metadata.schema().asStruct());
     Assert.assertEquals("Partition spec should match",
         expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should match",
+        expected.defaultSpecId(), metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec map should match",
+        expected.specs(), metadata.specs());
     Assert.assertEquals("Properties should match",
         expected.properties(), metadata.properties());
     Assert.assertEquals("Snapshot logs should match",
@@ -96,7 +116,7 @@ public class TestTableMetadataJson {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -108,8 +128,9 @@ public class TestTableMetadataJson {
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
 
     // add the entries after creating TableMetadata to avoid the sorted check
     reversedSnapshotLog.add(
@@ -129,4 +150,108 @@ public class TestTableMetadataJson {
     Assert.assertEquals("Snapshot logs should match",
         expectedSnapshotLog, metadata.snapshotLog());
   }
+
+  @Test
+  public void testBackwardCompatMissingPartitionSpecList() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get()),
+        Types.NestedField.required(3, "z", Types.LongType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+
+    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
+    Snapshot previousSnapshot = new BaseSnapshot(
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manifest.1.avro"));
+    long currentSnapshotId = System.currentTimeMillis();
+    Snapshot currentSnapshot = new BaseSnapshot(
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manifest.2.avro"));
+
+    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
+
+    String asJson = toJsonWithoutSpecList(expected);
+    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+        JsonUtil.mapper().readValue(asJson, JsonNode.class));
+
+    Assert.assertEquals("Table location should match",
+        expected.location(), metadata.location());
+    Assert.assertEquals("Last column ID should match",
+        expected.lastColumnId(), metadata.lastColumnId());
+    Assert.assertEquals("Schema should match",
+        expected.schema().asStruct(), metadata.schema().asStruct());
+    Assert.assertEquals("Partition spec should be the default",
+        expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should default to TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec should contain the spec",
+        1, metadata.specs().size());
+    Assert.assertTrue("PartitionSpec should contain the spec",
+        metadata.specs().get(0).compatibleWith(spec));
+    Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId());
+    Assert.assertEquals("Properties should match",
+        expected.properties(), metadata.properties());
+    Assert.assertEquals("Snapshot logs should match",
+        expected.snapshotLog(), metadata.snapshotLog());
+    Assert.assertEquals("Current snapshot ID should match",
+        currentSnapshotId, metadata.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot ID should match",
+        (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
+    Assert.assertEquals("Current snapshot files should match",
+        currentSnapshot.manifests(), metadata.currentSnapshot().manifests());
+    Assert.assertEquals("Previous snapshot ID should match",
+        previousSnapshotId, metadata.snapshot(previousSnapshotId).snapshotId());
+    Assert.assertEquals("Previous snapshot files should match",
+        previousSnapshot.manifests(),
+        metadata.snapshot(previousSnapshotId).manifests());
+  }
+
+  public static String toJsonWithoutSpecList(TableMetadata metadata) {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+
+      generator.flush();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
 }