Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/23 00:59:59 UTC

[iceberg] branch master updated: Spark 3.2: Optimize DELETEs handled using metadata (#6912)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bda35ce7cf Spark 3.2: Optimize DELETEs handled using metadata (#6912)
bda35ce7cf is described below

commit bda35ce7cf5f618679b886a651bd91c2ab7ad141
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Feb 22 16:59:53 2023 -0800

    Spark 3.2: Optimize DELETEs handled using metadata (#6912)
    
    This change backports PR #6899 into Spark 3.2.
---
 .../iceberg/spark/extensions/TestDelete.java       | 50 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkReadConf.java    |  2 +-
 .../java/org/apache/iceberg/spark/SparkUtil.java   |  4 ++
 .../iceberg/spark/actions/SparkZOrderStrategy.java |  3 +-
 .../apache/iceberg/spark/source/SparkTable.java    | 12 ++++--
 5 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fb0d8fdab2..c46445b31d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -39,8 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -59,6 +61,10 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -91,6 +97,42 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS deleted_dep");
   }
 
+  @Test
+  public void testDeleteWithoutScanningTable() throws Exception {
+    createAndInitPartitionedTable();
+
+    append(new Employee(1, "hr"), new Employee(3, "hr"));
+    append(new Employee(1, "hardware"), new Employee(2, "hardware"));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    List<String> manifestLocations =
+        table.currentSnapshot().allManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toList());
+
+    withUnavailableLocations(
+        manifestLocations,
+        () -> {
+          LogicalPlan parsed = parsePlan("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+          DeleteFromIcebergTable analyzed =
+              (DeleteFromIcebergTable) spark.sessionState().analyzer().execute(parsed);
+          Assert.assertTrue("Should have rewrite plan", analyzed.rewritePlan().isDefined());
+
+          DeleteFromIcebergTable optimized =
+              (DeleteFromIcebergTable) OptimizeMetadataOnlyDeleteFromTable.apply(analyzed);
+          Assert.assertTrue("Should discard rewrite plan", optimized.rewritePlan().isEmpty());
+        });
+
+    sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "hardware"), row(2, "hardware")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
   @Test
   public void testDeleteFileThenMetadataDelete() throws Exception {
     Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));
@@ -991,4 +1033,12 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     String modeName = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);
   }
+
+  private LogicalPlan parsePlan(String query, Object... args) {
+    try {
+      return spark.sessionState().sqlParser().parsePlan(String.format(query, args));
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
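
For readers following the new test above: the same analyzer/optimizer round-trip can be reproduced outside JUnit. The sketch below is illustrative only; it assumes a SparkSession built with the Iceberg SQL extensions enabled and an existing Iceberg table demo.db.employees partitioned by dep (both placeholders, not part of the commit).

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable;
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
    import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;

    public class MetadataDeletePlanCheck {
      public static void main(String[] args) throws Exception {
        // Placeholder session wiring; adjust the catalog to your environment.
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .getOrCreate();

        // Parse and analyze a partition-scoped DELETE, as the test does.
        LogicalPlan parsed = spark.sessionState().sqlParser()
            .parsePlan("DELETE FROM demo.db.employees WHERE dep = 'hr'");
        DeleteFromIcebergTable analyzed =
            (DeleteFromIcebergTable) spark.sessionState().analyzer().execute(parsed);

        // The analyzer attaches a row-level rewrite plan ...
        System.out.println("rewrite plan attached: " + analyzed.rewritePlan().isDefined());

        // ... and the rule discards it when the predicate selects whole partitions,
        // leaving a plan that can be executed against metadata alone.
        DeleteFromIcebergTable optimized =
            (DeleteFromIcebergTable) OptimizeMetadataOnlyDeleteFromTable.apply(analyzed);
        System.out.println("rewrite plan after rule: " + optimized.rewritePlan().isDefined());

        spark.stop();
      }
    }
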
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 6b81393259..8fbd11c9d9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -58,7 +58,7 @@ public class SparkReadConf {
   }
 
   public boolean caseSensitive() {
-    return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    return SparkUtil.caseSensitive(spark);
   }
 
   public boolean localityEnabled() {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 950ed7bc87..84dbd7986a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -287,4 +287,8 @@ public class SparkUtil {
 
     return filterExpressions;
   }
+
+  public static boolean caseSensitive(SparkSession spark) {
+    return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+  }
 }
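
The new SparkUtil.caseSensitive(spark) helper centralizes the string parsing that SparkReadConf, SparkZOrderStrategy, and SparkTable previously duplicated (see the surrounding hunks). A minimal usage sketch; the local-mode session is illustrative:

    import org.apache.iceberg.spark.SparkUtil;
    import org.apache.spark.sql.SparkSession;

    public class CaseSensitivityExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.caseSensitive", "true")
            .getOrCreate();

        // One helper reads and parses spark.sql.caseSensitive for every caller,
        // replacing Boolean.parseBoolean(spark.conf().get(...)) at each call site.
        boolean caseSensitive = SparkUtil.caseSensitive(spark);
        System.out.println("caseSensitive = " + caseSensitive); // prints: caseSensitive = true

        spark.stop();
      }
    }
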
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index eed212a145..022d144b85 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
@@ -154,7 +155,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
   }
 
   private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
-    boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    boolean caseSensitive = SparkUtil.caseSensitive(spark);
     Schema schema = table.schema();
     colNames.forEach(
         col -> {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 35da565af6..dc8227d8ad 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
@@ -50,6 +51,7 @@ import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
@@ -266,13 +268,17 @@ public class SparkTable
       }
     }
 
-    return deleteExpr == Expressions.alwaysTrue() || canDeleteUsingMetadata(deleteExpr);
+    return canDeleteUsingMetadata(deleteExpr);
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
   private boolean canDeleteUsingMetadata(Expression deleteExpr) {
-    boolean caseSensitive =
-        Boolean.parseBoolean(sparkSession().conf().get("spark.sql.caseSensitive"));
+    boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
+
+    if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
+      return true;
+    }
+
     TableScan scan =
         table()
             .newScan()
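
Net effect of the SparkTable change: when a DELETE predicate selects entire partitions, the metadata-delete capability check now answers true via ExpressionUtil.selectsPartitions() before any table scan is planned, and the separate alwaysTrue() short circuit is dropped because the new check covers it (a predicate that is always true selects every partition). A minimal end-to-end sketch; the Hadoop catalog wiring and table name are placeholders, not part of the commit:

    import org.apache.spark.sql.SparkSession;

    public class PartitionScopedDelete {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.demo.type", "hadoop")
            .config("spark.sql.catalog.demo.warehouse", "/tmp/iceberg-warehouse")
            .getOrCreate();

        spark.sql("CREATE TABLE IF NOT EXISTS demo.db.employees (id INT, dep STRING) "
            + "USING iceberg PARTITIONED BY (dep)");
        spark.sql("INSERT INTO demo.db.employees VALUES (1, 'hr'), (3, 'hr'), (2, 'hardware')");

        // The predicate aligns with the partition spec, so the delete commits by
        // dropping whole files in metadata; no data files are read or rewritten.
        spark.sql("DELETE FROM demo.db.employees WHERE dep = 'hr'");

        spark.sql("SELECT * FROM demo.db.employees ORDER BY id").show();
        spark.stop();
      }
    }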