You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2020/02/20 17:43:01 UTC

[incubator-iceberg] branch master updated: Allow cherry-picking any commit as long as it is a fast-forward. (#799)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d27ed  Allow cherry-picking any commit as long as it is a fast-forward. (#799)
55d27ed is described below

commit 55d27ed16ba826af0419952cc0561a95b88d271d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Feb 20 09:42:51 2020 -0800

    Allow cherry-picking any commit as long as it is a fast-forward. (#799)
---
 .../java/org/apache/iceberg/SnapshotManager.java   | 50 ++++++++++-------
 .../java/org/apache/iceberg/TestWapWorkflow.java   | 62 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index 58a278d..7e8ada0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -37,6 +37,7 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
   private SnapshotManagerOperation managerOperation = null;
   private Long targetSnapshotId = null;
   private String snapshotOperation = null;
+  private Long requiredCurrentSnapshotId = null;
 
   SnapshotManager(TableOperations ops) {
     super(ops);
@@ -62,28 +63,32 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
 
     Snapshot cherryPickSnapshot = current.snapshot(snapshotId);
     // only append operations are currently supported
-    if (!cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
-      throw new UnsupportedOperationException("Can cherry pick only append operations");
-    }
-
-    this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
-    this.targetSnapshotId = snapshotId;
-    this.snapshotOperation = cherryPickSnapshot.operation();
+    if (cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
+      this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
+      this.targetSnapshotId = snapshotId;
+      this.snapshotOperation = cherryPickSnapshot.operation();
+
+      // Pick modifications from the snapshot
+      for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
+        add(addedFile);
+      }
 
-    // Pick modifications from the snapshot
-    for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
-      add(addedFile);
-    }
+      // this property is set on target snapshot that will get published
+      String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
+      if (wapId != null) {
+        set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
+      }
 
-    // this property is set on target snapshot that will get published
-    String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
-    if (wapId != null) {
-      set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
+      // link the snapshot about to be published on commit with the picked snapshot
+      set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
+    } else {
+      // cherry-pick should work if the table can be fast-forwarded
+      this.managerOperation = SnapshotManagerOperation.ROLLBACK;
+      this.requiredCurrentSnapshotId = cherryPickSnapshot.parentId();
+      this.targetSnapshotId = snapshotId;
+      validateCurrentSnapshot(current, requiredCurrentSnapshotId);
     }
 
-    // link the snapshot about to be published on commit with the picked snapshot
-    set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
-
     return this;
   }
 
@@ -123,6 +128,7 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
   }
 
   private void validate(TableMetadata base) {
+    validateCurrentSnapshot(base, requiredCurrentSnapshotId);
     validateNonAncestor(base, targetSnapshotId);
     WapUtil.validateWapPublish(base, targetSnapshotId);
   }
@@ -164,6 +170,14 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
     }
   }
 
+  private static void validateCurrentSnapshot(TableMetadata meta, Long requiredSnapshotId) {
+    if (requiredSnapshotId != null) {
+      ValidationException.check(meta.currentSnapshot().snapshotId() == requiredSnapshotId,
+          "Cannot fast-forward to non-append snapshot; current has changed: current=%s != required=%s",
+          meta.currentSnapshot().snapshotId(), requiredSnapshotId);
+    }
+  }
+
   private static void validateNonAncestor(TableMetadata meta, long snapshotId) {
     if (isCurrentAncestor(meta, snapshotId)) {
       throw new CherrypickAncestorCommitException(snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
index f11363d..9f14860 100644
--- a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
+++ b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
 import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -35,6 +36,67 @@ public class TestWapWorkflow extends TableTestBase {
   }
 
   @Test
+  public void testCherryPickOverwrite() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newOverwrite()
+        .deleteFile(FILE_A)
+        .addFile(FILE_B)
+        .stageOnly()
+        .commit();
+
+    // the overwrite should only be staged
+    validateTableFiles(table, FILE_A);
+
+    Snapshot overwrite = Streams.stream(table.snapshots())
+        .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation()))
+        .findFirst()
+        .get();
+
+    // cherry-pick the overwrite; this works because it is a fast-forward commit
+    table.manageSnapshots().cherrypick(overwrite.snapshotId()).commit();
+
+    // the overwrite should only be staged
+    validateTableFiles(table, FILE_B);
+  }
+
+  @Test
+  public void testCherryPickOverwriteFailsIfCurrentHasChanged() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newOverwrite()
+        .deleteFile(FILE_A)
+        .addFile(FILE_B)
+        .stageOnly()
+        .commit();
+
+    // add a commit after the overwrite that prevents it from being a fast-forward operation
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    // the overwrite should only be staged
+    validateTableFiles(table, FILE_A, FILE_C);
+
+    Snapshot overwrite = Streams.stream(table.snapshots())
+        .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation()))
+        .findFirst()
+        .get();
+
+    // try to cherry-pick, which should fail because the overwrite's parent is no longer current
+    AssertHelpers.assertThrows("Should reject overwrite that is not a fast-forward commit",
+        ValidationException.class, "Cannot fast-forward to non-append snapshot",
+        () -> table.manageSnapshots().cherrypick(overwrite.snapshotId()).commit());
+
+    // the table state should not have changed
+    validateTableFiles(table, FILE_A, FILE_C);
+  }
+
+  @Test
   public void testCurrentSnapshotOperation() {
 
     table.newAppend()