Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/08 21:00:43 UTC

[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2925: Core: Support serializable isolation for ReplacePartitions

aokolnychyi commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r801909857



##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -20,19 +20,25 @@
 package org.apache.iceberg;
 
 /**
- * Not recommended: API for overwriting files in a table by partition.
+ * API for overwriting files in a table by partition.
  * <p>
  * This is provided to implement SQL compatible with Hive table operations but is not recommended.
  * Instead, use the {@link OverwriteFiles overwrite API} to explicitly overwrite data.
  * <p>
+ * The default validation mode is idempotent, meaning the overwrite is
+ * correct and should be committed out regardless of other concurrent changes to the table.
+ * Alternatively, this API can be configured to validate that no new data or deletes
+ * have been applied since a snapshot ID associated when this operation began.
+ * This can be done by calling {@link #validateNoConflictingDeletes()}
+ * and {@link #validateNoConflictingData()}, and will ensure that no conflicting delta files or data files

Review comment:
       nit: `delta` -> `delete`?

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +55,42 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in validations for this operation.
+   *
+   * All validations will check changes after this snapshot ID. If this is not called, validation will occur
+   * from the current snapshot ID upon creation of this object.
+   *
+   * This method should be called before this operation is committed.
+   * If a concurrent operation committed a data file or row delta after the given snapshot ID
+   * that might contain rows matching a partition marked for deletion, validation will detect this and fail.
+   *
+   * @param snapshotId a snapshot ID, it should be set to when this operation started to read the table.
+   * @return this for method chaining
+   */
+  ReplacePartitions validateFromSnapshot(long snapshotId);
+
+

Review comment:
       nit: extra new line

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -399,6 +438,35 @@ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapsho
         dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
   }
 
+  /**
+   * Validates that no delete files matching a partition set have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param partitionSet partition set to match against delete files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) {

Review comment:
       Same here?

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +60,24 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
+    this.startingSnapshotId = newStartingSnapshotId;
+    return this;
+  }
+
+  @Override
+  public void validate(TableMetadata currentMetadata) {
+    if (startingSnapshotId != null && startingSnapshotId != 0) {
+      if (dataSpec().isUnpartitioned()) {

Review comment:
       I probably agree with this given the commit behavior right now.

##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -333,6 +334,7 @@ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
     private Expression partitionFilter = Expressions.alwaysTrue();
     private boolean caseSensitive = true;
     private ExecutorService executorService = null;
+    private PartitionSet partitionSet = null;

Review comment:
       nit: would it make sense to co-locate filtering methods and vars next to each other?
   For example, we already have `filterPartitions` that accepts an expression. We could place the vars and methods next to each other, like you did in `ManifestReader`. It is usually a good idea to co-locate overloaded methods.

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -22,12 +22,22 @@
 import java.util.List;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.util.PartitionSet;
 
 public class BaseReplacePartitions
     extends MergingSnapshotProducer<ReplacePartitions> implements ReplacePartitions {
+
+  private final PartitionSet replacedPartitions;
+  private Long startingSnapshotId;
+  private boolean validateNewDataFiles = false;
+  private boolean validateNewDeleteFiles = false;
+
   BaseReplacePartitions(String tableName, TableOperations ops) {
     super(tableName, ops);
     set(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true");
+    replacedPartitions = PartitionSet.create(ops.current().specsById());
+    this.startingSnapshotId = ops.current().currentSnapshot() == null ? 0 :

Review comment:
       I am not sure about defaulting the start snapshot ID. We don't do that in `OverwriteFiles`. I guess I see very little value given that we create an instance of this class immediately before calling `commit` in query engines. I'd probably prefer the behavior we have in `OverwriteFiles` for consistency: if the starting snapshot ID is not set, all snapshots are validated. Users are familiar with that.
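
   To make the "validate all snapshots if not set" behavior concrete, here is a minimal sketch. It is not Iceberg code: `ValidationWindowSketch`, `snapshotsToValidate`, and the flat list of snapshot IDs are hypothetical stand-ins, and the real validation walks snapshot ancestry rather than a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are Iceberg APIs.
class ValidationWindowSketch {

  /**
   * Returns the snapshot IDs a validation would have to inspect.
   *
   * @param history snapshot IDs ordered oldest to newest (an assumption of this sketch)
   * @param startingSnapshotId last snapshot the writer has already seen, or null if never set
   */
  static List<Long> snapshotsToValidate(List<Long> history, Long startingSnapshotId) {
    if (startingSnapshotId == null) {
      // No starting point was given: check the whole history.
      return new ArrayList<>(history);
    }

    int start = history.indexOf(startingSnapshotId);
    if (start < 0) {
      throw new IllegalArgumentException("Unknown snapshot ID: " + startingSnapshotId);
    }

    // Only snapshots committed after the starting snapshot are checked.
    return new ArrayList<>(history.subList(start + 1, history.size()));
  }

  public static void main(String[] args) {
    List<Long> history = List.of(10L, 20L, 30L);
    System.out.println(snapshotsToValidate(history, null));
    System.out.println(snapshotsToValidate(history, 20L));
  }
}
```

   With a `null` starting ID the whole history is checked, so a caller that never sets it gets the most conservative validation instead of one silently pinned to whatever snapshot was current when the object was created.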

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -263,6 +264,44 @@ private ManifestFile copyManifest(ManifestFile manifest) {
         current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), appendedManifestsSummary);
   }
 
+  /**
+   * Validates that no files matching given partitions have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param partitionSet a set of partitions to check against
+   */
+  protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) {

Review comment:
       This method seems really similar to the existing `validateAddedDataFiles`. Can we adapt and overload that one? You can call `filterManifestEntries` multiple times and that gets AND'ed in `ManifestGroup`.
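
   A rough sketch of the idea, assuming (as noted above) that repeated filter calls are AND'ed together. `EntryGroup`, `Entry`, and `addedInPartitions` are hypothetical stand-ins for illustration, not Iceberg's `ManifestGroup` or its entry types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch; none of these types are Iceberg classes.
class FilterChainSketch {

  // Simplified manifest entry: file path, partition, and the snapshot that added it.
  static final class Entry {
    final String path;
    final String partition;
    final long snapshotId;

    Entry(String path, String partition, long snapshotId) {
      this.path = path;
      this.partition = partition;
      this.snapshotId = snapshotId;
    }
  }

  // Stand-in for a manifest scan whose entry filters accumulate with AND.
  static final class EntryGroup {
    private final List<Entry> entries;
    private Predicate<Entry> filter = entry -> true;

    EntryGroup(List<Entry> entries) {
      this.entries = entries;
    }

    EntryGroup filterEntries(Predicate<Entry> newFilter) {
      // Each call narrows the result: existing filter AND new filter.
      this.filter = this.filter.and(newFilter);
      return this;
    }

    List<Entry> matching() {
      List<Entry> result = new ArrayList<>();
      for (Entry entry : entries) {
        if (filter.test(entry)) {
          result.add(entry);
        }
      }
      return result;
    }
  }

  // Shared base filter (added by new snapshots) plus an extra partition-set filter.
  static List<Entry> addedInPartitions(
      List<Entry> entries, Set<Long> newSnapshotIds, Set<String> partitions) {
    return new EntryGroup(entries)
        .filterEntries(entry -> newSnapshotIds.contains(entry.snapshotId))
        .filterEntries(entry -> partitions.contains(entry.partition))
        .matching();
  }
}
```

   Because the filters compose, a partition-set overload can reuse the existing snapshot filtering and add only the one extra predicate.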

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -225,6 +225,9 @@ private TableProperties() {
   public static final String MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep";
   public static final int MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1;
 
+  public static final String DYNAMIC_OVERWRITE_ISOLATION_LEVEL = "write.dynamic.overwrite.isolation-level";

Review comment:
       What I mean is that we can use Dataset options for this. We need to pass the snapshot ID that was scanned anyway to make it work, and the only way to do that is through options. Even if we have a property, we still need to set the option. I'd probably use options to begin with. I can be convinced otherwise, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


