You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/02 16:57:23 UTC

[iceberg] branch master updated: Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new aaefa6669c Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)
aaefa6669c is described below

commit aaefa6669c98150dba0758bed02008f2a577427a
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Nov 2 09:57:17 2022 -0700

    Spark 3.2: Use separate scan during file filtering in copy-on-write ops (#6095)
---
 spark/v3.2/build.gradle                            |   5 +-
 .../RowLevelCommandDynamicPruning.scala            |  56 ++++++-----
 .../SparkRowLevelOperationsTestBase.java           |  36 +++++++
 .../spark/extensions/TestCopyOnWriteDelete.java    | 103 ++++++++++++++++++++
 .../spark/extensions/TestCopyOnWriteMerge.java     | 108 +++++++++++++++++++++
 .../spark/extensions/TestCopyOnWriteUpdate.java    | 100 +++++++++++++++++++
 .../iceberg/spark/extensions/TestDelete.java       |  80 +++++++++------
 .../apache/iceberg/spark/extensions/TestMerge.java |  57 ++++++++++-
 .../iceberg/spark/extensions/TestUpdate.java       |  56 ++++++++++-
 .../iceberg/spark/source/SparkCopyOnWriteScan.java |  14 +++
 10 files changed, 552 insertions(+), 63 deletions(-)

diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 604d8786e1..13628a7606 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -151,14 +151,15 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
       exclude group: 'org.roaringbitmap'
     }
 
+    testImplementation project(path: ':iceberg-data')
+    testImplementation project(path: ':iceberg-parquet')
     testImplementation project(path: ':iceberg-hive-metastore')
-    testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
 
     testImplementation "org.apache.avro:avro"
+    testImplementation "org.apache.parquet:parquet-hadoop"
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     runtimeOnly "org.antlr:antlr4-runtime:4.8"
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
index 0685c88dfc..4b257920b6 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
@@ -48,7 +48,9 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.trees.TreePattern.SORT
 import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
 import scala.collection.compat.immutable.ArraySeq
 
 /**
@@ -59,6 +61,8 @@ import scala.collection.compat.immutable.ArraySeq
  */
 case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
 
+  import ExtendedDataSourceV2Implicits._
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     // apply special dynamic filtering only for plans that don't support deltas
     case RewrittenRowLevelCommand(
@@ -69,16 +73,25 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
       // use reference equality to find exactly the required scan relations
       val newRewritePlan = rewritePlan transformUp {
         case r: DataSourceV2ScanRelation if r.scan eq scan =>
-          val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](
-            ArraySeq.unsafeWrapArray(scan.filterAttributes), r)
-          val dynamicPruningCond = buildDynamicPruningCondition(r, command, pruningKeys)
-          val filter = Filter(dynamicPruningCond, r)
-          // always optimize dynamic filtering subqueries for row-level commands as it is important
-          // to rewrite introduced predicates as joins because Spark recently stopped optimizing
-          // dynamic subqueries to facilitate broadcast reuse
-          optimizeSubquery(filter)
+          // use the original table instance that was loaded for this row-level operation
+          // in order to leverage a regular batch scan in the group filter query
+          val originalTable = r.relation.table.asRowLevelOperationTable.table
+          val relation = r.relation.copy(table = originalTable)
+          val matchingRowsPlan = buildMatchingRowsPlan(relation, command)
+
+          val filterAttrs = ArraySeq.unsafeWrapArray(scan.filterAttributes)
+          val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+          val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+          val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
+
+          Filter(dynamicPruningCond, r)
       }
-      command.withNewRewritePlan(newRewritePlan)
+
+      // always optimize dynamic filtering subqueries for row-level commands as it is important
+      // to rewrite introduced predicates as joins because Spark recently stopped optimizing
+      // dynamic subqueries to facilitate broadcast reuse
+      command.withNewRewritePlan(optimizeSubquery(newRewritePlan))
+
   }
 
   private def isCandidate(command: RowLevelCommand): Boolean = command.condition match {
@@ -86,10 +99,9 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
     case _ => false
   }
 
