You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/30 17:54:36 UTC

[incubator-iceberg] branch spark-3 updated: Add DeleteFrom support for Spark 3.0 (#706)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/spark-3 by this push:
     new bafeb5c  Add DeleteFrom support for Spark 3.0 (#706)
bafeb5c is described below

commit bafeb5cd387abb686a3a226384f30850a0b5111b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Dec 30 09:54:28 2019 -0800

    Add DeleteFrom support for Spark 3.0 (#706)
---
 .../java/org/apache/iceberg/spark/SparkFilters.java |  4 +++-
 .../org/apache/iceberg/spark/source/SparkTable.java | 21 ++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index ae1b88f..b6db70d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -87,7 +87,9 @@ public class SparkFilters {
   public static Expression convert(Filter[] filters) {
     Expression expression = Expressions.alwaysTrue();
     for (Filter filter : filters) {
-      expression = Expressions.and(expression, convert(filter));
+      Expression converted = convert(filter);
+      Preconditions.checkArgument(converted != null, "Cannot convert filter to Iceberg: %s", filter);
+      expression = Expressions.and(expression, converted);
     }
     return expression;
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 09fb304..9cba49f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -23,19 +23,25 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+public class SparkTable
+    implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite, SupportsDelete {
 
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
@@ -120,6 +126,19 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   }
 
   @Override
+  public void deleteWhere(Filter[] filters) {
+    Expression deleteExpr = SparkFilters.convert(filters);
+    try {
+      icebergTable.newDelete()
+          .set("spark.app.id", sparkSession().sparkContext().applicationId())
+          .deleteFromRowFilter(deleteExpr)
+          .commit();
+    } catch (ValidationException e) {
+      throw new IllegalArgumentException("Failed to cleanly delete data files matching: " + deleteExpr, e);
+    }
+  }
+
+  @Override
   public String toString() {
     return icebergTable.toString();
   }