You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2020/02/20 17:43:01 UTC
[incubator-iceberg] branch master updated: Allow cherry-picking any
commit as long as it is a fast-forward. (#799)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 55d27ed Allow cherry-picking any commit as long as it is a fast-forward. (#799)
55d27ed is described below
commit 55d27ed16ba826af0419952cc0561a95b88d271d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Feb 20 09:42:51 2020 -0800
Allow cherry-picking any commit as long as it is a fast-forward. (#799)
---
.../java/org/apache/iceberg/SnapshotManager.java | 50 ++++++++++-------
.../java/org/apache/iceberg/TestWapWorkflow.java | 62 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index 58a278d..7e8ada0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -37,6 +37,7 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
private SnapshotManagerOperation managerOperation = null;
private Long targetSnapshotId = null;
private String snapshotOperation = null;
+ private Long requiredCurrentSnapshotId = null;
SnapshotManager(TableOperations ops) {
super(ops);
@@ -62,28 +63,32 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
Snapshot cherryPickSnapshot = current.snapshot(snapshotId);
// only append operations are currently supported
- if (!cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
- throw new UnsupportedOperationException("Can cherry pick only append operations");
- }
-
- this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
- this.targetSnapshotId = snapshotId;
- this.snapshotOperation = cherryPickSnapshot.operation();
+ if (cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
+ this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
+ this.targetSnapshotId = snapshotId;
+ this.snapshotOperation = cherryPickSnapshot.operation();
+
+ // Pick modifications from the snapshot
+ for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
+ add(addedFile);
+ }
- // Pick modifications from the snapshot
- for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
- add(addedFile);
- }
+ // this property is set on target snapshot that will get published
+ String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
+ if (wapId != null) {
+ set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
+ }
- // this property is set on target snapshot that will get published
- String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
- if (wapId != null) {
- set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
+ // link the snapshot about to be published on commit with the picked snapshot
+ set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
+ } else {
+ // cherry-pick should work if the table can be fast-forwarded
+ this.managerOperation = SnapshotManagerOperation.ROLLBACK;
+ this.requiredCurrentSnapshotId = cherryPickSnapshot.parentId();
+ this.targetSnapshotId = snapshotId;
+ validateCurrentSnapshot(current, requiredCurrentSnapshotId);
}
- // link the snapshot about to be published on commit with the picked snapshot
- set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
-
return this;
}
@@ -123,6 +128,7 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
}
private void validate(TableMetadata base) {
+ validateCurrentSnapshot(base, requiredCurrentSnapshotId);
validateNonAncestor(base, targetSnapshotId);
WapUtil.validateWapPublish(base, targetSnapshotId);
}
@@ -164,6 +170,14 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
}
}
+ private static void validateCurrentSnapshot(TableMetadata meta, Long requiredSnapshotId) {
+ if (requiredSnapshotId != null) {
+ ValidationException.check(meta.currentSnapshot().snapshotId() == requiredSnapshotId,
+ "Cannot fast-forward to non-append snapshot; current has changed: current=%s != required=%s",
+ meta.currentSnapshot().snapshotId(), requiredSnapshotId);
+ }
+ }
+
private static void validateNonAncestor(TableMetadata meta, long snapshotId) {
if (isCurrentAncestor(meta, snapshotId)) {
throw new CherrypickAncestorCommitException(snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
index f11363d..9f14860 100644
--- a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
+++ b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
import org.apache.iceberg.exceptions.ValidationException;
@@ -35,6 +36,67 @@ public class TestWapWorkflow extends TableTestBase {
}
@Test
+ public void testCherryPickOverwrite() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ table.newOverwrite()
+ .deleteFile(FILE_A)
+ .addFile(FILE_B)
+ .stageOnly()
+ .commit();
+
+ // the overwrite should only be staged
+ validateTableFiles(table, FILE_A);
+
+ Snapshot overwrite = Streams.stream(table.snapshots())
+ .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation()))
+ .findFirst()
+ .get();
+
+ // cherry-pick the overwrite; this works because it is a fast-forward commit
+ table.manageSnapshots().cherrypick(overwrite.snapshotId()).commit();
+
+ // the overwrite should now be published as the current table state
+ validateTableFiles(table, FILE_B);
+ }
+
+ @Test
+ public void testCherryPickOverwriteFailsIfCurrentHasChanged() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ table.newOverwrite()
+ .deleteFile(FILE_A)
+ .addFile(FILE_B)
+ .stageOnly()
+ .commit();
+
+ // add a commit after the overwrite that prevents it from being a fast-forward operation
+ table.newAppend()
+ .appendFile(FILE_C)
+ .commit();
+
+ // the overwrite should only be staged
+ validateTableFiles(table, FILE_A, FILE_C);
+
+ Snapshot overwrite = Streams.stream(table.snapshots())
+ .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation()))
+ .findFirst()
+ .get();
+
+ // try to cherry-pick, which should fail because the overwrite's parent is no longer current
+ AssertHelpers.assertThrows("Should reject overwrite that is not a fast-forward commit",
+ ValidationException.class, "Cannot fast-forward to non-append snapshot",
+ () -> table.manageSnapshots().cherrypick(overwrite.snapshotId()).commit());
+
+ // the table state should not have changed
+ validateTableFiles(table, FILE_A, FILE_C);
+ }
+
+ @Test
public void testCurrentSnapshotOperation() {
table.newAppend()