You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/30 17:54:36 UTC
[incubator-iceberg] branch spark-3 updated: Add DeleteFrom support for Spark 3.0 (#706)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/spark-3 by this push:
new bafeb5c Add DeleteFrom support for Spark 3.0 (#706)
bafeb5c is described below
commit bafeb5cd387abb686a3a226384f30850a0b5111b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Dec 30 09:54:28 2019 -0800
Add DeleteFrom support for Spark 3.0 (#706)
---
.../java/org/apache/iceberg/spark/SparkFilters.java | 4 +++-
.../org/apache/iceberg/spark/source/SparkTable.java | 21 ++++++++++++++++++++-
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index ae1b88f..b6db70d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -87,7 +87,9 @@ public class SparkFilters {
public static Expression convert(Filter[] filters) {
Expression expression = Expressions.alwaysTrue();
for (Filter filter : filters) {
- expression = Expressions.and(expression, convert(filter));
+ Expression converted = convert(filter);
+ Preconditions.checkArgument(converted != null, "Cannot convert filter to Iceberg: %s", filter);
+ expression = Expressions.and(expression, converted);
}
return expression;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 09fb304..9cba49f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -23,19 +23,25 @@ import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+public class SparkTable
+ implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite, SupportsDelete {
private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
TableCapability.BATCH_READ,
@@ -120,6 +126,19 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
}
@Override
+ public void deleteWhere(Filter[] filters) {
+ Expression deleteExpr = SparkFilters.convert(filters);
+ try {
+ icebergTable.newDelete()
+ .set("spark.app.id", sparkSession().sparkContext().applicationId())
+ .deleteFromRowFilter(deleteExpr)
+ .commit();
+ } catch (ValidationException e) {
+ throw new IllegalArgumentException("Failed to cleanly delete data files matching: " + deleteExpr, e);
+ }
+ }
+
+ @Override
public String toString() {
return icebergTable.toString();
}