You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/08/11 00:22:50 UTC
[iceberg] branch master updated: Core, API: Performing operations on a snapshot branch ref (#4926)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5a15efc070 Core, API: Performing operations on a snapshot branch ref (#4926)
5a15efc070 is described below
commit 5a15efc070ab59eeda6343998aa065c0c9892c5c
Author: Namratha Mysore Keshavaprakash <nm...@gmail.com>
AuthorDate: Thu Aug 11 05:52:45 2022 +0530
Core, API: Performing operations on a snapshot branch ref (#4926)
Co-authored-by: Amogh Jahagirdar <ja...@amazon.com>
---
.palantir/revapi.yml | 3 ++
.../java/org/apache/iceberg/SnapshotUpdate.java | 12 ++++++
.../org/apache/iceberg/BaseOverwriteFiles.java | 2 +-
.../org/apache/iceberg/BaseReplacePartitions.java | 6 +--
.../java/org/apache/iceberg/BaseRewriteFiles.java | 2 +-
.../org/apache/iceberg/BaseRewriteManifests.java | 2 +-
.../main/java/org/apache/iceberg/BaseRowDelta.java | 2 +-
.../org/apache/iceberg/CherryPickOperation.java | 2 +-
.../main/java/org/apache/iceberg/FastAppend.java | 12 ++++--
.../apache/iceberg/MergingSnapshotProducer.java | 8 ++--
.../java/org/apache/iceberg/SnapshotProducer.java | 46 +++++++++++++++-----
.../java/org/apache/iceberg/TestFastAppend.java | 50 ++++++++++++++++++++++
.../java/org/apache/iceberg/TestOverwrite.java | 13 ++++++
.../org/apache/iceberg/TestReplacePartitions.java | 9 ++++
.../org/apache/iceberg/TestRewriteManifests.java | 14 ++++++
.../test/java/org/apache/iceberg/TestRowDelta.java | 16 +++++++
16 files changed, 172 insertions(+), 27 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 4491003fc4..93755b81cf 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -27,6 +27,9 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
justification: "Accept all changes prior to introducing API compatibility checks"
+ - code: "java.method.addedToInterface"
+ new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::toBranch(java.lang.String)"
+ justification: "Adding toBranch API for supporting committing to a branch"
- code: "java.method.addedToInterface"
new: "method boolean org.apache.iceberg.expressions.BoundTerm<T>::isEquivalentTo(org.apache.iceberg.expressions.BoundTerm<?>)"
justification: "new API method"
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 2c5ab79008..cc6b02dee4 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -59,4 +59,16 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
* @return this for method chaining
*/
ThisT scanManifestsWith(ExecutorService executorService);
+
+  /**
+   * Perform operations on a particular branch; throws UnsupportedOperationException by default.
+   *
+   * @param branch name of a SnapshotRef of type branch
+   */
+  default ThisT toBranch(String branch) {
+    String operation = getClass().getName();
+    throw new UnsupportedOperationException(
+        String.format(
+            "Cannot commit to branch %s: %s does not support branch commits", branch, operation));
+  }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index bbb51fdc7e..a073d79e55 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -105,7 +105,7 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
}
@Override
- protected void validate(TableMetadata base) {
+ protected void validate(TableMetadata base, Snapshot snapshot) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 2847f5ceca..dd44505e9d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -80,7 +80,7 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartit
}
@Override
- public void validate(TableMetadata currentMetadata) {
+ public void validate(TableMetadata currentMetadata, Snapshot snapshot) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
@@ -101,14 +101,14 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartit
}
@Override
- public List<ManifestFile> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}
try {
- return super.apply(base);
+ return super.apply(base, snapshot);
} catch (ManifestFilterManager.DeleteException e) {
throw new ValidationException(
"Cannot commit file that conflicts with existing partition: %s", e.partition());
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
index 1bc846e276..8a3b137b2d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -110,7 +110,7 @@ class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements
}
@Override
- protected void validate(TableMetadata base) {
+ protected void validate(TableMetadata base, Snapshot snapshot) {
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index c61b99dcfc..816bc0c8a7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -168,7 +168,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
}
@Override
- public List<ManifestFile> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> currentManifests = base.currentSnapshot().dataManifests(ops.io());
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 35a04ba394..50a0e26ab3 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -96,7 +96,7 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta
}
@Override
- protected void validate(TableMetadata base) {
+ protected void validate(TableMetadata base, Snapshot snapshot) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(
diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
index 77de542797..3786b1185b 100644
--- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
+++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
@@ -152,7 +152,7 @@ class CherryPickOperation extends MergingSnapshotProducer<CherryPickOperation> {
}
@Override
- protected void validate(TableMetadata base) {
+ protected void validate(TableMetadata base, Snapshot snapshot) {
// this is only called after apply() passes off to super, but check fast-forward status just in
// case
if (!isFastForward(base)) {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index febdcee633..f3955e15f6 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -97,6 +97,12 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
return this;
}
+  @Override
+  public FastAppend toBranch(String branch) {
+    this.targetBranch(branch); // validates the name, then records it as the commit target
+    return this;
+  }
+
@Override
public FastAppend appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
@@ -135,7 +141,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
}
@Override
- public List<ManifestFile> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> newManifests = Lists.newArrayList();
try {
@@ -153,8 +159,8 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);
- if (base.currentSnapshot() != null) {
- newManifests.addAll(base.currentSnapshot().allManifests(ops.io()));
+ if (snapshot != null) {
+ newManifests.addAll(snapshot.allManifests(ops.io()));
}
return newManifests;
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 789c6c23c3..b82244f071 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -758,13 +758,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
@Override
- public List<ManifestFile> apply(TableMetadata base) {
- Snapshot current = base.currentSnapshot();
-
+ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
// filter any existing manifests
List<ManifestFile> filtered =
filterManager.filterManifests(
- base.schema(), current != null ? current.dataManifests(ops.io()) : null);
+ base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
@@ -777,7 +775,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
- base.schema(), current != null ? current.deleteManifests(ops.io()) : null);
+ base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 520c70bcef..87aa4126ce 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -84,6 +84,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService workerPool = ThreadPools.getWorkerPool();
+ private String targetBranch = SnapshotRef.MAIN_BRANCH;
protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
@@ -113,6 +114,20 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
return self();
}
+  /**
+   * Sets the branch that this producer commits its new snapshot to.
+   *
+   * @param branch name of the target branch; must be non-null and must not name an existing tag
+   */
+  protected void targetBranch(String branch) {
+    Preconditions.checkArgument(branch != null, "Invalid branch name: null");
+    SnapshotRef ref = base.ref(branch);
+    boolean tag = ref != null && !ref.isBranch();
+    Preconditions.checkArgument(
+        !tag, "%s is a tag, not a branch. Tags cannot be targets for producing snapshots", branch);
+    this.targetBranch = branch;
+  }
+
protected ExecutorService workerPool() {
return this.workerPool;
}
@@ -150,28 +165,37 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
* <p>Child operations can override this to add custom validation.
*
* @param currentMetadata current table metadata to validate
+ * @param snapshot ending snapshot on the lineage which is being validated
*/
- protected void validate(TableMetadata currentMetadata) {}
+ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
/**
- * Apply the update's changes to the base table metadata and return the new manifest list.
+ * Apply the update's changes to the given metadata and snapshot. Return the new manifest list.
*
* @param metadataToUpdate the base table metadata to apply changes to
+ * @param snapshot snapshot to apply the changes to
* @return a manifest list for the new snapshot.
*/
- protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate);
+ protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);
@Override
public Snapshot apply() {
refresh();
- Long parentSnapshotId =
- base.currentSnapshot() != null ? base.currentSnapshot().snapshotId() : null;
- long sequenceNumber = base.nextSequenceNumber();
+ Snapshot parentSnapshot = base.currentSnapshot();
+ if (targetBranch != null) {
+ SnapshotRef branch = base.ref(targetBranch);
+ if (branch != null) {
+ parentSnapshot = base.snapshot(branch.snapshotId());
+ } else if (base.currentSnapshot() != null) {
+ parentSnapshot = base.currentSnapshot();
+ }
+ }
- // run validations from the child operation
- validate(base);
+ long sequenceNumber = base.nextSequenceNumber();
+ Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
- List<ManifestFile> manifests = apply(base);
+ validate(base, parentSnapshot);
+ List<ManifestFile> manifests = apply(base, parentSnapshot);
if (base.formatVersion() > 1
|| base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
@@ -337,11 +361,11 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
- update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
+ update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch);
} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
- update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
+ update.setBranchSnapshot(newSnapshot, targetBranch);
}
TableMetadata updated = update.build();
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index c04a20b98b..508c90255e 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -487,4 +487,54 @@ public class TestFastAppend extends TableTestBase {
table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP);
Assert.assertEquals("Should set changed partition count", "2", changedPartitions);
}
+
+  @Test
+  public void testAppendToExistingBranch() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+    table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
+    int branchSnapshot = 2;
+
+    Assert.assertEquals(1, table.currentSnapshot().snapshotId());
+    Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId());
+  }
+
+  @Test
+  public void testAppendCreatesBranchIfNeeded() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
+    int branchSnapshot = 2;
+
+    Assert.assertEquals(1, table.currentSnapshot().snapshotId());
+    Assert.assertNotNull(table.ops().current().ref("branch"));
+    Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId());
+  }
+
+  @Test
+  public void testAppendToBranchEmptyTable() {
+    table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
+    int branchSnapshot = 1;
+
+    Assert.assertNull(table.currentSnapshot());
+    Assert.assertNotNull(table.ops().current().ref("branch"));
+    Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId());
+  }
+
+ @Test
+ public void testAppendToNullBranchFails() {
+ AssertHelpers.assertThrows(
+ "Invalid branch",
+ IllegalArgumentException.class,
+ () -> table.newFastAppend().appendFile(FILE_A).toBranch(null));
+ }
+
+ @Test
+ public void testAppendToTagFails() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ table.manageSnapshots().createTag("some-tag", table.currentSnapshot().snapshotId()).commit();
+ AssertHelpers.assertThrows(
+ "Invalid branch",
+ IllegalArgumentException.class,
+ () -> table.newFastAppend().appendFile(FILE_A).toBranch("some-tag").commit());
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
index f7788fbe32..082f6396bd 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
@@ -300,4 +300,17 @@ public class TestOverwrite extends TableTestBase {
Assert.assertEquals(
"Should not create a new snapshot", baseId, table.currentSnapshot().snapshotId());
}
+
+  @Test
+  public void testOverwriteToBranchUnsupported() {
+    OverwriteFiles overwrite = table.newOverwrite();
+    overwrite.overwriteByRowFilter(and(equal("date", "2018-06-09"), lessThan("id", 20)));
+    overwrite.addFile(FILE_10_TO_14);
+
+    AssertHelpers.assertThrows(
+        "Should reject committing overwrite to branch",
+        UnsupportedOperationException.class,
+        "Cannot commit to branch someBranch: org.apache.iceberg.BaseOverwriteFiles does not support branch commits",
+        () -> overwrite.toBranch("someBranch"));
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
index 9c73d2d957..d5007bf6de 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
@@ -742,4 +742,13 @@ public class TestReplacePartitions extends TableTestBase {
public void testEmptyPartitionPathWithUnpartitionedTable() {
DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath("");
}
+
+  @Test
+  public void testReplacePartitionsOnBranchUnsupported() {
+    AssertHelpers.assertThrows(
+        "Should reject committing replace partitions to branch",
+        UnsupportedOperationException.class,
+        "Cannot commit to branch someBranch: org.apache.iceberg.BaseReplacePartitions does not support branch commits",
+        () -> table.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch("someBranch"));
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 175c80c4d1..633b27241d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -1088,6 +1088,20 @@ public class TestRewriteManifests extends TableTestBase {
Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists());
}
+ @Test
+ public void testRewriteManifestsOnBranchUnsupported() {
+
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ Assert.assertEquals(1, table.currentSnapshot().allManifests(table.io()).size());
+
+ AssertHelpers.assertThrows(
+ "Should reject committing rewrite manifests to branch",
+ UnsupportedOperationException.class,
+ "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits",
+ () -> table.rewriteManifests().toBranch("someBranch").commit());
+ }
+
private void validateSummary(
Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
Map<String, String> summary = snapshot.summary();
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 6ebb92eb86..e2929b4709 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -1412,4 +1412,20 @@ public class TestRowDelta extends V2TableTestBase {
.validateNoConflictingDeleteFiles()
.commit());
}
+
+  @Test
+  public void testRowDeltaToBranchUnsupported() {
+    RowDelta rowDelta = table.newRowDelta().caseSensitive(false);
+    rowDelta.addRows(FILE_B);
+    rowDelta.addDeletes(FILE_A2_DELETES);
+
+    String expectedMessage =
+        "Cannot commit to branch someBranch: org.apache.iceberg.BaseRowDelta does not support branch commits";
+
+    AssertHelpers.assertThrows(
+        "Should reject committing row delta to branch",
+        UnsupportedOperationException.class,
+        expectedMessage,
+        () -> rowDelta.toBranch("someBranch").commit());
+  }
}