You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/15 21:57:34 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #4128: Core: Update SnapshotManager to use a transaction

jackye1995 commented on a change in pull request #4128:
URL: https://github.com/apache/iceberg/pull/4128#discussion_r807298992



##########
File path: core/src/main/java/org/apache/iceberg/SnapshotManager.java
##########
@@ -19,292 +19,48 @@
 
 package org.apache.iceberg;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.PartitionSet;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
-import org.apache.iceberg.util.WapUtil;
 
-public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> implements ManageSnapshots {
+public class SnapshotManager implements ManageSnapshots {
 
-  private enum SnapshotManagerOperation {
-    CHERRYPICK,
-    ROLLBACK
-  }
-
-  private final Map<Integer, PartitionSpec> specsById;
-  private SnapshotManagerOperation managerOperation = null;
-  private Long targetSnapshotId = null;
-  private String snapshotOperation = null;
-  private Long requiredCurrentSnapshotId = null;
-  private Long overwriteParentId = null;
-  private PartitionSet replacedPartitions = null;
+  private final BaseTransaction transaction;
 
   SnapshotManager(String tableName, TableOperations ops) {
-    super(tableName, ops);
-    this.specsById = ops.current().specsById();
-  }
-
-  @Override
-  protected ManageSnapshots self() {
-    return this;
-  }
-
-  @Override
-  protected String operation() {
-    // snapshotOperation is used by SnapshotProducer when building and writing a new snapshot for cherrypick
-    Preconditions.checkNotNull(snapshotOperation, "[BUG] Detected uninitialized operation");
-    return snapshotOperation;
+    Preconditions.checkState(ops.current() != null, "Cannot manage snapshots: table %s does not exist", tableName);
+    this.transaction = new BaseTransaction(tableName, ops, BaseTransaction.TransactionType.SIMPLE, ops.refresh());
   }
 
   @Override
   public ManageSnapshots cherrypick(long snapshotId) {
-    TableMetadata current = current();
-    ValidationException.check(current.snapshot(snapshotId) != null,
-        "Cannot cherry pick unknown snapshot id: %s", snapshotId);
-
-    Snapshot cherryPickSnapshot = current.snapshot(snapshotId);
-    // only append operations are currently supported
-    if (cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
-      this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
-      this.targetSnapshotId = snapshotId;
-      this.snapshotOperation = cherryPickSnapshot.operation();
-
-      // Pick modifications from the snapshot
-      for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
-        add(addedFile);
-      }
-
-      // this property is set on target snapshot that will get published
-      String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
-      if (wapId != null) {
-        set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
-      }
-
-      // link the snapshot about to be published on commit with the picked snapshot
-      set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
-
-    } else if (cherryPickSnapshot.operation().equals(DataOperations.OVERWRITE) &&
-        PropertyUtil.propertyAsBoolean(cherryPickSnapshot.summary(), SnapshotSummary.REPLACE_PARTITIONS_PROP, false)) {
-      // the operation was ReplacePartitions. this can be cherry-picked iff the partitions have not been modified.
-      // detecting modification requires finding the new files since the parent was committed, so the parent must be an
-      // ancestor of the current state, or null if the overwrite was based on an empty table.
-      this.overwriteParentId = cherryPickSnapshot.parentId();
-      ValidationException.check(overwriteParentId == null || isCurrentAncestor(current, overwriteParentId),
-          "Cannot cherry-pick overwrite not based on an ancestor of the current state: %s", snapshotId);
-
-      this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
-      this.targetSnapshotId = snapshotId;
-      this.snapshotOperation = cherryPickSnapshot.operation();
-      this.replacedPartitions = PartitionSet.create(specsById);
-
-      // check that all deleted files are still in the table
-      failMissingDeletePaths();
-
-      // copy adds from the picked snapshot
-      for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
-        add(addedFile);
-        replacedPartitions.add(addedFile.specId(), addedFile.partition());
-      }
-
-      // copy deletes from the picked snapshot
-      for (DataFile deletedFile : cherryPickSnapshot.deletedFiles()) {
-        delete(deletedFile);
-      }
-
-      // this property is set on target snapshot that will get published
-      String overwriteWapId = WapUtil.validateWapPublish(current, targetSnapshotId);
-      if (overwriteWapId != null) {
-        set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, overwriteWapId);
-      }
-
-      // link the snapshot about to be published on commit with the picked snapshot
-      set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
-
-    } else {
-      // cherry-pick should work if the table can be fast-forwarded
-      this.managerOperation = SnapshotManagerOperation.ROLLBACK;
-      this.requiredCurrentSnapshotId = cherryPickSnapshot.parentId();
-      this.targetSnapshotId = snapshotId;
-      validateCurrentSnapshot(current, requiredCurrentSnapshotId);
-    }
-
+    transaction.cherryPick().cherrypick(snapshotId).commit();
     return this;
   }
 
   @Override
   public ManageSnapshots setCurrentSnapshot(long snapshotId) {
-    ValidationException.check(current().snapshot(snapshotId) != null,
-        "Cannot roll back to unknown snapshot id: %s", snapshotId);
-
-    this.managerOperation = SnapshotManagerOperation.ROLLBACK;
-    this.targetSnapshotId = snapshotId;
-
+    transaction.setBranchSnapshot().setCurrentSnapshot(snapshotId).commit();
     return this;
   }
 
   @Override
   public ManageSnapshots rollbackToTime(long timestampMillis) {
-    // find the latest snapshot by timestamp older than timestampMillis
-    Snapshot snapshot = findLatestAncestorOlderThan(current(), timestampMillis);
-    Preconditions.checkArgument(snapshot != null,
-        "Cannot roll back, no valid snapshot older than: %s", timestampMillis);
-
-    this.managerOperation = SnapshotManagerOperation.ROLLBACK;
-    this.targetSnapshotId = snapshot.snapshotId();
-
+    transaction.setBranchSnapshot().rollbackToTime(timestampMillis).commit();
     return this;
   }
 
   @Override
   public ManageSnapshots rollbackTo(long snapshotId) {
-    TableMetadata current = current();
-    ValidationException.check(current.snapshot(snapshotId) != null,
-        "Cannot roll back to unknown snapshot id: %s", snapshotId);
-    ValidationException.check(
-        isCurrentAncestor(current, snapshotId),
-        "Cannot roll back to snapshot, not an ancestor of the current state: %s", snapshotId);
-    return setCurrentSnapshot(snapshotId);
-  }
-
-  @Override
-  public Object updateEvent() {
-    if (targetSnapshotId == null) {
-      // NOOP operation, no snapshot created
-      return null;
-    }
-
-    switch (managerOperation) {
-      case ROLLBACK:
-        // rollback does not create a new snapshot
-        return null;
-      case CHERRYPICK:
-        TableMetadata tableMetadata = refresh();
-        long snapshotId = tableMetadata.currentSnapshot().snapshotId();
-        if (targetSnapshotId == snapshotId) {
-          // No new snapshot is created for fast-forward
-          return null;
-        } else {
-          // New snapshot created, we rely on super class to fire a CreateSnapshotEvent
-          return super.updateEvent();
-        }
-      default:
-        throw new UnsupportedOperationException(managerOperation + " is not supported");
-    }
-  }
-
-  @Override
-  protected void validate(TableMetadata base) {
-    validateCurrentSnapshot(base, requiredCurrentSnapshotId);
-    validateNonAncestor(base, targetSnapshotId);
-    validateReplacedPartitions(base, overwriteParentId, replacedPartitions);
-    WapUtil.validateWapPublish(base, targetSnapshotId);
+    transaction.setBranchSnapshot().rollbackTo(snapshotId).commit();
+    return this;
   }
 
   @Override
   public Snapshot apply() {
-    TableMetadata base = refresh();
-
-    if (targetSnapshotId == null) {
-      // if no target snapshot was configured then NOOP by returning current state
-      return base.currentSnapshot();
-    }
-
-    switch (managerOperation) {
-      case CHERRYPICK:
-        if (base.snapshot(targetSnapshotId).parentId() != null &&
-            base.currentSnapshot().snapshotId() == base.snapshot(targetSnapshotId).parentId()) {
-          // the snapshot to cherrypick is already based on the current state: fast-forward
-          validate(base);
-          return base.snapshot(targetSnapshotId);
-        } else {
-          // validate(TableMetadata) is called in apply(TableMetadata) after this apply refreshes the table state
-          return super.apply();
-        }
-
-      case ROLLBACK:
-        return base.snapshot(targetSnapshotId);
-
-      default:
-        throw new ValidationException("Invalid SnapshotManagerOperation: only cherrypick, rollback are supported");
-    }
-  }
-
-  private static void validateCurrentSnapshot(TableMetadata meta, Long requiredSnapshotId) {
-    if (requiredSnapshotId != null && meta.currentSnapshot() != null) {
-      ValidationException.check(meta.currentSnapshot().snapshotId() == requiredSnapshotId,
-          "Cannot fast-forward to non-append snapshot; current has changed: current=%s != required=%s",
-          meta.currentSnapshot().snapshotId(), requiredSnapshotId);
-    }
+    return transaction.table().currentSnapshot();

Review comment:
       So `apply()` here is essentially unused; it exists only to fulfill the existing interface, and the actual underlying transaction is committed during `commit()`.
   
   This is a bit different from the original definition of apply as:
   
   > Apply the pending changes and return the uncommitted changes for validation.
   
   If we add methods for ref updates here later, they will not be shown as uncommitted changes because they are not part of a `Snapshot`. But I guess that's fine here: the result of `apply()` is always fed to `commit()` anyway, and in this case the transaction itself is doing the commit, so no information is lost.

##########
File path: core/src/main/java/org/apache/iceberg/SnapshotManager.java
##########
@@ -19,292 +19,48 @@
 
 package org.apache.iceberg;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.PartitionSet;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
-import org.apache.iceberg.util.WapUtil;
 
-public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> implements ManageSnapshots {
+public class SnapshotManager implements ManageSnapshots {

Review comment:
       With this change, `ManageSnapshots` is basically a wrapper for running transactions for snapshot-related operations. Do you think we will eventually add it as a part of `Transaction`, so people can do:
   
   ```java
   Transaction transaction = table.newTransaction();
   transaction.manageSnapshots()
     .createBranch("dev")
     .cherrypick("dev", 123L)
     .commit();
   transaction.newAppend().appendFiles(f1, f2).toBranch("dev").commit();
   transaction.commit();
   ```
   
   Or we do not add this to transactions, and we do:
   
   ```java
   Transaction transaction = table.newTransaction();
   transaction.updateRefs().createBranch("dev").commit();
   transaction.cherryPick().cherrypick("dev", 123L).commit();
   transaction.newAppend().appendFiles(f1, f2).toBranch("dev").commit();
   transaction.commit();
   ```
   
   Which means we will still need a new API for ref updates.
   
   Do you see any benefits to either approach?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org