-  private def buildDynamicPruningCondition(
-      relation: DataSourceV2ScanRelation,
-      command: RowLevelCommand,
-      pruningKeys: Seq[Attribute]): Expression = {
+  private def buildMatchingRowsPlan(
+      relation: DataSourceV2Relation,
+      command: RowLevelCommand): LogicalPlan = {
 
     // construct a filtering plan with the original scan relation
     val matchingRowsPlan = command match {
@@ -113,23 +125,23 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
     }
 
     // clone the original relation in the filtering plan and assign new expr IDs to avoid conflicts
-    val transformedMatchingRowsPlan = matchingRowsPlan transformUpWithNewOutput {
-      case r: DataSourceV2ScanRelation if r eq relation =>
+    matchingRowsPlan transformUpWithNewOutput {
+      case r: DataSourceV2Relation if r eq relation =>
         val oldOutput = r.output
         val newOutput = oldOutput.map(_.newInstance())
         r.copy(output = newOutput) -> oldOutput.zip(newOutput)
     }
+  }
+
+  private def buildDynamicPruningCond(
+      matchingRowsPlan: LogicalPlan,
+      buildKeys: Seq[Attribute],
+      pruningKeys: Seq[Attribute]): Expression = {
 
-    val filterableScan = relation.scan.asInstanceOf[SupportsRuntimeFiltering]
-    val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](
-      ArraySeq.unsafeWrapArray(filterableScan.filterAttributes),
-      transformedMatchingRowsPlan)
-    val buildQuery = Project(buildKeys, transformedMatchingRowsPlan)
+    val buildQuery = Project(buildKeys, matchingRowsPlan)
     val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
       DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
     }
-
-    // combine all dynamic subqueries to produce the final condition
     dynamicPruningSubqueries.reduce(And)
   }
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index eafd968d01..633b2ee431 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -31,6 +31,8 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +40,15 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSessionCatalog;
@@ -275,4 +285,30 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
       throw new RuntimeException(e);
     }
   }
