You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/23 00:59:59 UTC
[iceberg] branch master updated: Spark 3.2: Optimize DELETEs handled using metadata (#6912)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bda35ce7cf Spark 3.2: Optimize DELETEs handled using metadata (#6912)
bda35ce7cf is described below
commit bda35ce7cf5f618679b886a651bd91c2ab7ad141
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Feb 22 16:59:53 2023 -0800
Spark 3.2: Optimize DELETEs handled using metadata (#6912)
This change backports PR #6899 into Spark 3.2.
---
.../iceberg/spark/extensions/TestDelete.java | 50 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkReadConf.java | 2 +-
.../java/org/apache/iceberg/spark/SparkUtil.java | 4 ++
.../iceberg/spark/actions/SparkZOrderStrategy.java | 3 +-
.../apache/iceberg/spark/source/SparkTable.java | 12 ++++--
5 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fb0d8fdab2..c46445b31d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -39,8 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -59,6 +61,10 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -91,6 +97,42 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
sql("DROP TABLE IF EXISTS deleted_dep");
}
+ @Test
+ public void testDeleteWithoutScanningTable() throws Exception {
+ createAndInitPartitionedTable();
+
+ append(new Employee(1, "hr"), new Employee(3, "hr"));
+ append(new Employee(1, "hardware"), new Employee(2, "hardware"));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ List<String> manifestLocations =
+ table.currentSnapshot().allManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toList());
+
+ withUnavailableLocations(
+ manifestLocations,
+ () -> {
+ LogicalPlan parsed = parsePlan("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+ DeleteFromIcebergTable analyzed =
+ (DeleteFromIcebergTable) spark.sessionState().analyzer().execute(parsed);
+ Assert.assertTrue("Should have rewrite plan", analyzed.rewritePlan().isDefined());
+
+ DeleteFromIcebergTable optimized =
+ (DeleteFromIcebergTable) OptimizeMetadataOnlyDeleteFromTable.apply(analyzed);
+ Assert.assertTrue("Should discard rewrite plan", optimized.rewritePlan().isEmpty());
+ });
+
+ sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1, "hardware"), row(2, "hardware")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
@Test
public void testDeleteFileThenMetadataDelete() throws Exception {
Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));
@@ -991,4 +1033,12 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
String modeName = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
return RowLevelOperationMode.fromName(modeName);
}
+
+ private LogicalPlan parsePlan(String query, Object... args) {
+ try {
+ return spark.sessionState().sqlParser().parsePlan(String.format(query, args));
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 6b81393259..8fbd11c9d9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -58,7 +58,7 @@ public class SparkReadConf {
}
public boolean caseSensitive() {
- return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ return SparkUtil.caseSensitive(spark);
}
public boolean localityEnabled() {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 950ed7bc87..84dbd7986a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -287,4 +287,8 @@ public class SparkUtil {
return filterExpressions;
}
+
+ public static boolean caseSensitive(SparkSession spark) {
+ return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index eed212a145..022d144b85 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
@@ -154,7 +155,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
}
private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
- boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ boolean caseSensitive = SparkUtil.caseSensitive(spark);
Schema schema = table.schema();
colNames.forEach(
col -> {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 35da565af6..dc8227d8ad 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
@@ -50,6 +51,7 @@ import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
@@ -266,13 +268,17 @@ public class SparkTable
}
}
- return deleteExpr == Expressions.alwaysTrue() || canDeleteUsingMetadata(deleteExpr);
+ return canDeleteUsingMetadata(deleteExpr);
}
// a metadata delete is possible iff matching files can be deleted entirely
private boolean canDeleteUsingMetadata(Expression deleteExpr) {
- boolean caseSensitive =
- Boolean.parseBoolean(sparkSession().conf().get("spark.sql.caseSensitive"));
+ boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
+
+ if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
+ return true;
+ }
+
TableScan scan =
table()
.newScan()