Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/04 23:24:11 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #3664: Add table metadata builder

jackye1995 commented on a change in pull request #3664:
URL: https://github.com/apache/iceberg/pull/3664#discussion_r762489598



##########
File path: core/src/main/java/org/apache/iceberg/TableMetadata.java
##########
@@ -467,308 +472,61 @@ public Snapshot currentSnapshot() {
     return previousFiles;
   }
 
+  public List<MetadataUpdate> changes() {
+    return changes;
+  }
+
   public TableMetadata withUUID() {
-    if (uuid != null) {
-      return this;
-    } else {
-      return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
-          lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-          lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties,
-          currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-    }
+    return new Builder(this).assignUUID().build();
   }
 
   public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
-    PartitionSpec.checkCompatibility(spec(), newSchema);
-    SortOrder.checkCompatibility(sortOrder(), newSchema);
-    // rebuild all of the partition specs and sort orders for the new current schema
-    List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));
-    List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
-
-    int newSchemaId = reuseOrCreateNewSchemaId(newSchema);
-    if (currentSchemaId == newSchemaId && newLastColumnId == lastColumnId) {
-      // the new spec and last column Id is already current and no change is needed
-      return this;
-    }
-
-    ImmutableList.Builder<Schema> builder = ImmutableList.<Schema>builder().addAll(schemas);
-    if (!schemasById.containsKey(newSchemaId)) {
-      builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds()));
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), newLastColumnId,
-        newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId,
-        defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setCurrentSchema(newSchema, newLastColumnId).build();
   }
 
   // The caller is responsible to pass a newPartitionSpec with correct partition field IDs
   public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
-    Schema schema = schema();
-
-    PartitionSpec.checkCompatibility(newPartitionSpec, schema);
-    ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(newPartitionSpec),
-        "Spec does not use sequential IDs that are required in v1: %s", newPartitionSpec);
-
-    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
-    int newDefaultSpecId = INITIAL_SPEC_ID;
-    for (PartitionSpec spec : specs) {
-      if (newPartitionSpec.compatibleWith(spec)) {
-        newDefaultSpecId = spec.specId();
-        break;
-      } else if (newDefaultSpecId <= spec.specId()) {
-        newDefaultSpecId = spec.specId() + 1;
-      }
-    }
-
-    if (defaultSpecId == newDefaultSpecId) {
-      // the new spec is already current and no change is needed
-      return this;
-    }
-
-    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
-        .addAll(specs);
-    if (!specsById.containsKey(newDefaultSpecId)) {
-      // get a fresh spec to ensure the spec ID is set to the new default
-      builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId,
-        builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()),
-        defaultSortOrderId, sortOrders, properties,
-        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build();
   }
 
   public TableMetadata replaceSortOrder(SortOrder newOrder) {
-    Schema schema = schema();
-    SortOrder.checkCompatibility(newOrder, schema);
-
-    // determine the next order id
-    int newOrderId = INITIAL_SORT_ORDER_ID;
-    for (SortOrder order : sortOrders) {
-      if (order.sameOrder(newOrder)) {
-        newOrderId = order.orderId();
-        break;
-      } else if (newOrderId <= order.orderId()) {
-        newOrderId = order.orderId() + 1;
-      }
-    }
-
-    if (newOrderId == defaultSortOrderId) {
-      return this;
-    }
-
-    ImmutableList.Builder<SortOrder> builder = ImmutableList.builder();
-    builder.addAll(sortOrders);
-
-    if (!sortOrdersById.containsKey(newOrderId)) {
-      if (newOrder.isUnsorted()) {
-        newOrderId = SortOrder.unsorted().orderId();
-        builder.add(SortOrder.unsorted());
-      } else {
-        // rebuild the sort order using new column ids
-        builder.add(freshSortOrder(newOrderId, schema, newOrder));
-      }
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setDefaultSortOrder(newOrder).build();
   }
 
   public TableMetadata addStagedSnapshot(Snapshot snapshot) {
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
-        "Cannot add snapshot with sequence number %s older than last sequence number %s",
-        snapshot.sequenceNumber(), lastSequenceNumber);
-
-    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
-        .addAll(snapshots)
-        .add(snapshot)
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
-        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
-        defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).addSnapshot(snapshot).build();
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
-    // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
-    if (snapshotsById.containsKey(snapshot.snapshotId())) {
-      return setCurrentSnapshotTo(snapshot);
-    }
-
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
-        "Cannot add snapshot with sequence number %s older than last sequence number %s",
-        snapshot.sequenceNumber(), lastSequenceNumber);
-
-    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
-        .addAll(snapshots)
-        .add(snapshot)
-        .build();
-    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
-        .addAll(snapshotLog)
-        .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
-        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
-        defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
-        addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    return new Builder(this).setCurrentSnapshot(snapshot).build();
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
-    List<Snapshot> filtered = Lists.newArrayListWithExpectedSize(snapshots.size());
-    for (Snapshot snapshot : snapshots) {
-      // keep the current snapshot and any snapshots that do not match the removeIf condition
-      if (snapshot.snapshotId() == currentSnapshotId || !removeIf.test(snapshot)) {
-        filtered.add(snapshot);
-      }
-    }
-
-    // update the snapshot log
-    Set<Long> validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId));
-    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
-    for (HistoryEntry logEntry : snapshotLog) {
-      if (validIds.contains(logEntry.snapshotId())) {
-        // copy the log entries that are still valid
-        newSnapshotLog.add(logEntry);
-      } else {
-        // any invalid entry causes the history before it to be removed. otherwise, there could be
-        // history gaps that cause time-travel queries to produce incorrect results. for example,
-        // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
-        // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
-        // and t3 when in fact s2 was the current snapshot.
-        newSnapshotLog.clear();
-      }
-    }
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
-        ImmutableList.copyOf(newSnapshotLog), addPreviousFile(metadataFileLocation, lastUpdatedMillis));
-  }
-
-  private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
-    ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
-        "Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
-    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
-        "Last sequence number %s is less than existing snapshot sequence number %s",
-        lastSequenceNumber, snapshot.sequenceNumber());
-
-    if (currentSnapshotId == snapshot.snapshotId()) {
-      // change is a noop
-      return this;
-    }
-
-    long nowMillis = System.currentTimeMillis();
-    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
-        .addAll(snapshotLog)
-        .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
-        .build();
-
-    return new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+    List<Snapshot> toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList());
+    return new Builder(this).removeSnapshots(toRemove).build();
   }
 
   public TableMetadata replaceProperties(Map<String, String> rawProperties) {
     ValidationException.check(rawProperties != null, "Cannot set properties to null");
     Map<String, String> newProperties = unreservedProperties(rawProperties);
-    TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location,
-        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
-        lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
-        snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties));
 
-    int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
-    if (formatVersion != newFormatVersion) {
-      metadata = metadata.upgradeToFormatVersion(newFormatVersion);
-    }
-
-    return metadata;
-  }
-
-  public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
-    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
-    for (HistoryEntry logEntry : snapshotLog) {
-      if (!snapshotIds.contains(logEntry.snapshotId())) {
-        // copy the log entries that are still valid
-        newSnapshotLog.add(logEntry);
+    Set<String> removed = Sets.newHashSet(properties.keySet());

Review comment:
       In `UpdateProperties`, we formulate a set of `updates` and `removes`, merge them with the base metadata, and pass the merged map to this method. But then we do the reverse here, decomposing it back into `updated` and `removed`, which seems like redundant work. Is this done just to preserve the public `UpdateProperties` interface and the `TableMetadata.replaceProperties` method?
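
       To make the redundancy concrete, the decomposition being questioned amounts to diffing the merged map against the base properties. A rough plain-Java sketch (placeholder names and class, not the PR's actual code):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: recover the "removed" keys and "updated" entries from the
// table's current properties ("base") and the fully merged replacement map
// ("merged") that UpdateProperties passes to replaceProperties.
class PropertyDiffSketch {
  static Set<String> removedKeys(Map<String, String> base, Map<String, String> merged) {
    Set<String> removed = new HashSet<>(base.keySet());
    removed.removeAll(merged.keySet()); // keys the replacement no longer carries
    return removed;
  }

  static Map<String, String> updatedEntries(Map<String, String> base, Map<String, String> merged) {
    Map<String, String> updated = new HashMap<>();
    for (Map.Entry<String, String> entry : merged.entrySet()) {
      // an entry counts as updated when the key is new or its value changed
      if (!entry.getValue().equals(base.get(entry.getKey()))) {
        updated.put(entry.getKey(), entry.getValue());
      }
    }
    return updated;
  }
}
```

       `UpdateProperties` already knows these two pieces before it merges them, which is why recomputing them here reads as duplicated effort.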






