Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/05 19:53:07 UTC

[GitHub] rdblue closed pull request #3: Store multiple partition specs in table metadata.

URL: https://github.com/apache/incubator-iceberg/pull/3
 
 
   

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of
provenance:

diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index da13f8c..b17e25b 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -50,14 +50,16 @@
   private final Schema schema;
 
   // this is ordered so that DataFile has a consistent schema
+  private final int specId;
   private final PartitionField[] fields;
   private transient Map<Integer, PartitionField> fieldsBySourceId = null;
   private transient Map<String, PartitionField> fieldsByName = null;
   private transient Class<?>[] javaClasses = null;
   private transient List<PartitionField> fieldList = null;
 
-  private PartitionSpec(Schema schema, List<PartitionField> fields) {
+  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
     this.schema = schema;
+    this.specId = specId;
     this.fields = new PartitionField[fields.size()];
     for (int i = 0; i < this.fields.length; i += 1) {
       this.fields[i] = fields.get(i);
@@ -71,6 +73,13 @@ public Schema schema() {
     return schema;
   }
 
+  /**
+   * @return the ID of this spec
+   */
+  public int specId() {
+    return specId;
+  }
+
   /**
    * @return the list of {@link PartitionField partition fields} for this spec.
    */
@@ -146,6 +155,13 @@ public String partitionToPath(StructLike data) {
     return sb.toString();
   }
 
+  /**
+   * Returns true if this spec is equivalent to the other, with field names ignored. That is, if
+   * both specs have the same number of fields, field order, source columns, and transforms.
+   *
+   * @param other another PartitionSpec
+   * @return true if the specs have the same fields, source columns, and transforms.
+   */
   public boolean compatibleWith(PartitionSpec other) {
     if (equals(other)) {
       return true;
@@ -177,6 +193,9 @@ public boolean equals(Object other) {
     }
 
     PartitionSpec that = (PartitionSpec) other;
+    if (this.specId != that.specId) {
+      return false;
+    }
     return Arrays.equals(fields, that.fields);
   }
 
@@ -250,7 +269,7 @@ public String toString() {
   }
 
   private static final PartitionSpec UNPARTITIONED_SPEC =
-      new PartitionSpec(new Schema(), ImmutableList.of());
+      new PartitionSpec(new Schema(), 0, ImmutableList.of());
 
   /**
    * Returns a spec for unpartitioned tables.
@@ -280,6 +299,7 @@ public static Builder builderFor(Schema schema) {
     private final Schema schema;
     private final List<PartitionField> fields = Lists.newArrayList();
     private final Set<String> partitionNames = Sets.newHashSet();
+    private int specId = 0;
 
     private Builder(Schema schema) {
       this.schema = schema;
@@ -293,6 +313,11 @@ private void checkAndAddPartitionName(String name) {
       partitionNames.add(name);
     }
 
+    public Builder withSpecId(int specId) {
+      this.specId = specId;
+      return this;
+    }
+
     private Types.NestedField findSourceColumn(String sourceName) {
       Types.NestedField sourceColumn = schema.findField(sourceName);
       Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
@@ -371,7 +396,7 @@ public Builder add(int sourceId, String name, String transform) {
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, fields);
+      PartitionSpec spec = new PartitionSpec(schema, specId, fields);
       checkCompatibility(spec, schema);
       return spec;
     }
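
To make the API change above concrete, here is a minimal sketch of
building a spec with an explicit ID (the schema and column names are
hypothetical, not taken from this PR):

    import com.netflix.iceberg.PartitionSpec;
    import com.netflix.iceberg.Schema;
    import com.netflix.iceberg.types.Types;

    public class SpecIdExample {
      public static void main(String[] args) {
        // hypothetical two-column schema
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));

        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .withSpecId(1)          // new in this PR: attach a spec ID
            .bucket("data", 16)
            .build();

        System.out.println(spec.specId());  // new accessor; prints 1
      }
    }

Specs built without withSpecId keep the builder default of 0, which
lines up with TableMetadata.INITIAL_SPEC_ID.
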
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
index fa6ffbf..065210e 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
@@ -99,7 +99,12 @@ private ManifestReader(InputFile file) {
       throw new RuntimeIOException(e);
     }
     this.schema = SchemaParser.fromJson(metadata.get("schema"));
-    this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
+    int specId = TableMetadata.INITIAL_SPEC_ID;
+    String specProperty = metadata.get("partition-spec-id");
+    if (specProperty != null) {
+      specId = Integer.parseInt(specProperty);
+    }
+    this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
     this.entries = null;
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index e787b68..28ba831 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -108,7 +108,8 @@ public void close() throws IOException {
               .schema(manifestSchema)
               .named("manifest_entry")
               .meta("schema", SchemaParser.toJson(spec.schema()))
-              .meta("partition-spec", PartitionSpecParser.toJson(spec))
+              .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+              .meta("partition-spec-id", String.valueOf(spec.specId()))
               .build();
         default:
           throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
index fe20f65..4df7c55 100644
--- a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
@@ -37,20 +37,18 @@
   private PartitionSpecParser() {
   }
 
+  private static final String SPEC_ID = "spec-id";
+  private static final String FIELDS = "fields";
   private static final String SOURCE_ID = "source-id";
   private static final String TRANSFORM = "transform";
   private static final String NAME = "name";
 
   public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
-    generator.writeStartArray();
-    for (PartitionField field : spec.fields()) {
-      generator.writeStartObject();
-      generator.writeStringField(NAME, field.name());
-      generator.writeStringField(TRANSFORM, field.transform().toString());
-      generator.writeNumberField(SOURCE_ID, field.sourceId());
-      generator.writeEndObject();
-    }
-    generator.writeEndArray();
+    generator.writeStartObject();
+    generator.writeNumberField(SPEC_ID, spec.specId());
+    generator.writeFieldName(FIELDS);
+    toJsonFields(spec, generator);
+    generator.writeEndObject();
   }
 
   public static String toJson(PartitionSpec spec) {
@@ -74,23 +72,10 @@ public static String toJson(PartitionSpec spec, boolean pretty) {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    Preconditions.checkArgument(json.isArray(),
-        "Cannot parse partition spec, not an array: %s", json);
-
-    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    Iterator<JsonNode> elements = json.elements();
-    while (elements.hasNext()) {
-      JsonNode element = elements.next();
-      Preconditions.checkArgument(element.isObject(),
-          "Cannot parse partition field, not an object: %s", element);
-
-      String name = JsonUtil.getString(NAME, element);
-      String transform = JsonUtil.getString(TRANSFORM, element);
-      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
-
-      builder.add(sourceId, name, transform);
-    }
-
+    Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
+    int specId = JsonUtil.getInt(SPEC_ID, json);
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json.get(FIELDS));
     return builder.build();
   }
 
@@ -113,4 +98,61 @@ public static PartitionSpec fromJson(Schema schema, String json) {
       }
     }
   }
+
+  static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
+    generator.writeStartArray();
+    for (PartitionField field : spec.fields()) {
+      generator.writeStartObject();
+      generator.writeStringField(NAME, field.name());
+      generator.writeStringField(TRANSFORM, field.transform().toString());
+      generator.writeNumberField(SOURCE_ID, field.sourceId());
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+  }
+
+  static String toJsonFields(PartitionSpec spec) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      toJsonFields(spec, generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json);
+    return builder.build();
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
+    try {
+      return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
+    }
+  }
+
+  private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
+    Preconditions.checkArgument(json.isArray(),
+        "Cannot parse partition spec fields, not an array: %s", json);
+
+    Iterator<JsonNode> elements = json.elements();
+    while (elements.hasNext()) {
+      JsonNode element = elements.next();
+      Preconditions.checkArgument(element.isObject(),
+          "Cannot parse partition field, not an object: %s", element);
+
+      String name = JsonUtil.getString(NAME, element);
+      String transform = JsonUtil.getString(TRANSFORM, element);
+      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
+
+      builder.add(sourceId, name, transform);
+    }
+  }
 }
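
For reference, the object form written by toJson above wraps the
fields in a spec-id envelope, while toJsonFields emits only the bare
array (used for manifest metadata and the legacy "partition-spec"
key). A hypothetical spec with one bucket field would serialize as:

    {
      "spec-id": 1,
      "fields": [ {
        "name": "data_bucket",
        "transform": "bucket[16]",
        "source-id": 2
      } ]
    }

toJsonFields for the same spec would produce just the inner "fields"
array.
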
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 38fda2e..05c3392 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.ValidationException;
 import com.netflix.iceberg.io.InputFile;
@@ -40,6 +41,7 @@
  */
 public class TableMetadata {
   static final int TABLE_FORMAT_VERSION = 1;
+  static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(TableOperations ops,
                                                Schema schema,
@@ -58,7 +60,8 @@ public static TableMetadata newTableMetadata(TableOperations ops,
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema)
+        .withSpecId(INITIAL_SPEC_ID);
     for (PartitionField field : spec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
       String sourceName = schema.findColumnName(field.sourceId());
@@ -71,7 +74,7 @@ public static TableMetadata newTableMetadata(TableOperations ops,
 
     return new TableMetadata(ops, null, location,
         System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema, freshSpec,
+        lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
@@ -126,11 +129,13 @@ public String toString() {
   private final long lastUpdatedMillis;
   private final int lastColumnId;
   private final Schema schema;
-  private final PartitionSpec spec;
+  private final int defaultSpecId;
+  private final List<PartitionSpec> specs;
   private final Map<String, String> properties;
   private final long currentSnapshotId;
   private final List<Snapshot> snapshots;
   private final Map<Long, Snapshot> snapshotsById;
+  private final Map<Integer, PartitionSpec> specsById;
   private final List<SnapshotLogEntry> snapshotLog;
 
   TableMetadata(TableOperations ops,
@@ -139,7 +144,8 @@ public String toString() {
                 long lastUpdatedMillis,
                 int lastColumnId,
                 Schema schema,
-                PartitionSpec spec,
+                int defaultSpecId,
+                List<PartitionSpec> specs,
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
@@ -150,17 +156,15 @@ public String toString() {
     this.lastUpdatedMillis = lastUpdatedMillis;
     this.lastColumnId = lastColumnId;
     this.schema = schema;
-    this.spec = spec;
+    this.specs = specs;
+    this.defaultSpecId = defaultSpecId;
     this.properties = properties;
     this.currentSnapshotId = currentSnapshotId;
     this.snapshots = snapshots;
     this.snapshotLog = snapshotLog;
 
-    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
-    for (Snapshot version : snapshots) {
-      builder.put(version.snapshotId(), version);
-    }
-    this.snapshotsById = builder.build();
+    this.snapshotsById = indexSnapshots(snapshots);
+    this.specsById = indexSpecs(specs);
 
     SnapshotLogEntry last = null;
     for (SnapshotLogEntry logEntry : snapshotLog) {
@@ -194,7 +198,19 @@ public Schema schema() {
   }
 
   public PartitionSpec spec() {
-    return spec;
+    return specsById.get(defaultSpecId);
+  }
+
+  public int defaultSpecId() {
+    return defaultSpecId;
+  }
+
+  public PartitionSpec spec(int id) {
+    return specsById.get(id);
+  }
+
+  public List<PartitionSpec> specs() {
+    return specs;
   }
 
   public String location() {
@@ -239,15 +255,45 @@ public Snapshot currentSnapshot() {
 
   public TableMetadata updateTableLocation(String newLocation) {
     return new TableMetadata(ops, null, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata updateSchema(Schema schema, int lastColumnId) {
-    PartitionSpec.checkCompatibility(spec, schema);
+    PartitionSpec.checkCompatibility(spec(), schema);
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
+  public TableMetadata updatePartitionSpec(PartitionSpec partitionSpec) {
+    PartitionSpec.checkCompatibility(partitionSpec, schema);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int newDefaultSpecId = INITIAL_SPEC_ID;
+    for (PartitionSpec spec : specs) {
+      if (partitionSpec.compatibleWith(spec)) {
+        newDefaultSpecId = spec.specId();
+        break;
+      } else if (newDefaultSpecId <= spec.specId()) {
+        newDefaultSpecId = spec.specId() + 1;
+      }
+    }
+
+    Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
+        "Cannot set default partition spec to the current default");
+
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(newDefaultSpecId)) {
+      // get a fresh spec to ensure the spec ID is set to the new default
+      builder.add(freshSpec(newDefaultSpecId, schema, partitionSpec));
+    }
+
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots,snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+        builder.build(), properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -260,8 +306,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
     return new TableMetadata(ops, null, location,
-        snapshot.timestampMillis(), lastColumnId, schema, spec, properties, snapshot.snapshotId(),
-        newSnapshots, newSnapshotLog);
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), newSnapshots, newSnapshotLog);
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -291,8 +337,8 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
     }
 
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        filtered, ImmutableList.copyOf(newSnapshotLog));
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
   }
 
   public TableMetadata rollbackTo(Snapshot snapshot) {
@@ -306,15 +352,15 @@ public TableMetadata rollbackTo(Snapshot snapshot) {
         .build();
 
     return new TableMetadata(ops, null, location,
-        nowMillis, lastColumnId, schema, spec, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog);
+        nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), snapshots, newSnapshotLog);
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, newProperties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
@@ -330,8 +376,8 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
             Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, newSnapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, newSnapshotLog);
   }
 
   public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec,
@@ -339,24 +385,70 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
     AtomicInteger lastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
+    int nextSpecId = TableMetadata.INITIAL_SPEC_ID;
+    for (Integer specId : specsById.keySet()) {
+      if (nextSpecId <= specId) {
+        nextSpecId = specId + 1;
+      }
+    }
+
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, partitionSpec);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int specId = nextSpecId;
+    for (PartitionSpec spec : specs) {
+      if (freshSpec.compatibleWith(spec)) {
+        specId = spec.specId();
+        break;
+      }
+    }
+
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(specId)) {
+      builder.add(freshSpec);
+    }
+
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.putAll(this.properties);
+    newProperties.putAll(properties);
+
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId.get(), freshSchema,
+        specId, builder.build(), ImmutableMap.copyOf(newProperties),
+        -1, snapshots, ImmutableList.of());
+  }
+
+  private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+        .withSpecId(specId);
+
     for (PartitionField field : partitionSpec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
-      String sourceName = schema.findColumnName(field.sourceId());
+      String sourceName = partitionSpec.schema().findColumnName(field.sourceId());
       specBuilder.add(
-          freshSchema.findField(sourceName).fieldId(),
+          schema.findField(sourceName).fieldId(),
           field.name(),
           field.transform().toString());
     }
-    PartitionSpec freshSpec = specBuilder.build();
 
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    builder.putAll(this.properties);
-    builder.putAll(properties);
+    return specBuilder.build();
+  }
 
-    return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId.get(), freshSchema, freshSpec, properties, -1,
-        snapshots, ImmutableList.of());
+  private static Map<Long, Snapshot> indexSnapshots(List<Snapshot> snapshots) {
+    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
+    for (Snapshot version : snapshots) {
+      builder.put(version.snapshotId(), version);
+    }
+    return builder.build();
+  }
+
+  private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+    for (PartitionSpec spec : specs) {
+      builder.put(spec.specId(), spec);
+    }
+    return builder.build();
   }
 }
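
The spec-evolution rules above (reuse a compatible spec's ID,
otherwise take one more than the highest ID and append) are driven
through updatePartitionSpec. A sketch mirroring the TestMergeAppend
changes below; `table` is a hypothetical table whose schema contains
`data` and `id` columns:

    TableMetadata base = table.ops().current();

    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
        .bucket("data", 16)
        .bucket("id", 4)
        .build();

    // reuses a compatible existing spec's ID; otherwise assigns a new
    // one and appends the fresh spec to the metadata's spec list
    table.ops().commit(base, base.updatePartitionSpec(newSpec));
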
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index f0508b3..0961d8c 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -46,18 +46,21 @@
 
 public class TableMetadataParser {
 
-  private static final String FORMAT_VERSION = "format-version";
-  private static final String LOCATION = "location";
-  private static final String LAST_UPDATED_MILLIS = "last-updated-ms";
-  private static final String LAST_COLUMN_ID = "last-column-id";
-  private static final String SCHEMA = "schema";
-  private static final String PARTITION_SPEC = "partition-spec";
-  private static final String PROPERTIES = "properties";
-  private static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
-  private static final String SNAPSHOTS = "snapshots";
-  private static final String SNAPSHOT_ID = "snapshot-id";
-  private static final String TIMESTAMP_MS = "timestamp-ms";
-  private static final String SNAPSHOT_LOG = "snapshot-log";
+  // visible for testing
+  static final String FORMAT_VERSION = "format-version";
+  static final String LOCATION = "location";
+  static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+  static final String LAST_COLUMN_ID = "last-column-id";
+  static final String SCHEMA = "schema";
+  static final String PARTITION_SPEC = "partition-spec";
+  static final String PARTITION_SPECS = "partition-specs";
+  static final String DEFAULT_SPEC_ID = "default-spec-id";
+  static final String PROPERTIES = "properties";
+  static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+  static final String SNAPSHOTS = "snapshots";
+  static final String SNAPSHOT_ID = "snapshot-id";
+  static final String TIMESTAMP_MS = "timestamp-ms";
+  static final String SNAPSHOT_LOG = "snapshot-log";
 
   public static String toJson(TableMetadata metadata) {
     StringWriter writer = new StringWriter();
@@ -100,8 +103,17 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
     generator.writeFieldName(SCHEMA);
     SchemaParser.toJson(metadata.schema(), generator);
 
+    // for older readers, continue writing the default spec as "partition-spec"
     generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJson(metadata.spec(), generator);
+    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+    // write the default spec ID and spec list
+    generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
+    generator.writeArrayFieldStart(PARTITION_SPECS);
+    for (PartitionSpec spec : metadata.specs()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+    generator.writeEndArray();
 
     generator.writeObjectFieldStart(PROPERTIES);
     for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
@@ -150,7 +162,32 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     String location = JsonUtil.getString(LOCATION, node);
     int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
-    PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+
+    JsonNode specArray = node.get(PARTITION_SPECS);
+    List<PartitionSpec> specs;
+    int defaultSpecId;
+    if (specArray != null) {
+      Preconditions.checkArgument(specArray.isArray(),
+          "Cannot parse partition specs from non-array: %s", specArray);
+      // default spec ID is required when the spec array is present
+      defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
+
+      // parse the spec array
+      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
+      for (JsonNode spec : specArray) {
+        builder.add(PartitionSpecParser.fromJson(schema, spec));
+      }
+      specs = builder.build();
+
+    } else {
+      // partition spec is required for older readers, but is always set to the default if the
+      // spec array is present. it is only used when the spec array is missing, indicating that
+      // the table metadata was written by an older writer.
+      defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
+      specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
+          schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
+    }
+
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
     long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
     long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);
@@ -177,8 +214,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     }
 
     return new TableMetadata(ops, file, location,
-        lastUpdatedMillis, lastAssignedColumnId, schema, spec, properties, currentVersionId,
-        snapshots, ImmutableList.copyOf(entries.iterator()));
+        lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+        currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
   }
-
 }
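
Putting the writer and reader changes together: new metadata files
carry both the legacy key and the new list. A trimmed, hypothetical
example of the relevant fields:

    {
      "format-version": 1,
      "partition-spec": [
        { "name": "data_bucket", "transform": "bucket[16]", "source-id": 2 }
      ],
      "default-spec-id": 0,
      "partition-specs": [ {
        "spec-id": 0,
        "fields": [
          { "name": "data_bucket", "transform": "bucket[16]", "source-id": 2 }
        ]
      } ]
    }

Older readers continue to use "partition-spec"; readers with this
change prefer "partition-specs" plus "default-spec-id" and fall back
to "partition-spec" with spec ID 0 when the list is absent.
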
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index dd7a3cf..a1e28bc 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.ManifestEntry.Status;
@@ -236,16 +235,14 @@ public void testChangedPartitionSpec() {
         1, base.currentSnapshot().manifests().size());
     String initialManifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
@@ -284,16 +281,14 @@ public void testChangedPartitionSpecMergeExisting() {
         2, base.currentSnapshot().manifests().size());
     String manifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 53df5db..21acdbd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -19,19 +19,34 @@
 
 package com.netflix.iceberg;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
+import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
+import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
+import static com.netflix.iceberg.TableMetadataParser.LAST_UPDATED_MILLIS;
+import static com.netflix.iceberg.TableMetadataParser.LOCATION;
+import static com.netflix.iceberg.TableMetadataParser.PARTITION_SPEC;
+import static com.netflix.iceberg.TableMetadataParser.PROPERTIES;
+import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
+import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
+
 public class TestTableMetadataJson {
   @Test
   public void testJsonConversion() throws Exception {
@@ -41,7 +56,7 @@ public void testJsonConversion() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -56,8 +71,9 @@ public void testJsonConversion() throws Exception {
         .build();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(null, null,
@@ -71,6 +87,10 @@ public void testJsonConversion() throws Exception {
         expected.schema().asStruct(), metadata.schema().asStruct());
     Assert.assertEquals("Partition spec should match",
         expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should match",
+        expected.defaultSpecId(), metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec map should match",
+        expected.specs(), metadata.specs());
     Assert.assertEquals("Properties should match",
         expected.properties(), metadata.properties());
     Assert.assertEquals("Snapshot logs should match",
@@ -96,7 +116,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -108,8 +128,9 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
 
     // add the entries after creating TableMetadata to avoid the sorted check
     reversedSnapshotLog.add(
@@ -129,4 +150,108 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
     Assert.assertEquals("Snapshot logs should match",
         expectedSnapshotLog, metadata.snapshotLog());
   }
+
+  @Test
+  public void testBackwardCompatMissingPartitionSpecList() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get()),
+        Types.NestedField.required(3, "z", Types.LongType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+
+    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
+    Snapshot previousSnapshot = new BaseSnapshot(
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manifest.1.avro"));
+    long currentSnapshotId = System.currentTimeMillis();
+    Snapshot currentSnapshot = new BaseSnapshot(
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manifest.2.avro"));
+
+    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
+
+    String asJson = toJsonWithoutSpecList(expected);
+    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+        JsonUtil.mapper().readValue(asJson, JsonNode.class));
+
+    Assert.assertEquals("Table location should match",
+        expected.location(), metadata.location());
+    Assert.assertEquals("Last column ID should match",
+        expected.lastColumnId(), metadata.lastColumnId());
+    Assert.assertEquals("Schema should match",
+        expected.schema().asStruct(), metadata.schema().asStruct());
+    Assert.assertEquals("Partition spec should be the default",
+        expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should default to TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec should contain the spec",
+        1, metadata.specs().size());
+    Assert.assertTrue("PartitionSpec should contain the spec",
+        metadata.specs().get(0).compatibleWith(spec));
+    Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId());
+    Assert.assertEquals("Properties should match",
+        expected.properties(), metadata.properties());
+    Assert.assertEquals("Snapshot logs should match",
+        expected.snapshotLog(), metadata.snapshotLog());
+    Assert.assertEquals("Current snapshot ID should match",
+        currentSnapshotId, metadata.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot ID should match",
+        (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
+    Assert.assertEquals("Current snapshot files should match",
+        currentSnapshot.manifests(), metadata.currentSnapshot().manifests());
+    Assert.assertEquals("Previous snapshot ID should match",
+        previousSnapshotId, metadata.snapshot(previousSnapshotId).snapshotId());
+    Assert.assertEquals("Previous snapshot files should match",
+        previousSnapshot.manifests(),
+        metadata.snapshot(previousSnapshotId).manifests());
+  }
+
+  public static String toJsonWithoutSpecList(TableMetadata metadata) {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+
+      generator.flush();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services