You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/12/06 19:11:07 UTC
[iceberg] branch master updated: Core: Add table metadata builder (#3664)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6d3b1f7 Core: Add table metadata builder (#3664)
6d3b1f7 is described below
commit 6d3b1f7996d4ceee5d5e28df7f378f6fe377950e
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon Dec 6 11:10:51 2021 -0800
Core: Add table metadata builder (#3664)
---
.../java/org/apache/iceberg/PartitionSpec.java | 6 +-
.../main/java/org/apache/iceberg/SortOrder.java | 15 +-
.../java/org/apache/iceberg/BaseTransaction.java | 13 +-
.../java/org/apache/iceberg/MetadataUpdate.java | 203 ++++
.../java/org/apache/iceberg/TableMetadata.java | 1002 ++++++++++++--------
.../org/apache/iceberg/TableMetadataParser.java | 2 +-
.../java/org/apache/iceberg/TestTableMetadata.java | 30 +-
.../test/java/org/apache/iceberg/TestTables.java | 5 +-
.../iceberg/nessie/NessieTableOperations.java | 15 +-
9 files changed, 860 insertions(+), 431 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 51bc851..6ad90fd 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -499,10 +499,14 @@ public class PartitionSpec implements Serializable {
}
public PartitionSpec build() {
- PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
+ PartitionSpec spec = buildUnchecked();
checkCompatibility(spec, schema);
return spec;
}
+
+ /**
+ * Builds the spec from the current builder state without validating it against the schema.
+ * <p>
+ * {@link #build()} calls this method and then runs the schema compatibility check on the result.
+ */
+ PartitionSpec buildUnchecked() {
+ return new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
+ }
}
static void checkCompatibility(PartitionSpec spec, Schema schema) {
diff --git a/api/src/main/java/org/apache/iceberg/SortOrder.java b/api/src/main/java/org/apache/iceberg/SortOrder.java
index a217e39..595af6d 100644
--- a/api/src/main/java/org/apache/iceberg/SortOrder.java
+++ b/api/src/main/java/org/apache/iceberg/SortOrder.java
@@ -263,7 +263,18 @@ public class SortOrder implements Serializable {
return this;
}
+ /**
+ * Appends a {@link SortField} built directly from a transform, source field ID, direction, and null order.
+ *
+ * @param transform transform for the new sort field
+ * @param sourceId source field ID passed to the new sort field
+ * @param direction sort direction
+ * @param nullOrder null ordering
+ * @return this builder, for chaining
+ */
+ Builder addSortField(Transform<?, ?> transform, int sourceId, SortDirection direction, NullOrder nullOrder) {
+ fields.add(new SortField(transform, sourceId, direction, nullOrder));
+ return this;
+ }
+
public SortOrder build() {
+ SortOrder sortOrder = buildUnchecked();
+ checkCompatibility(sortOrder, schema);
+ return sortOrder;
+ }
+
+ SortOrder buildUnchecked() {
if (fields.isEmpty()) {
if (orderId != null && orderId != 0) {
throw new IllegalArgumentException("Unsorted order ID must be 0");
@@ -277,9 +288,7 @@ public class SortOrder implements Serializable {
// default ID to 1 as 0 is reserved for unsorted order
int actualOrderId = orderId != null ? orderId : 1;
- SortOrder sortOrder = new SortOrder(schema, actualOrderId, fields);
- checkCompatibility(sortOrder, schema);
- return sortOrder;
+ return new SortOrder(schema, actualOrderId, fields);
}
private Transform<?, ?> toTransform(BoundTerm<?> term) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 3d0b31e..7e779bc 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -240,13 +240,10 @@ class BaseTransaction implements Transaction {
}
private void commitCreateTransaction() {
- // fix up the snapshot log, which should not contain intermediate snapshots
- TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
// this operation creates the table. if the commit fails, this cannot retry because another
// process has created the same table.
try {
- ops.commit(null, createMetadata);
+ ops.commit(null, current);
} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
@@ -271,9 +268,7 @@ class BaseTransaction implements Transaction {
}
private void commitReplaceTransaction(boolean orCreate) {
- // fix up the snapshot log, which should not contain intermediate snapshots
- TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
- Map<String, String> props = base != null ? base.properties() : replaceMetadata.properties();
+ Map<String, String> props = base != null ? base.properties() : current.properties();
try {
Tasks.foreach(ops)
@@ -300,7 +295,7 @@ class BaseTransaction implements Transaction {
this.base = underlyingOps.current(); // just refreshed
}
- underlyingOps.commit(base, replaceMetadata);
+ underlyingOps.commit(base, current);
});
} catch (RuntimeException e) {
@@ -358,7 +353,7 @@ class BaseTransaction implements Transaction {
}
// fix up the snapshot log, which should not contain intermediate snapshots
- underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+ underlyingOps.commit(base, current);
});
} catch (RuntimeException e) {
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
new file mode 100644
index 0000000..25d015a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Represents a change to table metadata.
+ * <p>
+ * Each nested class captures one kind of change together with the values needed to apply it. The
+ * classes are plain value holders: constructors store their arguments as given, without copying or
+ * validating them.
+ */
+public interface MetadataUpdate extends Serializable {
+  /**
+   * Change that assigns a UUID to the table.
+   */
+  class AssignUUID implements MetadataUpdate {
+    private final String uuid;
+
+    public AssignUUID(String uuid) {
+      this.uuid = uuid;
+    }
+
+    public String uuid() {
+      return uuid;
+    }
+  }
+
+  /**
+   * Change that upgrades the table to the given format version.
+   */
+  class UpgradeFormatVersion implements MetadataUpdate {
+    private final int formatVersion;
+
+    public UpgradeFormatVersion(int formatVersion) {
+      this.formatVersion = formatVersion;
+    }
+
+    public int formatVersion() {
+      return formatVersion;
+    }
+  }
+
+  /**
+   * Change that adds a schema, recording the last assigned column ID that goes with it.
+   */
+  class AddSchema implements MetadataUpdate {
+    private final Schema schema;
+    private final int lastColumnId;
+
+    public AddSchema(Schema schema, int lastColumnId) {
+      this.schema = schema;
+      this.lastColumnId = lastColumnId;
+    }
+
+    public Schema schema() {
+      return schema;
+    }
+
+    public int lastColumnId() {
+      return lastColumnId;
+    }
+  }
+
+  /**
+   * Change that selects the current schema by ID.
+   */
+  class SetCurrentSchema implements MetadataUpdate {
+    private final int schemaId;
+
+    public SetCurrentSchema(int schemaId) {
+      this.schemaId = schemaId;
+    }
+
+    public int schemaId() {
+      return schemaId;
+    }
+  }
+
+  /**
+   * Change that adds a partition spec.
+   */
+  class AddPartitionSpec implements MetadataUpdate {
+    private final PartitionSpec spec;
+
+    public AddPartitionSpec(PartitionSpec spec) {
+      this.spec = spec;
+    }
+
+    public PartitionSpec spec() {
+      return spec;
+    }
+  }
+
+  /**
+   * Change that selects the default partition spec by ID.
+   */
+  class SetDefaultPartitionSpec implements MetadataUpdate {
+    private final int specId;
+
+    // parameter was previously misnamed "schemaId"; the value is a partition spec ID
+    public SetDefaultPartitionSpec(int specId) {
+      this.specId = specId;
+    }
+
+    public int specId() {
+      return specId;
+    }
+  }
+
+  /**
+   * Change that adds a sort order.
+   */
+  class AddSortOrder implements MetadataUpdate {
+    private final SortOrder sortOrder;
+
+    public AddSortOrder(SortOrder sortOrder) {
+      this.sortOrder = sortOrder;
+    }
+
+    /**
+     * Returns the sort order being added.
+     */
+    public SortOrder sortOrder() {
+      return sortOrder;
+    }
+
+    /**
+     * Alias for {@link #sortOrder()}, kept for compatibility with callers of the original,
+     * misleadingly named accessor.
+     */
+    public SortOrder spec() {
+      return sortOrder;
+    }
+  }
+
+  /**
+   * Change that selects the default sort order by ID.
+   */
+  class SetDefaultSortOrder implements MetadataUpdate {
+    private final int sortOrderId;
+
+    public SetDefaultSortOrder(int sortOrderId) {
+      this.sortOrderId = sortOrderId;
+    }
+
+    public int sortOrderId() {
+      return sortOrderId;
+    }
+  }
+
+  /**
+   * Change that adds a snapshot.
+   */
+  class AddSnapshot implements MetadataUpdate {
+    private final Snapshot snapshot;
+
+    public AddSnapshot(Snapshot snapshot) {
+      this.snapshot = snapshot;
+    }
+
+    public Snapshot snapshot() {
+      return snapshot;
+    }
+  }
+
+  /**
+   * Change that removes the snapshot with the given ID.
+   */
+  class RemoveSnapshot implements MetadataUpdate {
+    private final long snapshotId;
+
+    public RemoveSnapshot(long snapshotId) {
+      this.snapshotId = snapshotId;
+    }
+
+    public long snapshotId() {
+      return snapshotId;
+    }
+  }
+
+  /**
+   * Change that sets the current snapshot by ID.
+   * <p>
+   * The ID is boxed and may be {@code null}; presumably that means "no current snapshot" — confirm
+   * against the code that applies this change.
+   */
+  class SetCurrentSnapshot implements MetadataUpdate {
+    private final Long snapshotId;
+
+    public SetCurrentSnapshot(Long snapshotId) {
+      this.snapshotId = snapshotId;
+    }
+
+    public Long snapshotId() {
+      return snapshotId;
+    }
+  }
+
+  /**
+   * Change that sets (adds or overwrites) the given table properties.
+   */
+  class SetProperties implements MetadataUpdate {
+    private final Map<String, String> updated;
+
+    public SetProperties(Map<String, String> updated) {
+      this.updated = updated;
+    }
+
+    public Map<String, String> updated() {
+      return updated;
+    }
+  }
+
+  /**
+   * Change that removes the given table property keys.
+   */
+  class RemoveProperties implements MetadataUpdate {
+    private final Set<String> removed;
+
+    public RemoveProperties(Set<String> removed) {
+      this.removed = removed;
+    }
+
+    public Set<String> removed() {
+      return removed;
+    }
+  }
+
+  /**
+   * Change that sets the table location.
+   */
+  class SetLocation implements MetadataUpdate {
+    private final String location;
+
+    public SetLocation(String location) {
+      this.location = location;
+    }
+
+    public String location() {
+      return location;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 59c71d7..164e295 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -22,8 +22,6 @@ package org.apache.iceberg;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -126,7 +124,7 @@ public class TableMetadata implements Serializable {
freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
freshSortOrderId, ImmutableList.of(freshSortOrder),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
- ImmutableList.of(), ImmutableList.of());
+ ImmutableList.of(), ImmutableList.of(), ImmutableList.of());
}
public static class SnapshotLogEntry implements HistoryEntry {
@@ -240,6 +238,7 @@ public class TableMetadata implements Serializable {
private final Map<Integer, SortOrder> sortOrdersById;
private final List<HistoryEntry> snapshotLog;
private final List<MetadataLogEntry> previousFiles;
+ private final List<MetadataUpdate> changes;
@SuppressWarnings("checkstyle:CyclomaticComplexity")
TableMetadata(String metadataFileLocation,
@@ -260,7 +259,8 @@ public class TableMetadata implements Serializable {
long currentSnapshotId,
List<Snapshot> snapshots,
List<HistoryEntry> snapshotLog,
- List<MetadataLogEntry> previousFiles) {
+ List<MetadataLogEntry> previousFiles,
+ List<MetadataUpdate> changes) {
Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(), "Sort orders cannot be null or empty");
Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
@@ -269,6 +269,8 @@ public class TableMetadata implements Serializable {
"UUID is required in format v%s", formatVersion);
Preconditions.checkArgument(formatVersion > 1 || lastSequenceNumber == 0,
"Sequence number must be 0 in v1: %s", lastSequenceNumber);
+ Preconditions.checkArgument(metadataFileLocation == null || changes.isEmpty(),
+ "Cannot create TableMetadata with a metadata location and changes");
this.metadataFileLocation = metadataFileLocation;
this.formatVersion = formatVersion;
@@ -290,6 +292,9 @@ public class TableMetadata implements Serializable {
this.snapshotLog = snapshotLog;
this.previousFiles = previousFiles;
+ // changes are carried through until metadata is read from a file
+ this.changes = changes;
+
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
this.schemasById = indexSchemas();
this.specsById = indexSpecs(specs);
@@ -467,308 +472,61 @@ public class TableMetadata implements Serializable {
return previousFiles;
}
+ /**
+ * Returns the metadata changes carried by this instance.
+ * <p>
+ * Changes are carried through until metadata is read from a file: the constructor rejects a non-empty
+ * change list when a metadata file location is set.
+ */
+ public List<MetadataUpdate> changes() {
+ return changes;
+ }
+
public TableMetadata withUUID() {
- if (uuid != null) {
- return this;
- } else {
- return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
- lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties,
- currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
+ return new Builder(this).assignUUID().build();
}
public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
- PartitionSpec.checkCompatibility(spec(), newSchema);
- SortOrder.checkCompatibility(sortOrder(), newSchema);
- // rebuild all of the partition specs and sort orders for the new current schema
- List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));
- List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
-
- int newSchemaId = reuseOrCreateNewSchemaId(newSchema);
- if (currentSchemaId == newSchemaId && newLastColumnId == lastColumnId) {
- // the new spec and last column Id is already current and no change is needed
- return this;
- }
-
- ImmutableList.Builder<Schema> builder = ImmutableList.<Schema>builder().addAll(schemas);
- if (!schemasById.containsKey(newSchemaId)) {
- builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds()));
- }
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), newLastColumnId,
- newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId,
- defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
- addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).setCurrentSchema(newSchema, newLastColumnId).build();
}
// The caller is responsible to pass a newPartitionSpec with correct partition field IDs
public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
- Schema schema = schema();
-
- PartitionSpec.checkCompatibility(newPartitionSpec, schema);
- ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(newPartitionSpec),
- "Spec does not use sequential IDs that are required in v1: %s", newPartitionSpec);
-
- // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
- int newDefaultSpecId = INITIAL_SPEC_ID;
- for (PartitionSpec spec : specs) {
- if (newPartitionSpec.compatibleWith(spec)) {
- newDefaultSpecId = spec.specId();
- break;
- } else if (newDefaultSpecId <= spec.specId()) {
- newDefaultSpecId = spec.specId() + 1;
- }
- }
-
- if (defaultSpecId == newDefaultSpecId) {
- // the new spec is already current and no change is needed
- return this;
- }
-
- ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
- .addAll(specs);
- if (!specsById.containsKey(newDefaultSpecId)) {
- // get a fresh spec to ensure the spec ID is set to the new default
- builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
- }
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId,
- builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()),
- defaultSortOrderId, sortOrders, properties,
- currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build();
}
public TableMetadata replaceSortOrder(SortOrder newOrder) {
- Schema schema = schema();
- SortOrder.checkCompatibility(newOrder, schema);
-
- // determine the next order id
- int newOrderId = INITIAL_SORT_ORDER_ID;
- for (SortOrder order : sortOrders) {
- if (order.sameOrder(newOrder)) {
- newOrderId = order.orderId();
- break;
- } else if (newOrderId <= order.orderId()) {
- newOrderId = order.orderId() + 1;
- }
- }
-
- if (newOrderId == defaultSortOrderId) {
- return this;
- }
-
- ImmutableList.Builder<SortOrder> builder = ImmutableList.builder();
- builder.addAll(sortOrders);
-
- if (!sortOrdersById.containsKey(newOrderId)) {
- if (newOrder.isUnsorted()) {
- newOrderId = SortOrder.unsorted().orderId();
- builder.add(SortOrder.unsorted());
- } else {
- // rebuild the sort order using new column ids
- builder.add(freshSortOrder(newOrderId, schema, newOrder));
- }
- }
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
- addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).setDefaultSortOrder(newOrder).build();
}
public TableMetadata addStagedSnapshot(Snapshot snapshot) {
- ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
- "Cannot add snapshot with sequence number %s older than last sequence number %s",
- snapshot.sequenceNumber(), lastSequenceNumber);
-
- List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
- .addAll(snapshots)
- .add(snapshot)
- .build();
-
- return new TableMetadata(null, formatVersion, uuid, location,
- snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
- currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
- defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog,
- addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).addSnapshot(snapshot).build();
}
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
- // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
- if (snapshotsById.containsKey(snapshot.snapshotId())) {
- return setCurrentSnapshotTo(snapshot);
- }
-
- ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
- "Cannot add snapshot with sequence number %s older than last sequence number %s",
- snapshot.sequenceNumber(), lastSequenceNumber);
-
- List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
- .addAll(snapshots)
- .add(snapshot)
- .build();
- List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
- .addAll(snapshotLog)
- .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
- .build();
-
- return new TableMetadata(null, formatVersion, uuid, location,
- snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
- currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
- defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
- addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).setCurrentSnapshot(snapshot).build();
}
public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
- List<Snapshot> filtered = Lists.newArrayListWithExpectedSize(snapshots.size());
- for (Snapshot snapshot : snapshots) {
- // keep the current snapshot and any snapshots that do not match the removeIf condition
- if (snapshot.snapshotId() == currentSnapshotId || !removeIf.test(snapshot)) {
- filtered.add(snapshot);
- }
- }
-
- // update the snapshot log
- Set<Long> validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId));
- List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
- for (HistoryEntry logEntry : snapshotLog) {
- if (validIds.contains(logEntry.snapshotId())) {
- // copy the log entries that are still valid
- newSnapshotLog.add(logEntry);
- } else {
- // any invalid entry causes the history before it to be removed. otherwise, there could be
- // history gaps that cause time-travel queries to produce incorrect results. for example,
- // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
- // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
- // and t3 when in fact s2 was the current snapshot.
- newSnapshotLog.clear();
- }
- }
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
- ImmutableList.copyOf(newSnapshotLog), addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
-
- private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
- ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
- "Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
- ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
- "Last sequence number %s is less than existing snapshot sequence number %s",
- lastSequenceNumber, snapshot.sequenceNumber());
-
- if (currentSnapshotId == snapshot.snapshotId()) {
- // change is a noop
- return this;
- }
-
- long nowMillis = System.currentTimeMillis();
- List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
- .addAll(snapshotLog)
- .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
- .build();
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots,
- newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ List<Snapshot> toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList());
+ return new Builder(this).removeSnapshots(toRemove).build();
}
public TableMetadata replaceProperties(Map<String, String> rawProperties) {
ValidationException.check(rawProperties != null, "Cannot set properties to null");
Map<String, String> newProperties = unreservedProperties(rawProperties);
- TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
- snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties));
- int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
- if (formatVersion != newFormatVersion) {
- metadata = metadata.upgradeToFormatVersion(newFormatVersion);
- }
-
- return metadata;
- }
-
- public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
- List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
- for (HistoryEntry logEntry : snapshotLog) {
- if (!snapshotIds.contains(logEntry.snapshotId())) {
- // copy the log entries that are still valid
- newSnapshotLog.add(logEntry);
+ Set<String> removed = Sets.newHashSet(properties.keySet());
+ Map<String, String> updated = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+ removed.remove(entry.getKey());
+ String current = properties.get(entry.getKey());
+ if (current == null || !current.equals(entry.getValue())) {
+ updated.put(entry.getKey(), entry.getValue());
}
}
- ValidationException.check(currentSnapshotId < 0 || // not set
- Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
- "Cannot set invalid snapshot log: latest entry is not the current snapshot");
-
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
-
- /**
- * Returns an updated {@link TableMetadata} with the current-snapshot-ID set to the given
- * snapshot-ID and the snapshot-log reset to contain only the snapshot with the given snapshot-ID.
- *
- * @param snapshotId ID of a snapshot that must exist, or {@code -1L} to remove the current snapshot
- * and return an empty snapshot log.
- * @return {@link TableMetadata} with updated {@link #currentSnapshotId} and {@link #snapshotLog}
- */
- public TableMetadata withCurrentSnapshotOnly(long snapshotId) {
- if ((currentSnapshotId == -1L && snapshotId == -1L && snapshots.isEmpty()) ||
- (currentSnapshotId == snapshotId && snapshots.size() == 1)) {
- return this;
- }
- List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
- if (snapshotId != -1L) {
- Snapshot snapshot = snapshotsById.get(snapshotId);
- Preconditions.checkArgument(snapshot != null, "Non-existent snapshot");
- newSnapshotLog.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshotId));
- }
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshotId,
- snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
-
- public TableMetadata withCurrentSchema(int schemaId) {
- if (currentSchemaId == schemaId) {
- return this;
- }
- Preconditions.checkArgument(schemasById.containsKey(schemaId), "Non-existent schema");
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
-
- public TableMetadata withDefaultSortOrder(int sortOrderId) {
- if (defaultSortOrderId == sortOrderId) {
- return this;
- }
- Preconditions.checkArgument(sortOrdersById.containsKey(sortOrderId), "Non-existent sort-order");
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, sortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
+ int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
- public TableMetadata withDefaultSpec(int specId) {
- if (defaultSpecId == specId) {
- return this;
- }
- Preconditions.checkArgument(specsById.containsKey(specId), "Non-existent partition spec");
- return new TableMetadata(null, formatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, specId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this)
+ .setProperties(updated)
+ .removeProperties(removed)
+ .upgradeFormatVersion(newFormatVersion)
+ .build();
}
private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
@@ -835,146 +593,62 @@ public class TableMetadata implements Serializable {
AtomicInteger newLastColumnId = new AtomicInteger(lastColumnId);
Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, schema(), newLastColumnId::incrementAndGet);
- // determine the next spec id
- OptionalInt maxSpecId = specs.stream().mapToInt(PartitionSpec::specId).max();
- int nextSpecId = maxSpecId.orElse(TableMetadata.INITIAL_SPEC_ID) + 1;
-
- // rebuild the partition spec using the new column ids
- PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
-
- // reassign partition field ids with existing partition specs in the table
- AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
- PartitionSpec newSpec = reassignPartitionIds(freshSpec, lastPartitionId::incrementAndGet);
-
- // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
- int specId = specs.stream()
- .filter(newSpec::compatibleWith)
- .findFirst()
- .map(PartitionSpec::specId)
- .orElse(nextSpecId);
-
- ImmutableList.Builder<PartitionSpec> specListBuilder = ImmutableList.<PartitionSpec>builder().addAll(specs);
- if (!specsById.containsKey(specId)) {
- specListBuilder.add(newSpec);
- }
-
- // determine the next order id
- OptionalInt maxOrderId = sortOrders.stream().mapToInt(SortOrder::orderId).max();
- int nextOrderId = maxOrderId.isPresent() ? maxOrderId.getAsInt() + 1 : INITIAL_SORT_ORDER_ID;
+ // rebuild the partition spec using the new column ids and reassign partition field ids to align with existing
+ // partition specs in the table
+ PartitionSpec freshSpec = reassignPartitionIds(
+ freshSpec(INITIAL_SPEC_ID, freshSchema, updatedPartitionSpec),
+ new AtomicInteger(lastAssignedPartitionId)::incrementAndGet);
// rebuild the sort order using new column ids
- int freshSortOrderId = updatedSortOrder.isUnsorted() ? updatedSortOrder.orderId() : nextOrderId;
- SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, updatedSortOrder);
-
- // if the order already exists, use the same ID. otherwise, use the fresh order ID
- Optional<SortOrder> sameSortOrder = sortOrders.stream()
- .filter(sortOrder -> sortOrder.sameOrder(freshSortOrder))
- .findAny();
- int orderId = sameSortOrder.map(SortOrder::orderId).orElse(freshSortOrderId);
-
- ImmutableList.Builder<SortOrder> sortOrdersBuilder = ImmutableList.<SortOrder>builder().addAll(sortOrders);
- if (!sortOrdersById.containsKey(orderId)) {
- sortOrdersBuilder.add(freshSortOrder);
- }
-
- Map<String, String> newProperties = Maps.newHashMap();
- newProperties.putAll(this.properties);
- newProperties.putAll(unreservedProperties(updatedProperties));
+ SortOrder freshSortOrder = freshSortOrder(INITIAL_SORT_ORDER_ID, freshSchema, updatedSortOrder);
// check if there is format version override
int newFormatVersion = PropertyUtil.propertyAsInt(updatedProperties, TableProperties.FORMAT_VERSION, formatVersion);
- // determine the next schema id
- int freshSchemaId = reuseOrCreateNewSchemaId(freshSchema);
- ImmutableList.Builder<Schema> schemasBuilder = ImmutableList.<Schema>builder().addAll(schemas);
-
- if (!schemasById.containsKey(freshSchemaId)) {
- schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds()));
- }
-
- TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation,
- lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
- specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()),
- orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
- -1, snapshots, ImmutableList.of(), addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties));
-
- if (formatVersion != newFormatVersion) {
- metadata = metadata.upgradeToFormatVersion(newFormatVersion);
- }
-
- return metadata;
+ return new Builder(this)
+ .upgradeFormatVersion(newFormatVersion)
+ .setCurrentSnapshot(null)
+ .setCurrentSchema(freshSchema, newLastColumnId.get())
+ .setDefaultPartitionSpec(freshSpec)
+ .setDefaultSortOrder(freshSortOrder)
+ .setLocation(newLocation)
+ .setProperties(unreservedProperties(updatedProperties))
+ .build();
}
public TableMetadata updateLocation(String newLocation) {
- return new TableMetadata(null, formatVersion, uuid, newLocation,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+ return new Builder(this).setLocation(newLocation).build();
}
public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
- Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
- "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
- newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
- Preconditions.checkArgument(newFormatVersion >= formatVersion,
- "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
-
- if (newFormatVersion == formatVersion) {
- return this;
- }
-
- return new TableMetadata(null, newFormatVersion, uuid, location,
- lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
- }
-
- private List<MetadataLogEntry> addPreviousFile(String previousFileLocation, long timestampMillis) {
- return addPreviousFile(previousFileLocation, timestampMillis, properties);
- }
-
- private List<MetadataLogEntry> addPreviousFile(String previousFileLocation, long timestampMillis,
- Map<String, String> updatedProperties) {
- if (previousFileLocation == null) {
- return previousFiles;
- }
-
- int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
- TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
-
- List<MetadataLogEntry> newMetadataLog;
- if (previousFiles.size() >= maxSize) {
- int removeIndex = previousFiles.size() - maxSize + 1;
- newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
- } else {
- newMetadataLog = Lists.newArrayList(previousFiles);
- }
- newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation));
-
- return newMetadataLog;
+ return new Builder(this).upgradeFormatVersion(newFormatVersion).build();
}
private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
.withSpecId(partitionSpec.specId());
- // add all of the fields to the builder. IDs should not change.
+ // add all the fields to the builder. IDs should not change.
for (PartitionField field : partitionSpec.fields()) {
specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
}
- return specBuilder.build();
+ // build without validation because the schema may have changed in a way that makes this spec invalid. the spec
+ // should still be preserved so that older metadata can be interpreted.
+ return specBuilder.buildUnchecked();
}
private static SortOrder updateSortOrderSchema(Schema schema, SortOrder sortOrder) {
SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(sortOrder.orderId());
- // add all of the fields to the builder. IDs should not change.
+ // add all the fields to the builder. IDs should not change.
for (SortField field : sortOrder.fields()) {
- builder.addSortField(field.transform().toString(), field.sourceId(), field.direction(), field.nullOrder());
+ builder.addSortField(field.transform(), field.sourceId(), field.direction(), field.nullOrder());
}
- return builder.build();
+ // build without validation because the schema may have changed in a way that makes this order invalid. the order
+ // should still be preserved so that older metadata can be interpreted.
+ return builder.buildUnchecked();
}
private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
@@ -995,7 +669,11 @@ public class TableMetadata implements Serializable {
}
private static SortOrder freshSortOrder(int orderId, Schema schema, SortOrder sortOrder) {
- SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(orderId);
+ SortOrder.Builder builder = SortOrder.builderFor(schema);
+
+ if (sortOrder.isSorted()) {
+ builder.withOrderId(orderId);
+ }
for (SortField field : sortOrder.fields()) {
// look up the name of the source field in the old schema to get the new schema's id
@@ -1047,17 +725,547 @@ public class TableMetadata implements Serializable {
return builder.build();
}
- private int reuseOrCreateNewSchemaId(Schema newSchema) {
- // if the schema already exists, use its id; otherwise use the highest id + 1
- int newSchemaId = currentSchemaId;
- for (Schema schema : schemas) {
- if (schema.sameSchema(newSchema)) {
- newSchemaId = schema.schemaId();
- break;
- } else if (schema.schemaId() >= newSchemaId) {
- newSchemaId = schema.schemaId() + 1;
+ public static Builder buildFrom(TableMetadata base) {
+ return new Builder(base);
+ }
+
+ public static class Builder {
+ private final TableMetadata base;
+ private int formatVersion;
+ private String uuid;
+ private Long lastUpdatedMillis;
+ private String location;
+ private long lastSequenceNumber;
+ private int lastColumnId;
+ private int currentSchemaId;
+ private final List<Schema> schemas;
+ private int defaultSpecId;
+ private List<PartitionSpec> specs;
+ private int lastAssignedPartitionId;
+ private int defaultSortOrderId;
+ private List<SortOrder> sortOrders;
+ private final Map<String, String> properties;
+ private long currentSnapshotId;
+ private List<Snapshot> snapshots;
+
+ // change tracking
+ private final List<MetadataUpdate> changes;
+ private final int startingChangeCount;
+ private boolean discardChanges = false;
+
+ // handled in build
+ private final List<HistoryEntry> snapshotLog;
+ private final String previousFileLocation;
+ private final List<MetadataLogEntry> previousFiles;
+
+ // indexes for convenience
+ private final Map<Long, Snapshot> snapshotsById;
+ private final Map<Integer, Schema> schemasById;
+ private final Map<Integer, PartitionSpec> specsById;
+ private final Map<Integer, SortOrder> sortOrdersById;
+
+ private Builder(TableMetadata base) {
+ this.base = base;
+ this.formatVersion = base.formatVersion;
+ this.uuid = base.uuid;
+ this.lastUpdatedMillis = null;
+ this.location = base.location;
+ this.lastSequenceNumber = base.lastSequenceNumber;
+ this.lastColumnId = base.lastColumnId;
+ this.currentSchemaId = base.currentSchemaId;
+ this.schemas = Lists.newArrayList(base.schemas);
+ this.defaultSpecId = base.defaultSpecId;
+ this.specs = Lists.newArrayList(base.specs);
+ this.lastAssignedPartitionId = base.lastAssignedPartitionId;
+ this.defaultSortOrderId = base.defaultSortOrderId;
+ this.sortOrders = Lists.newArrayList(base.sortOrders);
+ this.properties = Maps.newHashMap(base.properties);
+ this.currentSnapshotId = base.currentSnapshotId;
+ this.snapshots = Lists.newArrayList(base.snapshots);
+ this.changes = Lists.newArrayList(base.changes);
+ this.startingChangeCount = changes.size();
+
+ this.snapshotLog = Lists.newArrayList(base.snapshotLog);
+ this.previousFileLocation = base.metadataFileLocation;
+ this.previousFiles = base.previousFiles;
+
+ this.snapshotsById = Maps.newHashMap(base.snapshotsById);
+ this.schemasById = Maps.newHashMap(base.schemasById);
+ this.specsById = Maps.newHashMap(base.specsById);
+ this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);
+ }
+
+ public Builder assignUUID() {
+ if (uuid == null) {
+ this.uuid = UUID.randomUUID().toString();
+ changes.add(new MetadataUpdate.AssignUUID(uuid));
}
+
+ return this;
+ }
+
+ public Builder upgradeFormatVersion(int newFormatVersion) {
+ Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+ "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
+ newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
+ Preconditions.checkArgument(newFormatVersion >= formatVersion,
+ "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
+
+ if (newFormatVersion == formatVersion) {
+ return this;
+ }
+
+ this.formatVersion = newFormatVersion;
+ changes.add(new MetadataUpdate.UpgradeFormatVersion(newFormatVersion));
+
+ return this;
+ }
+
+ public Builder setCurrentSchema(Schema newSchema, int newLastColumnId) {
+ setCurrentSchema(addSchemaInternal(newSchema, newLastColumnId));
+ return this;
+ }
+
+ public Builder setCurrentSchema(int schemaId) {
+ if (currentSchemaId == schemaId) {
+ return this;
+ }
+
+ Schema schema = schemasById.get(schemaId);
+ Preconditions.checkArgument(schema != null, "Cannot set current schema to unknown schema: %s", schemaId);
+
+ // rebuild all the partition specs and sort orders for the new current schema
+ this.specs = Lists.newArrayList(Iterables.transform(specs,
+ spec -> updateSpecSchema(schema, spec)));
+ specsById.clear();
+ specsById.putAll(indexSpecs(specs));
+
+ this.sortOrders = Lists.newArrayList(Iterables.transform(sortOrders,
+ order -> updateSortOrderSchema(schema, order)));
+ sortOrdersById.clear();
+ sortOrdersById.putAll(indexSortOrders(sortOrders));
+
+ this.currentSchemaId = schemaId;
+
+ changes.add(new MetadataUpdate.SetCurrentSchema(schemaId));
+
+ return this;
+ }
+
+ public Builder addSchema(Schema schema, int newLastColumnId) {
+ addSchemaInternal(schema, newLastColumnId);
+ return this;
+ }
+
+ public Builder setDefaultPartitionSpec(PartitionSpec spec) {
+ setDefaultPartitionSpec(addPartitionSpecInternal(spec));
+ return this;
+ }
+
+ public Builder setDefaultPartitionSpec(int specId) {
+ if (defaultSpecId == specId) {
+ // the new spec is already current and no change is needed
+ return this;
+ }
+
+ this.defaultSpecId = specId;
+ changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId));
+
+ return this;
+ }
+
+ public Builder addPartitionSpec(PartitionSpec spec) {
+ addPartitionSpecInternal(spec);
+ return this;
+ }
+
+ public Builder setDefaultSortOrder(SortOrder order) {
+ setDefaultSortOrder(addSortOrderInternal(order));
+ return this;
+ }
+
+ public Builder setDefaultSortOrder(int sortOrderId) {
+ if (sortOrderId == defaultSortOrderId) {
+ return this;
+ }
+
+ this.defaultSortOrderId = sortOrderId;
+ changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId));
+
+ return this;
+ }
+
+ public Builder addSortOrder(SortOrder order) {
+ addSortOrderInternal(order);
+ return this;
+ }
+
+ public Builder addSnapshot(Snapshot snapshot) {
+ if (snapshot == null || snapshotsById.containsKey(snapshot.snapshotId())) {
+ // change is a noop
+ return this;
+ }
+
+ ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+ "Cannot add snapshot with sequence number %s older than last sequence number %s",
+ snapshot.sequenceNumber(), lastSequenceNumber);
+
+ this.lastUpdatedMillis = snapshot.timestampMillis();
+ this.lastSequenceNumber = snapshot.sequenceNumber();
+ snapshots.add(snapshot);
+ snapshotsById.put(snapshot.snapshotId(), snapshot);
+ changes.add(new MetadataUpdate.AddSnapshot(snapshot));
+
+ return this;
+ }
+
+ public Builder setCurrentSnapshot(Snapshot snapshot) {
+ addSnapshot(snapshot);
+ setCurrentSnapshot(snapshot, null);
+ return this;
+ }
+
+ public Builder setCurrentSnapshot(long snapshotId) {
+ if (currentSnapshotId == snapshotId) {
+ // change is a noop
+ return this;
+ }
+
+ Snapshot snapshot = snapshotsById.get(snapshotId);
+ ValidationException.check(snapshot != null,
+ "Cannot set current snapshot to unknown: %s", snapshotId);
+
+ setCurrentSnapshot(snapshot, System.currentTimeMillis());
+
+ return this;
+ }
+
+ public Builder removeSnapshots(List<Snapshot> snapshotsToRemove) {
+ Set<Long> idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+ List<Snapshot> retainedSnapshots = Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size());
+ for (Snapshot snapshot : snapshots) {
+ long snapshotId = snapshot.snapshotId();
+ if (idsToRemove.contains(snapshotId)) {
+ snapshotsById.remove(snapshotId);
+ changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
+ } else {
+ retainedSnapshots.add(snapshot);
+ }
+ }
+
+ this.snapshots = retainedSnapshots;
+ if (!snapshotsById.containsKey(currentSnapshotId)) {
+ setCurrentSnapshot(null, System.currentTimeMillis());
+ }
+
+ return this;
+ }
+
+ public Builder setProperties(Map<String, String> updated) {
+ if (updated.isEmpty()) {
+ return this;
+ }
+
+ properties.putAll(updated);
+ changes.add(new MetadataUpdate.SetProperties(updated));
+
+ return this;
+ }
+
+ public Builder removeProperties(Set<String> removed) {
+ if (removed.isEmpty()) {
+ return this;
+ }
+
+ removed.forEach(properties::remove);
+ changes.add(new MetadataUpdate.RemoveProperties(removed));
+
+ return this;
+ }
+
+ public Builder setLocation(String newLocation) {
+ if (location != null && location.equals(newLocation)) {
+ return this;
+ }
+
+ this.location = newLocation;
+ changes.add(new MetadataUpdate.SetLocation(newLocation));
+
+ return this;
+ }
+
+ public Builder discardChanges() {
+ this.discardChanges = true;
+ return this;
+ }
+
+ public TableMetadata build() {
+ if (changes.size() == startingChangeCount && !(discardChanges && changes.size() > 0)) {
+ return base;
+ }
+
+ if (lastUpdatedMillis == null) {
+ this.lastUpdatedMillis = System.currentTimeMillis();
+ }
+
+ Schema schema = schemasById.get(currentSchemaId);
+ PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema);
+ SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema);
+
+ List<MetadataLogEntry> metadataHistory = addPreviousFile(
+ previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+ List<HistoryEntry> newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes);
+
+ return new TableMetadata(
+ null,
+ formatVersion,
+ uuid,
+ location,
+ lastSequenceNumber,
+ lastUpdatedMillis,
+ lastColumnId,
+ currentSchemaId,
+ ImmutableList.copyOf(schemas),
+ defaultSpecId,
+ ImmutableList.copyOf(specs),
+ lastAssignedPartitionId,
+ defaultSortOrderId,
+ ImmutableList.copyOf(sortOrders),
+ ImmutableMap.copyOf(properties),
+ currentSnapshotId,
+ ImmutableList.copyOf(snapshots),
+ ImmutableList.copyOf(newSnapshotLog),
+ ImmutableList.copyOf(metadataHistory),
+ discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)
+ );
+ }
+
+ private int addSchemaInternal(Schema schema, int newLastColumnId) {
+ Preconditions.checkArgument(newLastColumnId >= lastColumnId,
+ "Invalid last column ID: %s < %s (previous last column ID)", newLastColumnId, lastColumnId);
+
+ int newSchemaId = reuseOrCreateNewSchemaId(schema);
+ boolean schemaFound = schemasById.containsKey(newSchemaId);
+ if (schemaFound && newLastColumnId == lastColumnId) {
+ // the new spec and last column id is already current and no change is needed
+ return newSchemaId;
+ }
+
+ this.lastColumnId = newLastColumnId;
+
+ Schema newSchema;
+ if (newSchemaId != schema.schemaId()) {
+ newSchema = new Schema(newSchemaId, schema.columns(), schema.identifierFieldIds());
+ } else {
+ newSchema = schema;
+ }
+
+ if (!schemaFound) {
+ schemas.add(newSchema);
+ schemasById.put(newSchema.schemaId(), newSchema);
+ }
+
+ changes.add(new MetadataUpdate.AddSchema(newSchema, lastColumnId));
+
+ return newSchemaId;
+ }
+
+ private int reuseOrCreateNewSchemaId(Schema newSchema) {
+ // if the schema already exists, use its id; otherwise use the highest id + 1
+ int newSchemaId = currentSchemaId;
+ for (Schema schema : schemas) {
+ if (schema.sameSchema(newSchema)) {
+ return schema.schemaId();
+ } else if (schema.schemaId() >= newSchemaId) {
+ newSchemaId = schema.schemaId() + 1;
+ }
+ }
+ return newSchemaId;
+ }
+
+ private int addPartitionSpecInternal(PartitionSpec spec) {
+ int newSpecId = reuseOrCreateNewSpecId(spec);
+ if (specsById.containsKey(newSpecId)) {
+ return newSpecId;
+ }
+
+ Schema schema = schemasById.get(currentSchemaId);
+ PartitionSpec.checkCompatibility(spec, schema);
+ ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(spec),
+ "Spec does not use sequential IDs that are required in v1: %s", spec);
+
+ PartitionSpec newSpec = freshSpec(newSpecId, schema, spec);
+ this.lastAssignedPartitionId = Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId());
+ specs.add(newSpec);
+ specsById.put(newSpecId, newSpec);
+
+ changes.add(new MetadataUpdate.AddPartitionSpec(newSpec));
+
+ return newSpecId;
+ }
+
+ private int reuseOrCreateNewSpecId(PartitionSpec newSpec) {
+ // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+ int newSpecId = INITIAL_SPEC_ID;
+ for (PartitionSpec spec : specs) {
+ if (newSpec.compatibleWith(spec)) {
+ return spec.specId();
+ } else if (newSpecId <= spec.specId()) {
+ newSpecId = spec.specId() + 1;
+ }
+ }
+
+ return newSpecId;
+ }
+
+ private int addSortOrderInternal(SortOrder order) {
+ int newOrderId = reuseOrCreateNewSortOrderId(order);
+ if (sortOrdersById.containsKey(newOrderId)) {
+ return newOrderId;
+ }
+
+ Schema schema = schemasById.get(currentSchemaId);
+ SortOrder.checkCompatibility(order, schema);
+
+ SortOrder newOrder;
+ if (order.isUnsorted()) {
+ newOrder = SortOrder.unsorted();
+ } else {
+ // rebuild the sort order using new column ids
+ newOrder = freshSortOrder(newOrderId, schema, order);
+ }
+
+ sortOrders.add(newOrder);
+ sortOrdersById.put(newOrderId, newOrder);
+
+ changes.add(new MetadataUpdate.AddSortOrder(newOrder));
+
+ return newOrderId;
+ }
+
+ private int reuseOrCreateNewSortOrderId(SortOrder newOrder) {
+ if (newOrder.isUnsorted()) {
+ return SortOrder.unsorted().orderId();
+ }
+
+ // determine the next order id
+ int newOrderId = INITIAL_SORT_ORDER_ID;
+ for (SortOrder order : sortOrders) {
+ if (order.sameOrder(newOrder)) {
+ return order.orderId();
+ } else if (newOrderId <= order.orderId()) {
+ newOrderId = order.orderId() + 1;
+ }
+ }
+
+ return newOrderId;
+ }
+
+ private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) {
+ if (snapshot == null) {
+ this.currentSnapshotId = -1;
+ snapshotLog.clear();
+ changes.add(new MetadataUpdate.SetCurrentSnapshot(null));
+ return;
+ }
+
+ if (currentSnapshotId == snapshot.snapshotId()) {
+ return;
+ }
+
+ ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
+ "Last sequence number %s is less than existing snapshot sequence number %s",
+ lastSequenceNumber, snapshot.sequenceNumber());
+
+ this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis();
+ this.currentSnapshotId = snapshot.snapshotId();
+ snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, snapshot.snapshotId()));
+ changes.add(new MetadataUpdate.SetCurrentSnapshot(snapshot.snapshotId()));
+ }
+
+ private static List<MetadataLogEntry> addPreviousFile(
+ List<MetadataLogEntry> previousFiles, String previousFileLocation, long timestampMillis,
+ Map<String, String> properties) {
+ if (previousFileLocation == null) {
+ return previousFiles;
+ }
+
+ int maxSize = Math.max(1, PropertyUtil.propertyAsInt(properties,
+ TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
+
+ List<MetadataLogEntry> newMetadataLog;
+ if (previousFiles.size() >= maxSize) {
+ int removeIndex = previousFiles.size() - maxSize + 1;
+ newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
+ } else {
+ newMetadataLog = Lists.newArrayList(previousFiles);
+ }
+ newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation));
+
+ return newMetadataLog;
+ }
+
+ /**
+ * Finds intermediate snapshots that have not been committed as the current snapshot.
+ *
+ * @return a set of snapshot ids for all added snapshots that were later replaced as the current snapshot in changes
+ */
+ private static Set<Long> intermediateSnapshotIdSet(List<MetadataUpdate> changes, long currentSnapshotId) {
+ Set<Long> addedSnapshotIds = Sets.newHashSet();
+ Set<Long> intermediateSnapshotIds = Sets.newHashSet();
+ for (MetadataUpdate update : changes) {
+ if (update instanceof MetadataUpdate.AddSnapshot) {
+ // adds must always come before set current snapshot
+ MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) update;
+ addedSnapshotIds.add(addSnapshot.snapshot().snapshotId());
+ } else if (update instanceof MetadataUpdate.SetCurrentSnapshot) {
+ Long snapshotId = ((MetadataUpdate.SetCurrentSnapshot) update).snapshotId();
+ if (snapshotId != null && addedSnapshotIds.contains(snapshotId) && snapshotId != currentSnapshotId) {
+ intermediateSnapshotIds.add(snapshotId);
+ }
+ }
+ }
+
+ return intermediateSnapshotIds;
+ }
+
+ private static List<HistoryEntry> updateSnapshotLog(
+ List<HistoryEntry> snapshotLog, Map<Long, Snapshot> snapshotsById, long currentSnapshotId,
+ List<MetadataUpdate> changes) {
+ // find intermediate snapshots to suppress incorrect entries in the snapshot log.
+ //
+ // transactions can create snapshots that are never the current snapshot because several changes are combined
+ // by the transaction into one table metadata update. when each intermediate snapshot is added to table metadata,
+ // it is added to the snapshot log, assuming that it will be the current snapshot. when there are multiple
+ // snapshot updates, the log must be corrected by suppressing the intermediate snapshot entries.
+ //
+ // a snapshot is an intermediate snapshot if it was added but is not the current snapshot.
+ Set<Long> intermediateSnapshotIds = intermediateSnapshotIdSet(changes, currentSnapshotId);
+
+ // update the snapshot log
+ List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+ for (HistoryEntry logEntry : snapshotLog) {
+ long snapshotId = logEntry.snapshotId();
+ if (snapshotsById.containsKey(snapshotId) && !intermediateSnapshotIds.contains(snapshotId)) {
+ // copy the log entries that are still valid
+ newSnapshotLog.add(logEntry);
+ } else {
+ // any invalid entry causes the history before it to be removed. otherwise, there could be
+ // history gaps that cause time-travel queries to produce incorrect results. for example,
+ // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
+ // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
+ // and t3 when in fact s2 was the current snapshot.
+ newSnapshotLog.clear();
+ }
+ }
+
+ if (snapshotsById.get(currentSnapshotId) != null) {
+ ValidationException.check(Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
+ "Cannot set invalid snapshot log: latest entry is not the current snapshot");
+ }
+
+ return newSnapshotLog;
}
- return newSchemaId;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 1e26fbb..8810fc1 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -433,6 +433,6 @@ public class TableMetadataParser {
return new TableMetadata(metadataLocation, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
- snapshots, entries.build(), metadataEntries.build());
+ snapshots, entries.build(), metadataEntries.build(), ImmutableList.of() /* no changes from the file */);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 6ddbeab..97f2d98 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -108,7 +108,7 @@ public class TestTableMetadata {
7, ImmutableList.of(TEST_SCHEMA, schema),
5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
- Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
+ Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of(), ImmutableList.of());
String asJson = TableMetadataParser.toJson(expected);
TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -180,7 +180,8 @@ public class TestTableMetadata {
0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID,
ImmutableList.of(schema), 6, ImmutableList.of(spec), spec.lastAssignedFieldId(),
TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), ImmutableMap.of("property", "value"),
- currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
+ currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of(),
+ ImmutableList.of());
String asJson = toJsonWithoutSpecAndSchemaList(expected);
TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -302,7 +303,7 @@ public class TestTableMetadata {
7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
- ImmutableList.copyOf(previousMetadataLog));
+ ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
String asJson = TableMetadataParser.toJson(base);
TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), asJson);
@@ -322,6 +323,8 @@ public class TestTableMetadata {
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+ reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+ reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
long currentTimestamp = System.currentTimeMillis();
List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -337,7 +340,7 @@ public class TestTableMetadata {
7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
- ImmutableList.copyOf(previousMetadataLog));
+ ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
previousMetadataLog.add(latestPreviousMetadata);
@@ -362,6 +365,8 @@ public class TestTableMetadata {
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+ reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+ reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
long currentTimestamp = System.currentTimeMillis();
List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -384,7 +389,7 @@ public class TestTableMetadata {
ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
- ImmutableList.copyOf(previousMetadataLog));
+ ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
previousMetadataLog.add(latestPreviousMetadata);
@@ -414,6 +419,8 @@ public class TestTableMetadata {
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
+ reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId));
+ reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId));
long currentTimestamp = System.currentTimeMillis();
List<MetadataLogEntry> previousMetadataLog = Lists.newArrayList();
previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100,
@@ -431,12 +438,12 @@ public class TestTableMetadata {
"/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
TableMetadata base = new TableMetadata(latestPreviousMetadata.file(), 1, UUID.randomUUID().toString(),
- TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), 2,
+ TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(),
ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
- TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(SortOrder.unsorted()),
+ SortOrder.unsorted().orderId(), ImmutableList.of(SortOrder.unsorted()),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
- ImmutableList.copyOf(previousMetadataLog));
+ ImmutableList.copyOf(previousMetadataLog), ImmutableList.of());
previousMetadataLog.add(latestPreviousMetadata);
@@ -462,7 +469,7 @@ public class TestTableMetadata {
LAST_ASSIGNED_COLUMN_ID, 7, ImmutableList.of(TEST_SCHEMA),
SPEC_5.specId(), ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
- ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+ ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
@@ -475,7 +482,7 @@ public class TestTableMetadata {
System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(), ImmutableList.of(SPEC_5),
SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
- ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+ ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
@@ -581,7 +588,8 @@ public class TestTableMetadata {
.add(1, 1005, "x_partition", "bucket[4]")
.build();
String location = "file://tmp/db/table";
- TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of());
+ TableMetadata metadata = TableMetadata.newTableMetadata(
+ schema, PartitionSpec.unpartitioned(), location, ImmutableMap.of());
AssertHelpers.assertThrows("Should fail to update an invalid partition spec",
ValidationException.class, "Spec does not use sequential IDs that are required in v1",
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 2e0ecd6..a24f94f 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -190,9 +190,10 @@ public class TestTables {
throw new CommitFailedException("Injected failure");
}
Integer version = VERSIONS.get(tableName);
+ // remove changes from the committed metadata
+ this.current = TableMetadata.buildFrom(updatedMetadata).discardChanges().build();
VERSIONS.put(tableName, version == null ? 0 : version + 1);
- METADATA.put(tableName, updatedMetadata);
- this.current = updatedMetadata;
+ METADATA.put(tableName, current);
} else {
throw new CommitFailedException(
"Commit failed: table was updated at %d", current.lastUpdatedMillis());
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
index 106cad4..6b40662 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
@@ -79,18 +79,19 @@ public class NessieTableOperations extends BaseMetastoreTableOperations {
}
@Override
- protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
- int numRetries) {
+ protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
super.refreshFromMetadataLocation(newLocation, shouldRetry, numRetries, this::loadTableMetadata);
}
private TableMetadata loadTableMetadata(String metadataLocation) {
// Update the TableMetadata with the Content of NessieTableState.
- return TableMetadataParser.read(io(), metadataLocation)
- .withCurrentSnapshotOnly(table.getSnapshotId())
- .withCurrentSchema(table.getSchemaId())
- .withDefaultSortOrder(table.getSortOrderId())
- .withDefaultSpec(table.getSpecId());
+ return TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation))
+ .setCurrentSnapshot(table.getSnapshotId())
+ .setCurrentSchema(table.getSchemaId())
+ .setDefaultSortOrder(table.getSortOrderId())
+ .setDefaultPartitionSpec(table.getSpecId())
+ .discardChanges()
+ .build();
}
@Override