You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/05 19:53:10 UTC
[incubator-iceberg] branch master updated: Store multiple partition specs in table metadata. (#3)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fd8a162 Store multiple partition specs in table metadata. (#3)
fd8a162 is described below
commit fd8a162e4d39aaeaca8da61a96ed62d4f391dfb9
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Dec 5 11:53:06 2018 -0800
Store multiple partition specs in table metadata. (#3)
The purpose of this change is to enable future partition spec changes
and to assign IDs to specs that can be easily encoded in an Avro file
that tracks a snapshot's manifests.
This updates TableMetadata and the metadata parser to support multiple
partition specs. This change is forward-compatible for older readers
because the "partition-spec" field in table metadata is still set to the
default spec.
Multiple specs are now stored in an array in table metadata called
"partition-specs". Each entry in the array is an object with two fields:
a "spec-id" field with an integer ID value, and a "fields" field with the
spec's partition fields (an array of partition field objects, the same
format used by the legacy "partition-spec" field). This change also adds a
"default-spec-id" field that points to the spec that should be used when
writing.
---
.../java/com/netflix/iceberg/PartitionSpec.java | 31 +++-
.../java/com/netflix/iceberg/ManifestReader.java | 7 +-
.../java/com/netflix/iceberg/ManifestWriter.java | 3 +-
.../com/netflix/iceberg/PartitionSpecParser.java | 94 ++++++++----
.../java/com/netflix/iceberg/TableMetadata.java | 164 ++++++++++++++++-----
.../com/netflix/iceberg/TableMetadataParser.java | 70 ++++++---
.../java/com/netflix/iceberg/TestMergeAppend.java | 17 +--
.../com/netflix/iceberg/TestTableMetadataJson.java | 137 ++++++++++++++++-
8 files changed, 422 insertions(+), 101 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index da13f8c..b17e25b 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -50,14 +50,16 @@ public class PartitionSpec implements Serializable {
private final Schema schema;
// this is ordered so that DataFile has a consistent schema
+ private final int specId;
private final PartitionField[] fields;
private transient Map<Integer, PartitionField> fieldsBySourceId = null;
private transient Map<String, PartitionField> fieldsByName = null;
private transient Class<?>[] javaClasses = null;
private transient List<PartitionField> fieldList = null;
- private PartitionSpec(Schema schema, List<PartitionField> fields) {
+ private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
this.schema = schema;
+ this.specId = specId;
this.fields = new PartitionField[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
this.fields[i] = fields.get(i);
@@ -72,6 +74,13 @@ public class PartitionSpec implements Serializable {
}
/**
+ * @return the ID of this spec
+ */
+ public int specId() {
+ return specId;
+ }
+
+ /**
* @return the list of {@link PartitionField partition fields} for this spec.
*/
public List<PartitionField> fields() {
@@ -146,6 +155,13 @@ public class PartitionSpec implements Serializable {
return sb.toString();
}
+ /**
+ * Returns true if this spec is equivalent to the other, with field names ignored. That is, if
+ * both specs have the same number of fields, field order, source columns, and transforms.
+ *
+ * @param other another PartitionSpec
+ * @return true if the specs have the same fields, source columns, and transforms.
+ */
public boolean compatibleWith(PartitionSpec other) {
if (equals(other)) {
return true;
@@ -177,6 +193,9 @@ public class PartitionSpec implements Serializable {
}
PartitionSpec that = (PartitionSpec) other;
+ if (this.specId != that.specId) {
+ return false;
+ }
return Arrays.equals(fields, that.fields);
}
@@ -250,7 +269,7 @@ public class PartitionSpec implements Serializable {
}
private static final PartitionSpec UNPARTITIONED_SPEC =
- new PartitionSpec(new Schema(), ImmutableList.of());
+ new PartitionSpec(new Schema(), 0, ImmutableList.of());
/**
* Returns a spec for unpartitioned tables.
@@ -280,6 +299,7 @@ public class PartitionSpec implements Serializable {
private final Schema schema;
private final List<PartitionField> fields = Lists.newArrayList();
private final Set<String> partitionNames = Sets.newHashSet();
+ private int specId = 0;
private Builder(Schema schema) {
this.schema = schema;
@@ -293,6 +313,11 @@ public class PartitionSpec implements Serializable {
partitionNames.add(name);
}
+ public Builder withSpecId(int specId) {
+ this.specId = specId;
+ return this;
+ }
+
private Types.NestedField findSourceColumn(String sourceName) {
Types.NestedField sourceColumn = schema.findField(sourceName);
Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
@@ -371,7 +396,7 @@ public class PartitionSpec implements Serializable {
}
public PartitionSpec build() {
- PartitionSpec spec = new PartitionSpec(schema, fields);
+ PartitionSpec spec = new PartitionSpec(schema, specId, fields);
checkCompatibility(spec, schema);
return spec;
}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
index fa6ffbf..065210e 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
@@ -99,7 +99,12 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
throw new RuntimeIOException(e);
}
this.schema = SchemaParser.fromJson(metadata.get("schema"));
- this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
+ int specId = TableMetadata.INITIAL_SPEC_ID;
+ String specProperty = metadata.get("partition-spec-id");
+ if (specProperty != null) {
+ specId = Integer.parseInt(specProperty);
+ }
+ this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
this.entries = null;
}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index e787b68..28ba831 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -108,7 +108,8 @@ class ManifestWriter implements FileAppender<DataFile> {
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
- .meta("partition-spec", PartitionSpecParser.toJson(spec))
+ .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+ .meta("partition-spec-id", String.valueOf(spec.specId()))
.build();
default:
throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
index fe20f65..4df7c55 100644
--- a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
@@ -37,20 +37,18 @@ public class PartitionSpecParser {
private PartitionSpecParser() {
}
+ private static final String SPEC_ID = "spec-id";
+ private static final String FIELDS = "fields";
private static final String SOURCE_ID = "source-id";
private static final String TRANSFORM = "transform";
private static final String NAME = "name";
public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
- generator.writeStartArray();
- for (PartitionField field : spec.fields()) {
- generator.writeStartObject();
- generator.writeStringField(NAME, field.name());
- generator.writeStringField(TRANSFORM, field.transform().toString());
- generator.writeNumberField(SOURCE_ID, field.sourceId());
- generator.writeEndObject();
- }
- generator.writeEndArray();
+ generator.writeStartObject();
+ generator.writeNumberField(SPEC_ID, spec.specId());
+ generator.writeFieldName(FIELDS);
+ toJsonFields(spec, generator);
+ generator.writeEndObject();
}
public static String toJson(PartitionSpec spec) {
@@ -74,23 +72,10 @@ public class PartitionSpecParser {
}
public static PartitionSpec fromJson(Schema schema, JsonNode json) {
- Preconditions.checkArgument(json.isArray(),
- "Cannot parse partition spec, not an array: %s", json);
-
- PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
- Iterator<JsonNode> elements = json.elements();
- while (elements.hasNext()) {
- JsonNode element = elements.next();
- Preconditions.checkArgument(element.isObject(),
- "Cannot parse partition field, not an object: %s", element);
-
- String name = JsonUtil.getString(NAME, element);
- String transform = JsonUtil.getString(TRANSFORM, element);
- int sourceId = JsonUtil.getInt(SOURCE_ID, element);
-
- builder.add(sourceId, name, transform);
- }
-
+ Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
+ int specId = JsonUtil.getInt(SPEC_ID, json);
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+ buildFromJsonFields(builder, json.get(FIELDS));
return builder.build();
}
@@ -113,4 +98,61 @@ public class PartitionSpecParser {
}
}
}
+
+ static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
+ generator.writeStartArray();
+ for (PartitionField field : spec.fields()) {
+ generator.writeStartObject();
+ generator.writeStringField(NAME, field.name());
+ generator.writeStringField(TRANSFORM, field.transform().toString());
+ generator.writeNumberField(SOURCE_ID, field.sourceId());
+ generator.writeEndObject();
+ }
+ generator.writeEndArray();
+ }
+
+ static String toJsonFields(PartitionSpec spec) {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+ toJsonFields(spec, generator);
+ generator.flush();
+ return writer.toString();
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+ buildFromJsonFields(builder, json);
+ return builder.build();
+ }
+
+ static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
+ try {
+ return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
+ }
+ }
+
+ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
+ Preconditions.checkArgument(json.isArray(),
+ "Cannot parse partition spec fields, not an array: %s", json);
+
+ Iterator<JsonNode> elements = json.elements();
+ while (elements.hasNext()) {
+ JsonNode element = elements.next();
+ Preconditions.checkArgument(element.isObject(),
+ "Cannot parse partition field, not an object: %s", element);
+
+ String name = JsonUtil.getString(NAME, element);
+ String transform = JsonUtil.getString(TRANSFORM, element);
+ int sourceId = JsonUtil.getInt(SOURCE_ID, element);
+
+ builder.add(sourceId, name, transform);
+ }
+ }
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 38fda2e..05c3392 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.ValidationException;
import com.netflix.iceberg.io.InputFile;
@@ -40,6 +41,7 @@ import java.util.function.Predicate;
*/
public class TableMetadata {
static final int TABLE_FORMAT_VERSION = 1;
+ static final int INITIAL_SPEC_ID = 0;
public static TableMetadata newTableMetadata(TableOperations ops,
Schema schema,
@@ -58,7 +60,8 @@ public class TableMetadata {
Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
// rebuild the partition spec using the new column ids
- PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+ PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema)
+ .withSpecId(INITIAL_SPEC_ID);
for (PartitionField field : spec.fields()) {
// look up the name of the source field in the old schema to get the new schema's id
String sourceName = schema.findColumnName(field.sourceId());
@@ -71,7 +74,7 @@ public class TableMetadata {
return new TableMetadata(ops, null, location,
System.currentTimeMillis(),
- lastColumnId.get(), freshSchema, freshSpec,
+ lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
}
@@ -126,11 +129,13 @@ public class TableMetadata {
private final long lastUpdatedMillis;
private final int lastColumnId;
private final Schema schema;
- private final PartitionSpec spec;
+ private final int defaultSpecId;
+ private final List<PartitionSpec> specs;
private final Map<String, String> properties;
private final long currentSnapshotId;
private final List<Snapshot> snapshots;
private final Map<Long, Snapshot> snapshotsById;
+ private final Map<Integer, PartitionSpec> specsById;
private final List<SnapshotLogEntry> snapshotLog;
TableMetadata(TableOperations ops,
@@ -139,7 +144,8 @@ public class TableMetadata {
long lastUpdatedMillis,
int lastColumnId,
Schema schema,
- PartitionSpec spec,
+ int defaultSpecId,
+ List<PartitionSpec> specs,
Map<String, String> properties,
long currentSnapshotId,
List<Snapshot> snapshots,
@@ -150,17 +156,15 @@ public class TableMetadata {
this.lastUpdatedMillis = lastUpdatedMillis;
this.lastColumnId = lastColumnId;
this.schema = schema;
- this.spec = spec;
+ this.specs = specs;
+ this.defaultSpecId = defaultSpecId;
this.properties = properties;
this.currentSnapshotId = currentSnapshotId;
this.snapshots = snapshots;
this.snapshotLog = snapshotLog;
- ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
- for (Snapshot version : snapshots) {
- builder.put(version.snapshotId(), version);
- }
- this.snapshotsById = builder.build();
+ this.snapshotsById = indexSnapshots(snapshots);
+ this.specsById = indexSpecs(specs);
SnapshotLogEntry last = null;
for (SnapshotLogEntry logEntry : snapshotLog) {
@@ -194,7 +198,19 @@ public class TableMetadata {
}
public PartitionSpec spec() {
- return spec;
+ return specsById.get(defaultSpecId);
+ }
+
+ public int defaultSpecId() {
+ return defaultSpecId;
+ }
+
+ public PartitionSpec spec(int id) {
+ return specsById.get(id);
+ }
+
+ public List<PartitionSpec> specs() {
+ return specs;
}
public String location() {
@@ -239,15 +255,45 @@ public class TableMetadata {
public TableMetadata updateTableLocation(String newLocation) {
return new TableMetadata(ops, null, newLocation,
- System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
- snapshots, snapshotLog);
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, snapshots, snapshotLog);
}
public TableMetadata updateSchema(Schema schema, int lastColumnId) {
- PartitionSpec.checkCompatibility(spec, schema);
+ PartitionSpec.checkCompatibility(spec(), schema);
+ return new TableMetadata(ops, null, location,
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, snapshots, snapshotLog);
+ }
+
+ public TableMetadata updatePartitionSpec(PartitionSpec partitionSpec) {
+ PartitionSpec.checkCompatibility(partitionSpec, schema);
+
+ // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+ int newDefaultSpecId = INITIAL_SPEC_ID;
+ for (PartitionSpec spec : specs) {
+ if (partitionSpec.compatibleWith(spec)) {
+ newDefaultSpecId = spec.specId();
+ break;
+ } else if (newDefaultSpecId <= spec.specId()) {
+ newDefaultSpecId = spec.specId() + 1;
+ }
+ }
+
+ Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
+ "Cannot set default partition spec to the current default");
+
+ ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+ .addAll(specs);
+ if (!specsById.containsKey(newDefaultSpecId)) {
+ // get a fresh spec to ensure the spec ID is set to the new default
+ builder.add(freshSpec(newDefaultSpecId, schema, partitionSpec));
+ }
+
return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
- snapshots,snapshotLog);
+ System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+ builder.build(), properties,
+ currentSnapshotId, snapshots, snapshotLog);
}
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -260,8 +306,8 @@ public class TableMetadata {
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
return new TableMetadata(ops, null, location,
- snapshot.timestampMillis(), lastColumnId, schema, spec, properties, snapshot.snapshotId(),
- newSnapshots, newSnapshotLog);
+ snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ snapshot.snapshotId(), newSnapshots, newSnapshotLog);
}
public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -291,8 +337,8 @@ public class TableMetadata {
}
return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
- filtered, ImmutableList.copyOf(newSnapshotLog));
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
}
public TableMetadata rollbackTo(Snapshot snapshot) {
@@ -306,15 +352,15 @@ public class TableMetadata {
.build();
return new TableMetadata(ops, null, location,
- nowMillis, lastColumnId, schema, spec, properties, snapshot.snapshotId(), snapshots,
- newSnapshotLog);
+ nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+ snapshot.snapshotId(), snapshots, newSnapshotLog);
}
public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId, schema, spec, newProperties, currentSnapshotId,
- snapshots, snapshotLog);
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+ currentSnapshotId, snapshots, snapshotLog);
}
public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
@@ -330,8 +376,8 @@ public class TableMetadata {
Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
"Cannot set invalid snapshot log: latest entry is not the current snapshot");
return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
- snapshots, newSnapshotLog);
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, snapshots, newSnapshotLog);
}
public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec,
@@ -339,24 +385,70 @@ public class TableMetadata {
AtomicInteger lastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
+ int nextSpecId = TableMetadata.INITIAL_SPEC_ID;
+ for (Integer specId : specsById.keySet()) {
+ if (nextSpecId <= specId) {
+ nextSpecId = specId + 1;
+ }
+ }
+
// rebuild the partition spec using the new column ids
- PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+ PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, partitionSpec);
+
+ // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+ int specId = nextSpecId;
+ for (PartitionSpec spec : specs) {
+ if (freshSpec.compatibleWith(spec)) {
+ specId = spec.specId();
+ break;
+ }
+ }
+
+ ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+ .addAll(specs);
+ if (!specsById.containsKey(specId)) {
+ builder.add(freshSpec);
+ }
+
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.putAll(this.properties);
+ newProperties.putAll(properties);
+
+ return new TableMetadata(ops, null, location,
+ System.currentTimeMillis(), lastColumnId.get(), freshSchema,
+ specId, builder.build(), ImmutableMap.copyOf(newProperties),
+ -1, snapshots, ImmutableList.of());
+ }
+
+ private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
+ PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+ .withSpecId(specId);
+
for (PartitionField field : partitionSpec.fields()) {
// look up the name of the source field in the old schema to get the new schema's id
- String sourceName = schema.findColumnName(field.sourceId());
+ String sourceName = partitionSpec.schema().findColumnName(field.sourceId());
specBuilder.add(
- freshSchema.findField(sourceName).fieldId(),
+ schema.findField(sourceName).fieldId(),
field.name(),
field.transform().toString());
}
- PartitionSpec freshSpec = specBuilder.build();
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- builder.putAll(this.properties);
- builder.putAll(properties);
+ return specBuilder.build();
+ }
- return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId.get(), freshSchema, freshSpec, properties, -1,
- snapshots, ImmutableList.of());
+ private static Map<Long, Snapshot> indexSnapshots(List<Snapshot> snapshots) {
+ ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
+ for (Snapshot version : snapshots) {
+ builder.put(version.snapshotId(), version);
+ }
+ return builder.build();
+ }
+
+ private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+ ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+ for (PartitionSpec spec : specs) {
+ builder.put(spec.specId(), spec);
+ }
+ return builder.build();
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index f0508b3..0961d8c 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -46,18 +46,21 @@ import java.util.SortedSet;
public class TableMetadataParser {
- private static final String FORMAT_VERSION = "format-version";
- private static final String LOCATION = "location";
- private static final String LAST_UPDATED_MILLIS = "last-updated-ms";
- private static final String LAST_COLUMN_ID = "last-column-id";
- private static final String SCHEMA = "schema";
- private static final String PARTITION_SPEC = "partition-spec";
- private static final String PROPERTIES = "properties";
- private static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
- private static final String SNAPSHOTS = "snapshots";
- private static final String SNAPSHOT_ID = "snapshot-id";
- private static final String TIMESTAMP_MS = "timestamp-ms";
- private static final String SNAPSHOT_LOG = "snapshot-log";
+ // visible for testing
+ static final String FORMAT_VERSION = "format-version";
+ static final String LOCATION = "location";
+ static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+ static final String LAST_COLUMN_ID = "last-column-id";
+ static final String SCHEMA = "schema";
+ static final String PARTITION_SPEC = "partition-spec";
+ static final String PARTITION_SPECS = "partition-specs";
+ static final String DEFAULT_SPEC_ID = "default-spec-id";
+ static final String PROPERTIES = "properties";
+ static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+ static final String SNAPSHOTS = "snapshots";
+ static final String SNAPSHOT_ID = "snapshot-id";
+ static final String TIMESTAMP_MS = "timestamp-ms";
+ static final String SNAPSHOT_LOG = "snapshot-log";
public static String toJson(TableMetadata metadata) {
StringWriter writer = new StringWriter();
@@ -100,8 +103,17 @@ public class TableMetadataParser {
generator.writeFieldName(SCHEMA);
SchemaParser.toJson(metadata.schema(), generator);
+ // for older readers, continue writing the default spec as "partition-spec"
generator.writeFieldName(PARTITION_SPEC);
- PartitionSpecParser.toJson(metadata.spec(), generator);
+ PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+ // write the default spec ID and spec list
+ generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
+ generator.writeArrayFieldStart(PARTITION_SPECS);
+ for (PartitionSpec spec : metadata.specs()) {
+ PartitionSpecParser.toJson(spec, generator);
+ }
+ generator.writeEndArray();
generator.writeObjectFieldStart(PROPERTIES);
for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
@@ -150,7 +162,32 @@ public class TableMetadataParser {
String location = JsonUtil.getString(LOCATION, node);
int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
- PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+
+ JsonNode specArray = node.get(PARTITION_SPECS);
+ List<PartitionSpec> specs;
+ int defaultSpecId;
+ if (specArray != null) {
+ Preconditions.checkArgument(specArray.isArray(),
+ "Cannot parse partition specs from non-array: %s", specArray);
+ // default spec ID is required when the spec array is present
+ defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
+
+ // parse the spec array
+ ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
+ for (JsonNode spec : specArray) {
+ builder.add(PartitionSpecParser.fromJson(schema, spec));
+ }
+ specs = builder.build();
+
+ } else {
+ // "partition-spec" is required for older readers and is always set to the default spec when
+ // the spec array is written. it is only read here, to build the default spec, when the spec
+ // array is missing, indicating that the table metadata was written by an older writer.
+ defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
+ specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
+ schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
+ }
+
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);
@@ -177,8 +214,7 @@ public class TableMetadataParser {
}
return new TableMetadata(ops, file, location,
- lastUpdatedMillis, lastAssignedColumnId, schema, spec, properties, currentVersionId,
- snapshots, ImmutableList.copyOf(entries.iterator()));
+ lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+ currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
}
-
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index dd7a3cf..a1e28bc 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -19,7 +19,6 @@
package com.netflix.iceberg;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.ManifestEntry.Status;
@@ -236,16 +235,14 @@ public class TestMergeAppend extends TableTestBase {
1, base.currentSnapshot().manifests().size());
String initialManifest = base.currentSnapshot().manifests().get(0);
- PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+ // build the new spec using the table's schema, which uses fresh IDs
+ PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
.bucket("data", 16)
.bucket("id", 4)
.build();
// commit the new partition spec to the table manually
- TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
- System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
- base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
- table.ops().commit(base, updated);
+ table.ops().commit(base, base.updatePartitionSpec(newSpec));
DataFile newFileC = DataFiles.builder(newSpec)
.copy(FILE_C)
@@ -284,16 +281,14 @@ public class TestMergeAppend extends TableTestBase {
2, base.currentSnapshot().manifests().size());
String manifest = base.currentSnapshot().manifests().get(0);
- PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+ // build the new spec using the table's schema, which uses fresh IDs
+ PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
.bucket("data", 16)
.bucket("id", 4)
.build();
// commit the new partition spec to the table manually
- TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
- System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
- base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
- table.ops().commit(base, updated);
+ table.ops().commit(base, base.updatePartitionSpec(newSpec));
DataFile newFileC = DataFiles.builder(newSpec)
.copy(FILE_C)
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 53df5db..21acdbd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -19,19 +19,34 @@
package com.netflix.iceberg;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.types.Types;
import com.netflix.iceberg.util.JsonUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
+import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
+import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
+import static com.netflix.iceberg.TableMetadataParser.LAST_UPDATED_MILLIS;
+import static com.netflix.iceberg.TableMetadataParser.LOCATION;
+import static com.netflix.iceberg.TableMetadataParser.PARTITION_SPEC;
+import static com.netflix.iceberg.TableMetadataParser.PROPERTIES;
+import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
+import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
+
public class TestTableMetadataJson {
@Test
public void testJsonConversion() throws Exception {
@@ -41,7 +56,7 @@ public class TestTableMetadataJson {
Types.NestedField.required(3, "z", Types.LongType.get())
);
- PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+ PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
@@ -56,8 +71,9 @@ public class TestTableMetadataJson {
.build();
TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
- currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
+ System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+ ImmutableMap.of("property", "value"), currentSnapshotId,
+ Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
String asJson = TableMetadataParser.toJson(expected);
TableMetadata metadata = TableMetadataParser.fromJson(null, null,
@@ -71,6 +87,10 @@ public class TestTableMetadataJson {
expected.schema().asStruct(), metadata.schema().asStruct());
Assert.assertEquals("Partition spec should match",
expected.spec().toString(), metadata.spec().toString());
+ Assert.assertEquals("Default spec ID should match",
+ expected.defaultSpecId(), metadata.defaultSpecId());
+ Assert.assertEquals("PartitionSpec map should match",
+ expected.specs(), metadata.specs());
Assert.assertEquals("Properties should match",
expected.properties(), metadata.properties());
Assert.assertEquals("Snapshot logs should match",
@@ -96,7 +116,7 @@ public class TestTableMetadataJson {
Types.NestedField.required(3, "z", Types.LongType.get())
);
- PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+ PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
@@ -108,8 +128,9 @@ public class TestTableMetadataJson {
List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
- currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
+ System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+ ImmutableMap.of("property", "value"), currentSnapshotId,
+ Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
// add the entries after creating TableMetadata to avoid the sorted check
reversedSnapshotLog.add(
@@ -129,4 +150,108 @@ public class TestTableMetadataJson {
Assert.assertEquals("Snapshot logs should match",
expectedSnapshotLog, metadata.snapshotLog());
}
+
+  /**
+   * Verifies that table metadata written by an older writer is still readable. An older writer
+   * emits only the single PARTITION_SPEC field, with no default spec ID and no spec list.
+   * The lone spec must be read back as the default spec, with ID TableMetadata.INITIAL_SPEC_ID,
+   * rather than with the ID it had when the expected metadata was built.
+   */
+  @Test
+  public void testBackwardCompatMissingPartitionSpecList() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get()),
+        Types.NestedField.required(3, "z", Types.LongType.get())
+    );
+
+    // NOTE(review): assumes 6 differs from TableMetadata.INITIAL_SPEC_ID, so that the
+    // spec ID assertion below proves the parser reassigned the ID. Confirm the constant's value.
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+
+    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
+    Snapshot previousSnapshot = new BaseSnapshot(
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+    long currentSnapshotId = System.currentTimeMillis();
+    Snapshot currentSnapshot = new BaseSnapshot(
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+
+    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
+
+    // serialize the way a writer without spec-list support would, then parse with the current parser
+    String asJson = toJsonWithoutSpecList(expected);
+    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+        JsonUtil.mapper().readValue(asJson, JsonNode.class));
+
+    Assert.assertEquals("Table location should match",
+        expected.location(), metadata.location());
+    Assert.assertEquals("Last column ID should match",
+        expected.lastColumnId(), metadata.lastColumnId());
+    Assert.assertEquals("Schema should match",
+        expected.schema().asStruct(), metadata.schema().asStruct());
+    Assert.assertEquals("Partition spec should be the default",
+        expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should default to TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.defaultSpecId());
+    // distinct messages so a failure shows whether the count or the content was wrong
+    Assert.assertEquals("Partition spec list should contain exactly one spec",
+        1, metadata.specs().size());
+    Assert.assertTrue("Partition spec list should contain a spec compatible with the original",
+        metadata.specs().get(0).compatibleWith(spec));
+    Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId());
+    Assert.assertEquals("Properties should match",
+        expected.properties(), metadata.properties());
+    Assert.assertEquals("Snapshot logs should match",
+        expected.snapshotLog(), metadata.snapshotLog());
+    Assert.assertEquals("Current snapshot ID should match",
+        currentSnapshotId, metadata.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot ID should match",
+        (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
+    Assert.assertEquals("Current snapshot files should match",
+        currentSnapshot.manifests(), metadata.currentSnapshot().manifests());
+    Assert.assertEquals("Previous snapshot ID should match",
+        previousSnapshotId, metadata.snapshot(previousSnapshotId).snapshotId());
+    Assert.assertEquals("Previous snapshot files should match",
+        previousSnapshot.manifests(),
+        metadata.snapshot(previousSnapshotId).manifests());
+  }
+
+  /**
+   * Serializes table metadata in the layout used before multiple partition specs were stored.
+   * Only the current spec is written, under the PARTITION_SPEC field; neither a default spec ID
+   * nor a spec list is emitted. The snapshot log is omitted as well.
+   *
+   * @param metadata table metadata to serialize
+   * @return the metadata as a JSON string
+   * @throws RuntimeIOException if the JSON generator fails while writing
+   */
+  public static String toJsonWithoutSpecList(TableMetadata metadata) {
+    StringWriter writer = new StringWriter();
+    // try-with-resources guarantees the generator is closed; closing it also flushes its
+    // buffered output into the writer, so no explicit flush() is needed
+    try (JsonGenerator generator = JsonUtil.factory().createGenerator(writer)) {
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      // -1 is written when there is no current snapshot
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
}