Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/02 16:57:23 UTC
[iceberg] branch master updated: Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new aaefa6669c Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)
aaefa6669c is described below
commit aaefa6669c98150dba0758bed02008f2a577427a
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Nov 2 09:57:17 2022 -0700
Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)
---
spark/v3.2/build.gradle | 5 +-
.../RowLevelCommandDynamicPruning.scala | 56 ++++++-----
.../SparkRowLevelOperationsTestBase.java | 36 +++++++
.../spark/extensions/TestCopyOnWriteDelete.java | 103 ++++++++++++++++++++
.../spark/extensions/TestCopyOnWriteMerge.java | 108 +++++++++++++++++++++
.../spark/extensions/TestCopyOnWriteUpdate.java | 100 +++++++++++++++++++
.../iceberg/spark/extensions/TestDelete.java | 80 +++++++++------
.../apache/iceberg/spark/extensions/TestMerge.java | 57 ++++++++++-
.../iceberg/spark/extensions/TestUpdate.java | 56 ++++++++++-
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 14 +++
10 files changed, 552 insertions(+), 63 deletions(-)
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 604d8786e1..13628a7606 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -151,14 +151,15 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
exclude group: 'org.roaringbitmap'
}
+ testImplementation project(path: ':iceberg-data')
+ testImplementation project(path: ':iceberg-parquet')
testImplementation project(path: ':iceberg-hive-metastore')
- testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
testImplementation "org.apache.avro:avro"
+ testImplementation "org.apache.parquet:parquet-hadoop"
// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly "org.antlr:antlr4-runtime:4.8"
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
index 0685c88dfc..4b257920b6 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
@@ -48,7 +48,9 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.trees.TreePattern.SORT
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
import scala.collection.compat.immutable.ArraySeq
/**
@@ -59,6 +61,8 @@ import scala.collection.compat.immutable.ArraySeq
*/
case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
+ import ExtendedDataSourceV2Implicits._
+
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// apply special dynamic filtering only for plans that don't support deltas
case RewrittenRowLevelCommand(
@@ -69,16 +73,25 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
// use reference equality to find exactly the required scan relations
val newRewritePlan = rewritePlan transformUp {
case r: DataSourceV2ScanRelation if r.scan eq scan =>
- val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](
- ArraySeq.unsafeWrapArray(scan.filterAttributes), r)
- val dynamicPruningCond = buildDynamicPruningCondition(r, command, pruningKeys)
- val filter = Filter(dynamicPruningCond, r)
- // always optimize dynamic filtering subqueries for row-level commands as it is important
- // to rewrite introduced predicates as joins because Spark recently stopped optimizing
- // dynamic subqueries to facilitate broadcast reuse
- optimizeSubquery(filter)
+ // use the original table instance that was loaded for this row-level operation
+ // in order to leverage a regular batch scan in the group filter query
+ val originalTable = r.relation.table.asRowLevelOperationTable.table
+ val relation = r.relation.copy(table = originalTable)
+ val matchingRowsPlan = buildMatchingRowsPlan(relation, command)
+
+ val filterAttrs = ArraySeq.unsafeWrapArray(scan.filterAttributes)
+ val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+ val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+ val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
+
+ Filter(dynamicPruningCond, r)
}
- command.withNewRewritePlan(newRewritePlan)
+
+ // always optimize dynamic filtering subqueries for row-level commands as it is important
+ // to rewrite introduced predicates as joins because Spark recently stopped optimizing
+ // dynamic subqueries to facilitate broadcast reuse
+ command.withNewRewritePlan(optimizeSubquery(newRewritePlan))
+
}
private def isCandidate(command: RowLevelCommand): Boolean = command.condition match {
@@ -86,10 +99,9 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
case _ => false
}
- private def buildDynamicPruningCondition(
- relation: DataSourceV2ScanRelation,
- command: RowLevelCommand,
- pruningKeys: Seq[Attribute]): Expression = {
+ private def buildMatchingRowsPlan(
+ relation: DataSourceV2Relation,
+ command: RowLevelCommand): LogicalPlan = {
// construct a filtering plan with the original scan relation
val matchingRowsPlan = command match {
@@ -113,23 +125,23 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
}
// clone the original relation in the filtering plan and assign new expr IDs to avoid conflicts
- val transformedMatchingRowsPlan = matchingRowsPlan transformUpWithNewOutput {
- case r: DataSourceV2ScanRelation if r eq relation =>
+ matchingRowsPlan transformUpWithNewOutput {
+ case r: DataSourceV2Relation if r eq relation =>
val oldOutput = r.output
val newOutput = oldOutput.map(_.newInstance())
r.copy(output = newOutput) -> oldOutput.zip(newOutput)
}
+ }
+
+ private def buildDynamicPruningCond(
+ matchingRowsPlan: LogicalPlan,
+ buildKeys: Seq[Attribute],
+ pruningKeys: Seq[Attribute]): Expression = {
- val filterableScan = relation.scan.asInstanceOf[SupportsRuntimeFiltering]
- val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](
- ArraySeq.unsafeWrapArray(filterableScan.filterAttributes),
- transformedMatchingRowsPlan)
- val buildQuery = Project(buildKeys, transformedMatchingRowsPlan)
+ val buildQuery = Project(buildKeys, matchingRowsPlan)
val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
}
-
- // combine all dynamic subqueries to produce the final condition
dynamicPruningSubqueries.reduce(And)
}
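For context: the rewritten rule splits the old buildDynamicPruningCondition into two steps. buildMatchingRowsPlan now builds the group-filter query against a plain DataSourceV2Relation over the original table (obtained via asRowLevelOperationTable.table), so that query plans a regular batch scan instead of reusing the copy-on-write scan; buildDynamicPruningCond then turns the resolved build and pruning keys into DynamicPruningSubquery predicates combined with And. Note that the rule matches relations by reference (Scala's eq), not structural equality, so only the exact instances planned for this command are rewritten. A small self-contained Java sketch of that distinction, with illustrative names (the rule itself is Scala):

    class RefEqualityDemo {
      // Hypothetical stand-in for a scan relation; records get structural equals().
      record ScanRelation(String table) {}

      public static void main(String[] args) {
        ScanRelation planned = new ScanRelation("t");   // the instance the rewrite produced
        ScanRelation lookalike = new ScanRelation("t"); // structurally equal, different object

        System.out.println(planned.equals(lookalike)); // true  -> structural equality (Scala ==)
        System.out.println(planned == lookalike);      // false -> reference equality (Scala eq)
        System.out.println(planned == planned);        // true  -> exactly the planned instance
      }
    }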
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index eafd968d01..633b2ee431 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -31,6 +31,8 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -38,7 +40,15 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
@@ -275,4 +285,30 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
throw new RuntimeException(e);
}
}
+
+ protected DataFile writeDataFile(Table table, List<GenericRecord> records) {
+ try {
+ OutputFile file = Files.localOutput(temp.newFile());
+
+ DataWriter<GenericRecord> dataWriter =
+ Parquet.writeData(file)
+ .forTable(table)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .build();
+
+ try {
+ for (GenericRecord record : records) {
+ dataWriter.write(record);
+ }
+ } finally {
+ dataWriter.close();
+ }
+
+ return dataWriter.toDataFile();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
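The helper above writes a list of generic records to a local Parquet file and returns the resulting DataFile. The concurrency tests below drive it roughly as follows (a sketch, assuming it lives in a subclass of SparkRowLevelOperationsTestBase with a table of schema id INT, dep STRING; appendOneRow is a hypothetical name):

    // write one record to a new data file and commit it directly, bypassing Spark
    private void appendOneRow(Table table) {
      GenericRecord record = GenericRecord.create(table.schema());
      record.set(0, 1);    // id
      record.set(1, "hr"); // dep

      DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
      table.newFastAppend().appendFile(dataFile).commit();
    }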
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index ec9c559851..22dbd9df5e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -18,10 +18,32 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
+
+import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
public class TestCopyOnWriteDelete extends TestDelete {
@@ -40,4 +62,85 @@ public class TestCopyOnWriteDelete extends TestDelete {
return ImmutableMap.of(
TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+
+ @Test
+ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception {
+ // this test can only be run with Hive tables as it requires a reliable lock
+ // also, the table cache must be enabled so that the same table instance can be reused
+ Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+ createAndInitUnpartitionedTable();
+ createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+ tableName, DELETE_ISOLATION_LEVEL, "snapshot");
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ ExecutorService executorService =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+ // delete thread
+ Future<?> deleteFuture =
+ executorService.submit(
+ () -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture =
+ executorService.submit(
+ () -> {
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ Assertions.assertThatThrownBy(deleteFuture::get)
+ .isInstanceOf(ExecutionException.class)
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("the table has been concurrently modified");
+ } finally {
+ shouldAppend.set(false);
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
}
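For context: the AtomicInteger barrier round-synchronizes the two threads. Each waits until the counter reaches numOperations * 2, runs its round (concurrently with the other thread, which is what provokes the conflicting table refresh), then increments the counter; two increments per round gate the next round. A minimal self-contained sketch of the pattern (simplified: the tests poll with sleep(10) instead of spinning):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundBarrierDemo {
      public static void main(String[] args) throws InterruptedException {
        AtomicInteger barrier = new AtomicInteger(0);

        Runnable worker = () -> {
          for (int round = 0; round < 3; round++) {
            // wait until both threads have completed the previous round
            while (barrier.get() < round * 2) {
              Thread.onSpinWait();
            }
            System.out.println(Thread.currentThread().getName() + " runs round " + round);
            barrier.incrementAndGet(); // two increments per round release the next one
          }
        };

        Thread a = new Thread(worker, "op");
        Thread b = new Thread(worker, "append");
        a.start();
        b.start();
        a.join();
        b.join();
      }
    }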
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 5608e1eeab..ebb445d255 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -18,10 +18,32 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+
+import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
public class TestCopyOnWriteMerge extends TestMerge {
@@ -40,4 +62,90 @@ public class TestCopyOnWriteMerge extends TestMerge {
return ImmutableMap.of(
TableProperties.MERGE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+
+ @Test
+ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception {
+ // this test can only be run with Hive tables as it requires a reliable lock
+ // also, the table cache must be enabled so that the same table instance can be reused
+ Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+ createAndInitTable("id INT, dep STRING");
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+ tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ ExecutorService executorService =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+ // merge thread
+ Future<?> mergeFuture =
+ executorService.submit(
+ () -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET dep = 'x'",
+ tableName);
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture =
+ executorService.submit(
+ () -> {
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ Assertions.assertThatThrownBy(mergeFuture::get)
+ .isInstanceOf(ExecutionException.class)
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("the table has been concurrently modified");
+ } finally {
+ shouldAppend.set(false);
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index 5a81b0bbd5..b8c7bab788 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -18,10 +18,30 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
+
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
public class TestCopyOnWriteUpdate extends TestUpdate {
@@ -40,4 +60,84 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
return ImmutableMap.of(
TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+
+ @Test
+ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception {
+ // this test can only be run with Hive tables as it requires a reliable lock
+ // also, the table cache must be enabled so that the same table instance can be reused
+ Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+ createAndInitTable("id INT, dep STRING");
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+ tableName, UPDATE_ISOLATION_LEVEL, "snapshot");
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ ExecutorService executorService =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+ // update thread
+ Future<?> updateFuture =
+ executorService.submit(
+ () -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture =
+ executorService.submit(
+ () -> {
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ Assertions.assertThatThrownBy(updateFuture::get)
+ .isInstanceOf(ExecutionException.class)
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("the table has been concurrently modified");
+ } finally {
+ shouldAppend.set(false);
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index c516ac7c96..fb0d8fdab2 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -27,6 +27,7 @@ import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,12 +37,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -722,30 +725,20 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
createAndInitUnpartitionedTable();
+ createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, DELETE_ISOLATION_LEVEL, "serializable");
- // Pre-populate the table to force it to use the Spark Writers instead of Metadata-Only Delete
- // for more consistent exception stack
- List<Integer> ids = ImmutableList.of(1, 2);
- Dataset<Row> inputDF =
- spark
- .createDataset(ids, Encoders.INT())
- .withColumnRenamed("value", "id")
- .withColumn("dep", lit("hr"));
- try {
- inputDF.coalesce(1).writeTo(tableName).append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
- }
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// delete thread
Future<?> deleteFuture =
@@ -755,7 +748,9 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("DELETE FROM %s WHERE id = 1", tableName);
+
+ sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
barrier.incrementAndGet();
}
});
@@ -764,15 +759,26 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
+ // load the table via the validation catalog to use another table instance
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- try {
- inputDF.coalesce(1).writeTo(tableName).append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
}
barrier.incrementAndGet();
@@ -788,6 +794,7 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
@@ -802,16 +809,20 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
createAndInitUnpartitionedTable();
+ createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, DELETE_ISOLATION_LEVEL, "snapshot");
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// delete thread
Future<?> deleteFuture =
@@ -821,7 +832,9 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("DELETE FROM %s WHERE id = 1", tableName);
+
+ sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
barrier.incrementAndGet();
}
});
@@ -830,22 +843,26 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
- List<Integer> ids = ImmutableList.of(1, 2);
- Dataset<Row> inputDF =
- spark
- .createDataset(ids, Encoders.INT())
- .withColumnRenamed("value", "id")
- .withColumn("dep", lit("hr"));
+ // load the table via the validation catalog to use another table instance for inserts
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
for (int numOperations = 0; numOperations < 20; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- try {
- inputDF.coalesce(1).writeTo(tableName).append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
}
barrier.incrementAndGet();
@@ -855,6 +872,7 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
try {
deleteFuture.get();
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
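For context: the shouldAppend flag introduced across these tests is a cooperative stop signal. Once the foreground future completes (or fails as expected), the finally block flips the flag so the append loop returns at a safe point rather than being interrupted mid-commit, and cancel(true) wakes it if it is sleeping. A minimal sketch of the shutdown pattern, with hypothetical names:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class CooperativeStopDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicBoolean shouldRun = new AtomicBoolean(true);

        Future<?> background = executor.submit(() -> {
          while (shouldRun.get()) {
            // one unit of background work (an append, in the tests)
          }
          // loop exits here at a clean boundary once the flag is flipped
        });

        try {
          // foreground work that may finish or throw first
        } finally {
          shouldRun.set(false);    // ask the background loop to stop
          background.cancel(true); // interrupt it in case it is sleeping
        }

        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.MINUTES);
      }
    }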
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index d820b7c3db..58fbb6241e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -34,12 +34,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -989,11 +992,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, MERGE_ISOLATION_LEVEL, "serializable");
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// merge thread
Future<?> mergeFuture =
@@ -1003,12 +1009,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
+
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " UPDATE SET dep = 'x'",
tableName);
+
barrier.incrementAndGet();
}
});
@@ -1017,11 +1025,28 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
+ // load the table via the validation catalog to use another table instance
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
barrier.incrementAndGet();
}
});
@@ -1035,6 +1060,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
@@ -1055,11 +1081,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// merge thread
Future<?> mergeFuture =
@@ -1069,12 +1098,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
+
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " UPDATE SET dep = 'x'",
tableName);
+
barrier.incrementAndGet();
}
});
@@ -1083,11 +1114,28 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
+ // load the table via the validation catalog to use another table instance for inserts
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
for (int numOperations = 0; numOperations < 20; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
barrier.incrementAndGet();
}
});
@@ -1095,6 +1143,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
try {
mergeFuture.get();
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 4748c998ea..eb6ffbe710 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -46,6 +47,7 @@ import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -439,11 +441,14 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, UPDATE_ISOLATION_LEVEL, "serializable");
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// update thread
Future<?> updateFuture =
@@ -453,7 +458,9 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
+
sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
barrier.incrementAndGet();
}
});
@@ -462,11 +469,28 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
+ // load the table via the validation catalog to use another table instance
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
barrier.incrementAndGet();
}
});
@@ -480,6 +504,7 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
@@ -499,11 +524,14 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, UPDATE_ISOLATION_LEVEL, "snapshot");
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
+ AtomicBoolean shouldAppend = new AtomicBoolean(true);
// update thread
Future<?> updateFuture =
@@ -513,7 +541,9 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
+
sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
barrier.incrementAndGet();
}
});
@@ -522,11 +552,28 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
Future<?> appendFuture =
executorService.submit(
() -> {
+ // load the table via the validation catalog to use another table instance for inserts
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ record.set(0, 1); // id
+ record.set(1, "hr"); // dep
+
for (int numOperations = 0; numOperations < 20; numOperations++) {
- while (barrier.get() < numOperations * 2) {
+ while (shouldAppend.get() && barrier.get() < numOperations * 2) {
sleep(10);
}
- sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+ if (!shouldAppend.get()) {
+ return;
+ }
+
+ for (int numAppends = 0; numAppends < 5; numAppends++) {
+ DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+ table.newFastAppend().appendFile(dataFile).commit();
+ sleep(10);
+ }
+
barrier.incrementAndGet();
}
});
@@ -534,6 +581,7 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
try {
updateFuture.get();
} finally {
+ shouldAppend.set(false);
appendFuture.cancel(true);
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 4efd5180e2..a13a09f995 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
@@ -102,6 +103,14 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering
@Override
public void filter(Filter[] filters) {
+ Preconditions.checkState(
+ Objects.equals(snapshotId(), currentSnapshotId()),
+ "Runtime file filtering is not possible: the table has been concurrently modified. "
+ + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. "
+ + "If multiple threads modify the table, use independent Spark sessions in each thread.",
+ snapshotId(),
+ currentSnapshotId());
+
for (Filter filter : filters) {
// Spark can only pass In filters at the moment
if (filter instanceof In
@@ -191,4 +200,9 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering
"IcebergCopyOnWriteScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
}
+
+ private Long currentSnapshotId() {
+ Snapshot currentSnapshot = table().currentSnapshot();
+ return currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+ }
}
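For context: the new filter() guard fails fast when the table's current snapshot no longer matches the one the copy-on-write scan was planned on. Both IDs can be null (a scan planned over an empty table has no snapshot), which is why the comparison uses Objects.equals instead of ==; the IllegalStateException raised by Preconditions.checkState is what the new tests assert on. A standalone sketch of the null-safe comparison (illustrative values):

    import java.util.Objects;

    public class SnapshotGuardDemo {
      public static void main(String[] args) {
        Long scanSnapshotId = null;   // pinned at planning time; null for an empty table
        Long currentSnapshotId = 42L; // a concurrent commit moved the table forward

        // Objects.equals is null-safe, unlike == (reference compare on boxed Longs)
        boolean unchanged = Objects.equals(scanSnapshotId, currentSnapshotId);
        System.out.println("snapshot unchanged: " + unchanged); // false -> checkState would throw
      }
    }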