+
+  protected DataFile writeDataFile(Table table, List<GenericRecord> records) {
+    try {
+      OutputFile file = Files.localOutput(temp.newFile());
+
+      DataWriter<GenericRecord> dataWriter =
+          Parquet.writeData(file)
+              .forTable(table)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .overwrite()
+              .build();
+
+      try {
+        for (GenericRecord record : records) {
+          dataWriter.write(record);
+        }
+      } finally {
+        dataWriter.close();
+      }
+
+      return dataWriter.toDataFile();
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index ec9c559851..22dbd9df5e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -18,10 +18,32 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
+
+import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
 
 public class TestCopyOnWriteDelete extends TestDelete {
 
@@ -40,4 +62,85 @@ public class TestCopyOnWriteDelete extends TestDelete {
     return ImmutableMap.of(
         TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
+
+  @Test
+  public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception {
+    // this test can only be run with Hive tables as it requires a reliable lock
+    // also, the table cache must be enabled so that the same table instance can be reused
+    Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+    createAndInitUnpartitionedTable();
+    createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, DELETE_ISOLATION_LEVEL, "snapshot");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    ExecutorService executorService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+    // delete thread
+    Future<?> deleteFuture =
+        executorService.submit(
+            () -> {
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    // append thread
+    Future<?> appendFuture =
+        executorService.submit(
+            () -> {
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    try {
+      Assertions.assertThatThrownBy(deleteFuture::get)
+          .isInstanceOf(ExecutionException.class)
+          .cause()
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessageContaining("the table has been concurrently modified");
+    } finally {
+      shouldAppend.set(false);
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 5608e1eeab..ebb445d255 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -18,10 +18,32 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+
+import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
 
 public class TestCopyOnWriteMerge extends TestMerge {
 
@@ -40,4 +62,90 @@ public class TestCopyOnWriteMerge extends TestMerge {
     return ImmutableMap.of(
         TableProperties.MERGE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
+
+  @Test
+  public synchronized void testMergeWithConcurrentTableRefresh() throws Exception {
+    // this test can only be run with Hive tables as it requires a reliable lock
+    // also, the table cache must be enabled so that the same table instance can be reused
+    Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    ExecutorService executorService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+    // merge thread
+    Future<?> mergeFuture =
+        executorService.submit(
+            () -> {
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.value "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET dep = 'x'",
+                    tableName);
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    // append thread
+    Future<?> appendFuture =
+        executorService.submit(
+            () -> {
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    try {
+      Assertions.assertThatThrownBy(mergeFuture::get)
+          .isInstanceOf(ExecutionException.class)
+          .cause()
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessageContaining("the table has been concurrently modified");
+    } finally {
+      shouldAppend.set(false);
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index 5a81b0bbd5..b8c7bab788 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -18,10 +18,30 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
+
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.Spark3Util;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
 
 public class TestCopyOnWriteUpdate extends TestUpdate {
 
@@ -40,4 +60,84 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
     return ImmutableMap.of(
         TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
+
+  @Test
+  public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception {
+    // this test can only be run with Hive tables as it requires a reliable lock
+    // also, the table cache must be enabled so that the same table instance can be reused
+    Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive"));
+
+    createAndInitTable("id INT, dep STRING");
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, UPDATE_ISOLATION_LEVEL, "snapshot");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    ExecutorService executorService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
+
+    // update thread
+    Future<?> updateFuture =
+        executorService.submit(
+            () -> {
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    // append thread
+    Future<?> appendFuture =
+        executorService.submit(
+            () -> {
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
+              for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
+                  sleep(10);
+                }
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
+                barrier.incrementAndGet();
+              }
+            });
+
+    try {
+      Assertions.assertThatThrownBy(updateFuture::get)
+          .isInstanceOf(ExecutionException.class)
+          .cause()
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessageContaining("the table has been concurrently modified");
+    } finally {
+      shouldAppend.set(false);
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index c516ac7c96..fb0d8fdab2 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -27,6 +27,7 @@ import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,12 +37,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -722,30 +725,20 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
 
     createAndInitUnpartitionedTable();
+    createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
 
     sql(
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, DELETE_ISOLATION_LEVEL, "serializable");
 
-    // Pre-populate the table to force it to use the Spark Writers instead of Metadata-Only Delete
-    // for more consistent exception stack
-    List<Integer> ids = ImmutableList.of(1, 2);
-    Dataset<Row> inputDF =
-        spark
-            .createDataset(ids, Encoders.INT())
-            .withColumnRenamed("value", "id")
-            .withColumn("dep", lit("hr"));
-    try {
-      inputDF.coalesce(1).writeTo(tableName).append();
-    } catch (NoSuchTableException e) {
-      throw new RuntimeException(e);
-    }
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
 
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // delete thread
     Future<?> deleteFuture =
@@ -755,7 +748,9 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("DELETE FROM %s WHERE id = 1", tableName);
+
+                sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -764,15 +759,26 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              // load the table via the validation catalog to use another table instance
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
               for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
 
-                try {
-                  inputDF.coalesce(1).writeTo(tableName).append();
-                } catch (NoSuchTableException e) {
-                  throw new RuntimeException(e);
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
                 }
 
                 barrier.incrementAndGet();
@@ -788,6 +794,7 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
           .isInstanceOf(ValidationException.class)
           .hasMessageContaining("Found conflicting files that can contain");
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
@@ -802,16 +809,20 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
 
     createAndInitUnpartitionedTable();
+    createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT());
 
     sql(
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, DELETE_ISOLATION_LEVEL, "snapshot");
 
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // delete thread
     Future<?> deleteFuture =
@@ -821,7 +832,9 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("DELETE FROM %s WHERE id = 1", tableName);
+
+                sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -830,22 +843,26 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
-              List<Integer> ids = ImmutableList.of(1, 2);
-              Dataset<Row> inputDF =
-                  spark
-                      .createDataset(ids, Encoders.INT())
-                      .withColumnRenamed("value", "id")
-                      .withColumn("dep", lit("hr"));
+              // load the table via the validation catalog to use another table instance for inserts
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
 
               for (int numOperations = 0; numOperations < 20; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
 
-                try {
-                  inputDF.coalesce(1).writeTo(tableName).append();
-                } catch (NoSuchTableException e) {
-                  throw new RuntimeException(e);
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
                 }
 
                 barrier.incrementAndGet();
@@ -855,6 +872,7 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     try {
       deleteFuture.get();
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index d820b7c3db..58fbb6241e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -34,12 +34,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -989,11 +992,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, MERGE_ISOLATION_LEVEL, "serializable");
 
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // merge thread
     Future<?> mergeFuture =
@@ -1003,12 +1009,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
+
                 sql(
                     "MERGE INTO %s t USING source s "
                         + "ON t.id == s.value "
                         + "WHEN MATCHED THEN "
                         + "  UPDATE SET dep = 'x'",
                     tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -1017,11 +1025,28 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              // load the table via the validation catalog to use another table instance
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
               for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
                 barrier.incrementAndGet();
               }
             });
@@ -1035,6 +1060,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
           .isInstanceOf(ValidationException.class)
           .hasMessageContaining("Found conflicting files that can contain");
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
@@ -1055,11 +1081,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, MERGE_ISOLATION_LEVEL, "snapshot");
 
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // merge thread
     Future<?> mergeFuture =
@@ -1069,12 +1098,14 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
+
                 sql(
                     "MERGE INTO %s t USING source s "
                         + "ON t.id == s.value "
                         + "WHEN MATCHED THEN "
                         + "  UPDATE SET dep = 'x'",
                     tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -1083,11 +1114,28 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              // load the table via the validation catalog to use another table instance for inserts
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
               for (int numOperations = 0; numOperations < 20; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
                 barrier.incrementAndGet();
               }
             });
@@ -1095,6 +1143,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     try {
       mergeFuture.get();
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 4748c998ea..eb6ffbe710 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -46,6 +47,7 @@ import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -439,11 +441,14 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, UPDATE_ISOLATION_LEVEL, "serializable");
 
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // update thread
     Future<?> updateFuture =
@@ -453,7 +458,9 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
+
                 sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -462,11 +469,28 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              // load the table via the validation catalog to use another table instance
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
               for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
                 barrier.incrementAndGet();
               }
             });
@@ -480,6 +504,7 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
           .isInstanceOf(ValidationException.class)
           .hasMessageContaining("Found conflicting files that can contain");
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
@@ -499,11 +524,14 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, UPDATE_ISOLATION_LEVEL, "snapshot");
 
+    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
     ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
+    AtomicBoolean shouldAppend = new AtomicBoolean(true);
 
     // update thread
     Future<?> updateFuture =
@@ -513,7 +541,9 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
+
                 sql("UPDATE %s SET id = -1 WHERE id = 1", tableName);
+
                 barrier.incrementAndGet();
               }
             });
@@ -522,11 +552,28 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              // load the table via the validation catalog to use another table instance for inserts
+              Table table = validationCatalog.loadTable(tableIdent);
+
+              GenericRecord record = GenericRecord.create(table.schema());
+              record.set(0, 1); // id
+              record.set(1, "hr"); // dep
+
               for (int numOperations = 0; numOperations < 20; numOperations++) {
-                while (barrier.get() < numOperations * 2) {
+                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
-                sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+
+                if (!shouldAppend.get()) {
+                  return;
+                }
+
+                for (int numAppends = 0; numAppends < 5; numAppends++) {
+                  DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
+                  table.newFastAppend().appendFile(dataFile).commit();
+                  sleep(10);
+                }
+
                 barrier.incrementAndGet();
               }
             });
@@ -534,6 +581,7 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
     try {
       updateFuture.get();
     } finally {
+      shouldAppend.set(false);
       appendFuture.cancel(true);
     }
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 4efd5180e2..a13a09f995 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.TableScan;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadConf;
@@ -102,6 +103,14 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering
 
   @Override
   public void filter(Filter[] filters) {
+    Preconditions.checkState(
+        Objects.equals(snapshotId(), currentSnapshotId()),
+        "Runtime file filtering is not possible: the table has been concurrently modified. "
+            + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. "
+            + "If multiple threads modify the table, use independent Spark sessions in each thread.",
+        snapshotId(),
+        currentSnapshotId());
+
     for (Filter filter : filters) {
       // Spark can only pass In filters at the moment
       if (filter instanceof In
@@ -191,4 +200,9 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering
         "IcebergCopyOnWriteScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
         table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
   }
+
+  private Long currentSnapshotId() {
+    Snapshot currentSnapshot = table().currentSnapshot();
+    return currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+  }
 }