Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/02 16:41:02 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #1862: Spark: Implement copy-on-write DELETE

aokolnychyi opened a new pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862


   This PR adds the Iceberg part to support copy-on-write DELETE in Spark.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] mehtaashish23 commented on pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
mehtaashish23 commented on pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#issuecomment-739001324


   Thanks @aokolnychyi for the great work! This has been an amazing building block for subsequent work on UPSERTs.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535045057



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {
+    String mode = icebergTable.properties().getOrDefault(WRITE_ROW_LEVEL_MODE, WRITE_ROW_LEVEL_MODE_DEFAULT);
+    ValidationException.check(mode.equals("copy-on-write"), "Unsupported row operations mode: %s", mode);
+    return new SparkMergeBuilder(sparkSession(), icebergTable, info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    if (table().specs().size() > 1) {
+      // cannot guarantee a metadata delete will be successful if we have multiple specs
+      return false;
+    }
+
+    Set<Integer> identitySourceIds = table().spec().identitySourceIds();
+    Schema schema = table().schema();
+
+    for (Filter filter : filters) {
+      // return false if the filter requires rewrite or if we cannot translate the filter
+      if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
+    // TODO: handle dots correctly via v2references

Review comment:
       Will do.






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534636335



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {
+    String mode = icebergTable.properties().getOrDefault(WRITE_ROW_LEVEL_MODE, WRITE_ROW_LEVEL_MODE_DEFAULT);
+    ValidationException.check(mode.equals("copy-on-write"), "Unsupported row operations mode: %s", mode);
+    return new SparkMergeBuilder(sparkSession(), icebergTable, info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    if (table().specs().size() > 1) {
+      // cannot guarantee a metadata delete will be successful if we have multiple specs
+      return false;
+    }
+
+    Set<Integer> identitySourceIds = table().spec().identitySourceIds();
+    Schema schema = table().schema();
+
+    for (Filter filter : filters) {
+      // return false if the filter requires rewrite or if we cannot translate the filter
+      if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
+    // TODO: handle dots correctly via v2references

Review comment:
       We may want to add another TODO to detect more cases that don't require rewrites.
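       As a rough illustration of the check discussed here: the body of `requiresRewrite` is not shown in the quoted hunk, so the following is only a sketch under the assumption that a filter can be served by a metadata delete when every column it references is an identity partition source. The class name, the reduction of a filter to a list of column names, and the name-to-id map standing in for the schema are all hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MetadataDeleteSketch {

  // Hypothetical simplification: a filter is reduced to the column names it references,
  // and the schema is reduced to a map from column name to field id.
  static boolean requiresRewrite(
      List<String> referencedColumns, Map<String, Integer> fieldIds, Set<Integer> identitySourceIds) {
    for (String column : referencedColumns) {
      Integer fieldId = fieldIds.get(column);
      // unknown columns and columns that are not identity partition sources force a rewrite
      if (fieldId == null || !identitySourceIds.contains(fieldId)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Integer> fieldIds = new HashMap<>();
    fieldIds.put("id", 1);
    fieldIds.put("dep", 2);
    // assume the table is partitioned by identity(dep)
    Set<Integer> identitySourceIds = new HashSet<>(Arrays.asList(2));

    // filter on the partition column only: no rewrite needed
    System.out.println(requiresRewrite(Arrays.asList("dep"), fieldIds, identitySourceIds));
    // filter that also touches a data column: rewrite needed
    System.out.println(requiresRewrite(Arrays.asList("id", "dep"), fieldIds, identitySourceIds));
  }
}
```

       Detecting more cases that avoid rewrites, as suggested above, would mean loosening the condition inside the loop; which cases are safe is not settled in this thread.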






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534633796



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkMergeBuilder implements MergeBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final LogicalWriteInfo writeInfo;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private WriteBuilder lazyWriteBuilder;
+
+  SparkMergeBuilder(SparkSession spark, Table table, LogicalWriteInfo writeInfo) {
+    this.spark = spark;
+    this.table = table;
+    this.writeInfo = writeInfo;
+
+    String isolationLevelAsString = table.properties().getOrDefault(
+        TableProperties.WRITE_ISOLATION_LEVEL,
+        TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
+    ).toUpperCase(Locale.ROOT);
+    this.isolationLevel = IsolationLevel.valueOf(isolationLevelAsString);
+  }
+
+  @Override
+  public ScanBuilder asScanBuilder() {
+    return scanBuilder();
+  }
+
+  private ScanBuilder scanBuilder() {
+    if (lazyScanBuilder == null) {
+      SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, scanOptions()) {
+        public Scan build() {
+          Scan scan = super.buildMergeScan();
+          SparkMergeBuilder.this.configuredScan = scan;
+          return scan;
+        }
+      };
+      // ignore residuals to ensure we read full files
+      lazyScanBuilder = scanBuilder.ignoreResiduals();
+    }
+
+    return lazyScanBuilder;
+  }
+
+  private CaseInsensitiveStringMap scanOptions() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (currentSnapshot == null) {
+      return CaseInsensitiveStringMap.empty();

Review comment:
       Shouldn't this return `writeInfo.options()`?






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534485965



##########
File path: api/src/main/java/org/apache/iceberg/IsolationLevel.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * An isolation level in a table.
+ * <p>
+ * Two isolation levels are supported: serializable and snapshot isolation. Both of them provide
+ * a read consistent view of the table to all operations and allow readers to see only already
+ * committed data. While serializable is the strongest isolation level in databases,
+ * snapshot isolation is beneficial for environments with many concurrent writers.
+ * <p>
+ * The serializable isolation level guarantees that an ongoing UPDATE/DELETE/MERGE operation
+ * fails if a concurrent transaction commits a new file that might contain rows matching
+ * the condition used in UPDATE/DELETE/MERGE. For example, if there is an ongoing update
+ * on a subset of rows and a concurrent transaction adds a new file with records
+ * that potentially match the update condition, the update operation must fail under
+ * the serializable isolation but can still commit under the snapshot isolation.
+ */
+public enum IsolationLevel {

Review comment:
       Does this need to be in `api`? I think it is only used to parse a string and hold the result. For now, I would move it to core to avoid exposing it too widely.
   
   FYI @jacques-n: isolation level for a single table.
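       To make the "parse a string and hold the result" usage concrete, here is a minimal self-contained sketch. The property key and default are copied from the `TableProperties` hunk quoted in this thread, and the upper-casing with `Locale.ROOT` mirrors the `SparkMergeBuilder` constructor. The enum constants `SERIALIZABLE` and `SNAPSHOT` are an assumption inferred from the Javadoc, since the enum body is not shown, and the class name is hypothetical.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public class IsolationLevelSketch {

  // Assumed constants: the Javadoc names two levels, but the enum body is not quoted.
  enum IsolationLevel { SERIALIZABLE, SNAPSHOT }

  // Key and default as they appear in the quoted TableProperties hunk.
  static final String WRITE_ISOLATION_LEVEL = "write.isolation-level";
  static final String WRITE_ISOLATION_LEVEL_DEFAULT = "serializable";

  // Reads the property (or its default) and converts it to the enum.
  // Enum.valueOf throws IllegalArgumentException for an unrecognized value.
  static IsolationLevel parse(Map<String, String> properties) {
    String value = properties.getOrDefault(WRITE_ISOLATION_LEVEL, WRITE_ISOLATION_LEVEL_DEFAULT);
    return IsolationLevel.valueOf(value.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    Map<String, String> noOverrides = Collections.emptyMap();
    System.out.println(parse(noOverrides));
    System.out.println(parse(Collections.singletonMap(WRITE_ISOLATION_LEVEL, "snapshot")));
  }
}
```

       Because the enum is only a parsed holder in this usage, nothing in the sketch depends on which module it lives in.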






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535479044



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       We have this info only in the `RewriteDelete` rule that calls `newMergeBuilder`. After that, we have no way to know whether it is a delete or a merge.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535480503



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       Unless we want to have `newMergeBuilder`, `newDeleteBuilder`, etc.






[GitHub] [iceberg] rdblue merged pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535046623



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {
+    String mode = icebergTable.properties().getOrDefault(WRITE_ROW_LEVEL_MODE, WRITE_ROW_LEVEL_MODE_DEFAULT);
+    ValidationException.check(mode.equals("copy-on-write"), "Unsupported row operations mode: %s", mode);
+    return new SparkMergeBuilder(sparkSession(), icebergTable, info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    if (table().specs().size() > 1) {
+      // cannot guarantee a metadata delete will be successful if we have multiple specs
+      return false;
+    }
+
+    Set<Integer> identitySourceIds = table().spec().identitySourceIds();
+    Schema schema = table().schema();
+
+    for (Filter filter : filters) {
+      // return false if the filter requires rewrite or if we cannot translate the filter
+      if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
+    // TODO: handle dots correctly via v2references

Review comment:
       Yeah, they are already supported.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r536132987



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -140,4 +140,10 @@ private TableProperties() {
 
   public static final String GC_ENABLED = "gc.enabled";
   public static final boolean GC_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_ISOLATION_LEVEL = "write.isolation-level";

Review comment:
       Fixed.






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535553287



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       After discussing this directly, I think the right way to go is to add the operation name. That fixes the isolation level and mode property problem. And it is reasonable to add the operation to `LogicalWriteInfo`.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r536133582



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {
+    String mode = icebergTable.properties().getOrDefault(WRITE_ROW_LEVEL_MODE, WRITE_ROW_LEVEL_MODE_DEFAULT);
+    ValidationException.check(mode.equals("copy-on-write"), "Unsupported row operations mode: %s", mode);
+    return new SparkMergeBuilder(sparkSession(), icebergTable, info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    if (table().specs().size() > 1) {
+      // cannot guarantee a metadata delete will be successful if we have multiple specs
+      return false;
+    }
+
+    Set<Integer> identitySourceIds = table().spec().identitySourceIds();
+    Schema schema = table().schema();
+
+    for (Filter filter : filters) {
+      // return false if the filter requires rewrite or if we cannot translate the filter
+      if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
+    // TODO: handle dots correctly via v2references

Review comment:
       Done.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535479833



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       So if we pass something that tells us what logical operation it is, we can have `write.delete.isolation-level`.
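       A small sketch of what an operation-scoped lookup could look like, assuming the operation arrives as a plain string. The helper names and the class are hypothetical; only the key shape `write.delete.isolation-level` and the `serializable` default come from this thread, and how the operation actually reaches the builder is left open here.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class OperationPropertySketch {

  // Default value as quoted in the TableProperties hunk of this PR.
  static final String ISOLATION_LEVEL_DEFAULT = "serializable";

  // Hypothetical helper: builds a key of the form "write.<operation>.isolation-level".
  static String isolationLevelKey(String operation) {
    return "write." + operation.toLowerCase(Locale.ROOT) + ".isolation-level";
  }

  // Looks up the isolation level for one logical operation, falling back to the default.
  static String isolationLevel(Map<String, String> properties, String operation) {
    return properties.getOrDefault(isolationLevelKey(operation), ISOLATION_LEVEL_DEFAULT);
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("write.delete.isolation-level", "snapshot");

    // DELETE picks up its own setting; MERGE has none and uses the default
    System.out.println(isolationLevel(properties, "delete"));
    System.out.println(isolationLevel(properties, "merge"));
  }
}
```

       With a lookup like this, DELETE and MERGE can be tuned independently while sharing one builder, which is the trade-off debated in the neighbouring comments.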






[GitHub] [iceberg] aokolnychyi commented on pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#issuecomment-737352396


   @rdblue, here is the Iceberg part.




[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535445470



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       I'm not a fan of passing things through Spark's options. If we have the information in the writer, can we just call a setter to set it on the reader?






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535421320



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -140,4 +140,10 @@ private TableProperties() {
 
   public static final String GC_ENABLED = "gc.enabled";
   public static final boolean GC_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_ISOLATION_LEVEL = "write.isolation-level";

Review comment:
       It might be better to use `write.row-level.isolation-level` so it matches the mode property. This doesn't apply to `INSERT OVERWRITE` so it would be nice to be clear about it.
   
   Not sure if we can make that property name better. Seems odd to have `row-level` and `isolation-level` next to one another since "row-level" is specifying the granularity of the changes and "isolation-level" is actually choosing an option.






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535539404



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       I like that this is used for both MERGE and DELETE without needing to customize, so I'm reluctant to do too much here. It is close, but I think I would opt to have a single config property.
   
   Operation is something that we could pass through `LogicalWriteInfo` in the future for better logging and purposes like this one. For now, we could add the operation as a String passed to `newMergeBuilder`, but there is no guarantee that we would add this to `LogicalWriteInfo` later. I think that means that we should go for a single property and we can customize later if we do get the operation when the builder is created.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534316604



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -140,4 +140,10 @@ private TableProperties() {
 
   public static final String GC_ENABLED = "gc.enabled";
   public static final boolean GC_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_ISOLATION_LEVEL = "write.isolation-level";
+  public static final String WRITE_ISOLATION_LEVEL_DEFAULT = "serializable";
+
+  public static final String WRITE_ROW_LEVEL_MODE = "write.row-level.mode";

Review comment:
       Same here. I wanted it to be `write.delete.mode`.






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535540871



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkMergeBuilder implements MergeBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final LogicalWriteInfo writeInfo;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private WriteBuilder lazyWriteBuilder;
+
+  SparkMergeBuilder(SparkSession spark, Table table, LogicalWriteInfo writeInfo) {
+    this.spark = spark;
+    this.table = table;
+    this.writeInfo = writeInfo;
+
+    String isolationLevelAsString = table.properties().getOrDefault(
+        TableProperties.WRITE_ISOLATION_LEVEL,
+        TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
+    ).toUpperCase(Locale.ROOT);
+    this.isolationLevel = IsolationLevel.valueOf(isolationLevelAsString);
+  }
+
+  @Override
+  public ScanBuilder asScanBuilder() {
+    return scanBuilder();
+  }
+
+  private ScanBuilder scanBuilder() {
+    if (lazyScanBuilder == null) {
+      SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, scanOptions()) {
+        public Scan build() {
+          Scan scan = super.buildMergeScan();
+          SparkMergeBuilder.this.configuredScan = scan;
+          return scan;
+        }
+      };
+      // ignore residuals to ensure we read full files
+      lazyScanBuilder = scanBuilder.ignoreResiduals();
+    }
+
+    return lazyScanBuilder;
+  }
+
+  private CaseInsensitiveStringMap scanOptions() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (currentSnapshot == null) {
+      return CaseInsensitiveStringMap.empty();

Review comment:
       Now that I'm thinking about this more, why choose a snapshot here and pass it to the scan? The scan could determine the snapshot itself without modifying these options. The writer gets the snapshot ID from the scan anyway, so it doesn't matter whether it is determined here or in the scan.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535043116



##########
File path: api/src/main/java/org/apache/iceberg/IsolationLevel.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * An isolation level in a table.
+ * <p>
+ * Two isolation levels are supported: serializable and snapshot isolation. Both of them provide
+ * a read consistent view of the table to all operations and allow readers to see only already
+ * committed data. While serializable is the strongest isolation level in databases,
+ * snapshot isolation is beneficial for environments with many concurrent writers.
+ * <p>
+ * The serializable isolation level guarantees that an ongoing UPDATE/DELETE/MERGE operation
+ * fails if a concurrent transaction commits a new file that might contain rows matching
+ * the condition used in UPDATE/DELETE/MERGE. For example, if there is an ongoing update
+ * on a subset of rows and a concurrent transaction adds a new file with records
+ * that potentially match the update condition, the update operation must fail under
+ * the serializable isolation but can still commit under the snapshot isolation.
+ */
+public enum IsolationLevel {

Review comment:
       Sounds good to me.
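
       The Javadoc quoted above distinguishes the two levels by how they react to files added by a concurrent writer. A minimal sketch of that rule, assuming a hypothetical `canCommit` helper and a nested `Level` enum that stand in for whatever the real validation looks like (it models only the added-file case from the Javadoc, nothing else):

```java
import java.util.List;

final class IsolationSketch {
  // Hypothetical stand-in for the IsolationLevel enum discussed in this thread.
  enum Level { SERIALIZABLE, SNAPSHOT }

  private IsolationSketch() {
  }

  // concurrentFilesMayMatch holds one flag per data file committed by another
  // writer after the operation started: true if that file might contain rows
  // matching the UPDATE/DELETE/MERGE condition.
  static boolean canCommit(Level level, List<Boolean> concurrentFilesMayMatch) {
    if (level == Level.SNAPSHOT) {
      // Per the Javadoc, the operation can still commit under snapshot isolation.
      return true;
    }
    // Serializable: any concurrently added file that may match forces a failure.
    return !concurrentFilesMayMatch.contains(Boolean.TRUE);
  }
}
```

       Under these assumptions, `canCommit(Level.SERIALIZABLE, ...)` returns false as soon as one concurrently added file may match, while `Level.SNAPSHOT` always returns true for this particular conflict.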






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r536133177



##########
File path: api/src/main/java/org/apache/iceberg/IsolationLevel.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * An isolation level in a table.
+ * <p>
+ * Two isolation levels are supported: serializable and snapshot isolation. Both of them provide
+ * a read consistent view of the table to all operations and allow readers to see only already
+ * committed data. While serializable is the strongest isolation level in databases,
+ * snapshot isolation is beneficial for environments with many concurrent writers.
+ * <p>
+ * The serializable isolation level guarantees that an ongoing UPDATE/DELETE/MERGE operation
+ * fails if a concurrent transaction commits a new file that might contain rows matching
+ * the condition used in UPDATE/DELETE/MERGE. For example, if there is an ongoing update
+ * on a subset of rows and a concurrent transaction adds a new file with records
+ * that potentially match the update condition, the update operation must fail under
+ * the serializable isolation but can still commit under the snapshot isolation.
+ */
+public enum IsolationLevel {

Review comment:
       Moved.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkMergeBuilder implements MergeBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final LogicalWriteInfo writeInfo;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private WriteBuilder lazyWriteBuilder;
+
+  SparkMergeBuilder(SparkSession spark, Table table, LogicalWriteInfo writeInfo) {
+    this.spark = spark;
+    this.table = table;
+    this.writeInfo = writeInfo;
+
+    String isolationLevelAsString = table.properties().getOrDefault(
+        TableProperties.WRITE_ISOLATION_LEVEL,
+        TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
+    ).toUpperCase(Locale.ROOT);
+    this.isolationLevel = IsolationLevel.valueOf(isolationLevelAsString);
+  }
+
+  @Override
+  public ScanBuilder asScanBuilder() {
+    return scanBuilder();
+  }
+
+  private ScanBuilder scanBuilder() {
+    if (lazyScanBuilder == null) {
+      SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, scanOptions()) {
+        public Scan build() {
+          Scan scan = super.buildMergeScan();
+          SparkMergeBuilder.this.configuredScan = scan;
+          return scan;
+        }
+      };
+      // ignore residuals to ensure we read full files
+      lazyScanBuilder = scanBuilder.ignoreResiduals();
+    }
+
+    return lazyScanBuilder;
+  }
+
+  private CaseInsensitiveStringMap scanOptions() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (currentSnapshot == null) {
+      return CaseInsensitiveStringMap.empty();

Review comment:
       Done.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535041450



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       We could set `operation` inside `info.options()` in `RewriteDelete` and pass it here so that we can support table properties like `write.delete.mode` instead of `write.row-level.mode`. Thoughts, @rdblue? 
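
       A rough sketch of what that lookup could look like, using plain maps in place of `info.options()` and `icebergTable.properties()`. The option key `operation`, the class and method names, the `copy-on-write` default, and the use of `IllegalArgumentException` are assumptions for illustration only:

```java
import java.util.Locale;
import java.util.Map;

final class RowLevelModeSketch {
  // Hypothetical option key; the comment above only says "set `operation` inside `info.options()`".
  static final String OPERATION_OPTION = "operation";
  // Assumed default; copy-on-write is the only mode the PR accepts.
  static final String MODE_DEFAULT = "copy-on-write";

  private RowLevelModeSketch() {
  }

  // Builds an operation-specific property name such as "write.delete.mode".
  static String modePropertyFor(Map<String, String> writeOptions) {
    String operation = writeOptions.get(OPERATION_OPTION);
    if (operation == null) {
      throw new IllegalArgumentException("Missing write option: " + OPERATION_OPTION);
    }
    return "write." + operation.toLowerCase(Locale.ROOT) + ".mode";
  }

  // Reads the mode from the table properties and rejects unsupported modes.
  static String resolveMode(Map<String, String> tableProperties, Map<String, String> writeOptions) {
    String mode = tableProperties.getOrDefault(modePropertyFor(writeOptions), MODE_DEFAULT);
    if (!MODE_DEFAULT.equals(mode)) {
      throw new IllegalArgumentException("Unsupported row operations mode: " + mode);
    }
    return mode;
  }
}
```

       With `operation=DELETE` in the write options, this resolves the `write.delete.mode` table property rather than a shared `write.row-level.mode`.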






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r536133861



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {

Review comment:
       Done.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r535044073



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkMergeBuilder implements MergeBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final LogicalWriteInfo writeInfo;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private WriteBuilder lazyWriteBuilder;
+
+  SparkMergeBuilder(SparkSession spark, Table table, LogicalWriteInfo writeInfo) {
+    this.spark = spark;
+    this.table = table;
+    this.writeInfo = writeInfo;
+
+    String isolationLevelAsString = table.properties().getOrDefault(
+        TableProperties.WRITE_ISOLATION_LEVEL,
+        TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
+    ).toUpperCase(Locale.ROOT);
+    this.isolationLevel = IsolationLevel.valueOf(isolationLevelAsString);
+  }
+
+  @Override
+  public ScanBuilder asScanBuilder() {
+    return scanBuilder();
+  }
+
+  private ScanBuilder scanBuilder() {
+    if (lazyScanBuilder == null) {
+      SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, scanOptions()) {
+        public Scan build() {
+          Scan scan = super.buildMergeScan();
+          SparkMergeBuilder.this.configuredScan = scan;
+          return scan;
+        }
+      };
+      // ignore residuals to ensure we read full files
+      lazyScanBuilder = scanBuilder.ignoreResiduals();
+    }
+
+    return lazyScanBuilder;
+  }
+
+  private CaseInsensitiveStringMap scanOptions() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (currentSnapshot == null) {
+      return CaseInsensitiveStringMap.empty();

Review comment:
       It should, plus we need to validate that no `snapshot-id` is set in options. Good catch.
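
       A minimal sketch of that validation, assuming the scan resolves the snapshot itself. The class and method names are hypothetical, and a plain `Map` stands in for `CaseInsensitiveStringMap`, so the key match here is case-sensitive, unlike the real options map:

```java
import java.util.Map;

final class MergeScanOptionsSketch {
  static final String SNAPSHOT_ID = "snapshot-id";

  private MergeScanOptionsSketch() {
  }

  // Returns the snapshot ID the merge scan should read, or null when the table
  // has no snapshots yet. A user-supplied snapshot-id is rejected because the
  // row-level operation must run against the snapshot the scan picks itself.
  static Long resolveSnapshotId(Map<String, String> options, Long currentSnapshotId) {
    if (options.containsKey(SNAPSHOT_ID)) {
      throw new IllegalArgumentException("Cannot set " + SNAPSHOT_ID + " in options for a merge scan");
    }
    return currentSnapshotId;
  }
}
```

       The writer can then take the snapshot ID from the configured scan, so the builder no longer needs to inject it through the scan options.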






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534315817



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -140,4 +140,10 @@ private TableProperties() {
 
   public static final String GC_ENABLED = "gc.enabled";
   public static final boolean GC_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_ISOLATION_LEVEL = "write.isolation-level";

Review comment:
       Well, I wanted it to be `write.delete.isolation-level` but we have no way to find out which operation is being performed in the merge builder.
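
       If the operation were known to the merge builder, the lookup might look roughly like the sketch below. Only the `serializable` default comes from the diff above; the nested `Level` enum, the class and method names, and the error message are assumptions for illustration:

```java
import java.util.Locale;
import java.util.Map;

final class IsolationLevelParsingSketch {
  // Hypothetical stand-in for org.apache.iceberg.IsolationLevel.
  enum Level { SERIALIZABLE, SNAPSHOT }

  private IsolationLevelParsingSketch() {
  }

  // Resolves an operation-specific property such as "write.delete.isolation-level",
  // falling back to "serializable" when the property is not set.
  static Level isolationLevelFor(String operation, Map<String, String> tableProperties) {
    String key = "write." + operation.toLowerCase(Locale.ROOT) + ".isolation-level";
    String name = tableProperties.getOrDefault(key, "serializable");
    try {
      return Level.valueOf(name.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Enum.valueOf reports only the enum constant; add the property name for context.
      throw new IllegalArgumentException("Invalid isolation level for " + key + ": " + name, e);
    }
  }
}
```

       Wrapping `valueOf` keeps the behaviour of the constructor in the diff but makes a misconfigured property easier to diagnose.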






[GitHub] [iceberg] rdblue commented on a change in pull request #1862: Spark: Implement copy-on-write DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1862:
URL: https://github.com/apache/iceberg/pull/1862#discussion_r534634975



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -160,6 +167,43 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
 
+  @Override
+  public MergeBuilder newMergeBuilder(LogicalWriteInfo info) {
+    String mode = icebergTable.properties().getOrDefault(WRITE_ROW_LEVEL_MODE, WRITE_ROW_LEVEL_MODE_DEFAULT);
+    ValidationException.check(mode.equals("copy-on-write"), "Unsupported row operations mode: %s", mode);
+    return new SparkMergeBuilder(sparkSession(), icebergTable, info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    if (table().specs().size() > 1) {
+      // cannot guarantee a metadata delete will be successful if we have multiple specs
+      return false;
+    }
+
+    Set<Integer> identitySourceIds = table().spec().identitySourceIds();
+    Schema schema = table().schema();
+
+    for (Filter filter : filters) {
+      // return false if the filter requires rewrite or if we cannot translate the filter
+      if (requiresRewrite(filter, schema, identitySourceIds) || SparkFilters.convert(filter) == null) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean requiresRewrite(Filter filter, Schema schema, Set<Integer> identitySourceIds) {
+    // TODO: handle dots correctly via v2references

Review comment:
       Are v2 references committed in Spark?



