Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/15 21:30:09 UTC

[GitHub] [iceberg] singhpk234 opened a new pull request, #5056: Spark: Support Spark 3.3

singhpk234 opened a new pull request, #5056:
URL: https://github.com/apache/iceberg/pull/5056

   Closes https://github.com/apache/iceberg/issues/4713
   
   - based on https://github.com/singhpk234/iceberg/pull/1
   
   ---- 
   ### About Change
   
- Introduced a new node, `ReplaceIcebergData`, to be used in place of `ReplaceData`. `ReplaceData` extends `Command`, and commands are eagerly executed, which was causing problems with `Update` (when `allowScanDuplication` is enabled) and with `RowLevelCommandDynamicPruning` (filters were not being injected). A sketch of the idea follows this list.
- Renamed the rule `RewriteDeleteFromTable` to `RewriteDeleteFromIcebergTable` to avoid conflicting with the upstream rule of the same name, which does not support delta writes and uses `ReplaceData`.
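   
   To illustrate the first point, here is a minimal, hypothetical sketch (not the PR's actual code) of a row-level write node modeled as a plain `UnaryNode` instead of a `Command`, so Spark does not execute it eagerly and rules such as dynamic pruning can still rewrite its plan:
   
   ```scala
   import org.apache.spark.sql.catalyst.analysis.NamedRelation
   import org.apache.spark.sql.catalyst.expressions.Attribute
   import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
   
   // Hypothetical sketch: because this node is not a Command, the analyzer does
   // not eagerly execute it, so RowLevelCommandDynamicPruning still gets a
   // chance to inject runtime filters into the scan below it.
   case class ReplaceIcebergData(
       table: NamedRelation,  // target Iceberg table
       query: LogicalPlan)    // rows that replace the affected data files
     extends UnaryNode {
   
     override def child: LogicalPlan = query
   
     // a write node produces no output rows
     override def output: Seq[Attribute] = Nil
   
     override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
       copy(query = newChild)
   }
   ```
   
   For the second point, a hedged sketch of why the rename avoids the clash: the extensions inject their own resolution rule under a distinct name, so Spark 3.3's built-in `RewriteDeleteFromTable` and the Iceberg-specific rewrite can coexist (the no-op rule body below is a stand-in, not the PR's implementation):
   
   ```scala
   import org.apache.spark.sql.SparkSessionExtensions
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.catalyst.rules.Rule
   
   // Stand-in for the PR's RewriteDeleteFromIcebergTable rule.
   object RewriteDeleteFromIcebergTable extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan // no-op stub
   }
   
   class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     override def apply(extensions: SparkSessionExtensions): Unit = {
       // injected under a distinct name, so it never collides with Spark's
       // upstream RewriteDeleteFromTable rule
       extensions.injectResolutionRule(_ => RewriteDeleteFromIcebergTable)
     }
   }
   ```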
   
   
   -----
   ### Other Notes:
   
   Some unit tests have been marked as ignored so that the complete suite can run:
   [1] `TestAddFilesProcedure` fails due to a bug in upstream Spark. I have fixed it upstream, but since the latest snapshot is based on May 7th, the test still fails here, so it is ignored for now.
   JIRA: https://issues.apache.org/jira/browse/SPARK-39417
   [2] Investigating the root cause of the failures below:
       TestMerge#testMergeWithInvalidAssignments
       TestDeleteFrom#testDeleteFromUnpartitionedTable, TestDeleteFrom#testDeleteFromPartitionedTable
   
   ----
   
   cc @rdblue @aokolnychyi @jackye1995 @RussellSpitzer @kbendick 




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#discussion_r899896356


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -0,0 +1,1816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.spark.sql.functions.lit;
+
+public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
+
+  public TestMerge(String catalogName, String implementation, Map<String, String> config,
+                   String fileFormat, boolean vectorized, String distributionMode) {
+    super(catalogName, implementation, config, fileFormat, vectorized, distributionMode);
+  }
+
+  @BeforeClass
+  public static void setupSparkConf() {
+    spark.conf().set("spark.sql.shuffle.partitions", "4");
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS source");
+  }
+
+  @Test
+  public void testMergeWithStaticPredicatePushDown() {
+    createAndInitTable("id BIGINT, dep STRING");
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    // add a data file to the 'software' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"software\" }");
+
+    // add a data file to the 'hr' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snapshot = table.currentSnapshot();
+    String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
+    Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"dep\": \"finance\" }\n" +
+        "{ \"id\": 2, \"dep\": \"hardware\" }");
+
+    // remove the data file from the 'hr' partition to ensure it is not scanned
+    withUnavailableFiles(snapshot.addedFiles(table.io()), () -> {
+      // disable dynamic pruning and rely only on static predicate pushdown
+      withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
+        sql("MERGE INTO %s t USING source " +
+            "ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 " +
+            "WHEN MATCHED AND source.id = 1 THEN " +
+            "  UPDATE SET dep = source.dep " +
+            "WHEN NOT MATCHED THEN " +
+            "  INSERT (dep, id) VALUES (source.dep, source.id)", tableName);
+      });
+    });
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1L, "finance"),  // updated
+        row(1L, "hr"),       // kept
+        row(2L, "hardware")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+  }
+
+  @Test
+  public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND (s.id >=2) THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-six\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"),   // updated
+        row(6, "emp-id-six")  // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-one") // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllClauses() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllClausesWithExplicitColumnSpecification() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET t.id = s.id, t.dep = s.dep " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT (t.id, t.dep) VALUES (s.id, s.dep)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceCTE() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 5, \"dep\": \"emp-id-6\" }");
+
+    sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) " +
+        "MERGE INTO %s AS t USING cte1 AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 2 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 3 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // updated
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceFromSetOps() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String derivedSource =
+        "SELECT * FROM source WHERE id = 2 " +
+        "UNION ALL " +
+        "SELECT * FROM source WHERE id = 1 OR id = 6";
+
+    sql("MERGE INTO %s AS t USING (%s) AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName, derivedSource);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.value = 2 THEN " +
+              "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceEnabledHashShuffleJoin() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
+      String errorMsg = "a single row from the target table with multiple rows of the source table";
+      AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+          SparkException.class, errorMsg,
+          () -> {
+            sql("MERGE INTO %s AS t USING source AS s " +
+                "ON t.id == s.value " +
+                "WHEN MATCHED AND t.id = 1 THEN " +
+                "  UPDATE SET id = 10 " +
+                "WHEN MATCHED AND t.id = 6 THEN " +
+                "  DELETE " +
+                "WHEN NOT MATCHED AND s.value = 2 THEN " +
+                "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+          });
+    });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoEqualityCondition() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
+      String errorMsg = "a single row from the target table with multiple rows of the source table";
+      AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+          SparkException.class, errorMsg,
+          () -> {
+            sql("MERGE INTO %s AS t USING source AS s " +
+                "ON t.id > s.value " +
+                "WHEN MATCHED AND t.id = 1 THEN " +
+                "  UPDATE SET id = 10 " +
+                "WHEN MATCHED AND t.id = 6 THEN " +
+                "  DELETE " +
+                "WHEN NOT MATCHED AND s.value = 2 THEN " +
+                "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+          });
+    });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActionsNoEqualityCondition() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id > s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRow() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET * " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithUnconditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSingleConditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithIdentityTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithDaysTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, ts TIMESTAMP");
+      sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName, "id INT, ts TIMESTAMP",
+          "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
+
+      createOrReplaceView("source", "id INT, ts TIMESTAMP",
+          "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n" +
+          "{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "2001-01-01 00:00:00"), // updated
+          row(2, "2001-01-02 00:00:00")  // new
+      );
+      assertEquals("Should have expected rows",
+          expectedRows,
+          sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithBucketTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithTruncateTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeIntoPartitionedAndOrderedTable() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+      sql("ALTER TABLE %s WRITE ORDERED BY (id)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testSelfMerge() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSelfMergeWithCaching() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("CACHE TABLE %s", tableName);
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceAsSelfSubquery() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+    sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+        "ON t.id == s.value " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+      Assert.fail("Expected a validation exception");
+    } catch (ExecutionException e) {
+      Throwable sparkException = e.getCause();
+      Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+      Throwable validationException = sparkException.getCause();
+      Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+      String errMsg = validationException.getMessage();
+      Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public void testMergeWithExtraColumnsInSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+        "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // new
+        row(2, "v2"),   // kept
+        row(3, "v3"),   // new
+        row(4, "v4")    // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullsInTargetAndSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullSafeEquals() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id <=> source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1_1"), // updated
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullCondition() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id AND NULL " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(2, "v2_2")     // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullActionConditions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }\n" +
+        "{ \"id\": 3, \"v\": \"v3_3\" }");
+
+    // all conditions are NULL and will never match any rows
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows1 = ImmutableList.of(
+        row(1, "v1"), // kept
+        row(2, "v2")  // kept
+    );
+    assertEquals("Output should match", expectedRows1, sql("SELECT * FROM %s ORDER BY v", tableName));
+
+    // only the update and insert conditions are NULL
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows2 = ImmutableList.of(
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleMatchingActions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    // the order of match actions is important in this case
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated (also matches the delete cond but update is first)
+        row(2, "v2")    // kept (matches neither the update nor the delete cond)
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
+    Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
+
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
+
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    List<Integer> ids = Lists.newArrayListWithCapacity(200);
+    for (int id = 1; id <= 200; id++) {
+      ids.add(id);
+    }
+    Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
+        .withColumnRenamed("value", "id")
+        .withColumn("dep", lit("hr"));
+    df.coalesce(1).writeTo(tableName).append();
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+
+    // update a record from one of two row groups and copy over the second one
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.value " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET dep = 'x'", tableName);
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+  }
+
+  @Test
+  public void testMergeInsertOnly() {
+    createAndInitTable("id STRING, v STRING",
+        "{ \"id\": \"a\", \"v\": \"v1\" }\n" +
+        "{ \"id\": \"b\", \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": \"a\", \"v\": \"v1_1\" }\n" +
+        "{ \"id\": \"a\", \"v\": \"v1_2\" }\n" +
+        "{ \"id\": \"c\", \"v\": \"v3\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_1\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row("a", "v1"),   // kept
+        row("b", "v2"),   // kept
+        row("c", "v3"),   // new
+        row("d", "v4_1"), // new
+        row("d", "v4_2")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeInsertOnlyWithCondition() {
+    createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 21, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 22, \"is_new\": false }");
+
+    // validate assignments are reordered to match the table attrs
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND is_new = TRUE THEN " +
+        "  INSERT (v, id) VALUES (s.v + 100, s.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1),  // kept
+        row(2, 121) // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET b = c2, a = c1, t.id = source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, a, id) VALUES (c2, c1, id)", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeMixedCaseAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView(
+        "source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.iD == source.Id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET B = c2, A = c1, t.Id = source.ID " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, A, iD) VALUES (c2, c1, id)", tableName);
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1")),
+        sql("SELECT * FROM %s WHERE id = 1 ORDER BY id", tableName));
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeUpdatesNestedStructFields() {
+    createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>",
+        "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2 }");
+
+    // update primitive, array, map columns inside a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = source.c1, t.s.c2.a = array(-1, -2), t.s.c2.m = map('k', 'v')", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-2, row(ImmutableList.of(-1, -2), ImmutableMap.of("k", "v"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // set primitive, array, map columns to NULL (proper casts should be in place)
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = NULL, t.s.c2 = NULL", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(null, null))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // update all fields in a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = named_struct('c1', 100, 'c2', named_struct('a', array(1), 'm', map('x', 'y')))", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(100, row(ImmutableList.of(1), ImmutableMap.of("x", "y"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithInferredCasts() {
+    createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}");
+
+    // -2 in source should be cast to "-2" in the target
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = source.c1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, "-2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeModifiesNullStruct() {
+    createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
+    createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }");
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.n1 = s.n1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-10, null))),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testMergeRefreshesRelationCache() {
+    createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
+
+    Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n1")),
+        sql("SELECT * FROM tmp"));
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.name = s.name", tableName);
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n2")),
+        sql("SELECT * FROM tmp"));
+
+    spark.sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testMergeWithMultipleNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2"),  // new
+        row(3, "emp-id-3")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleConditionalNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeResolvesColumnsByName() {
+    createAndInitTable("id INT, badge INT, dep STRING",
+        "{ \"id\": 1, \"badge\": 1000, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"badge\": 6000, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "badge INT, id INT, dep STRING",
+        "{ \"badge\": 1001, \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"badge\": 6006, \"id\": 6, \"dep\": \"emp-id-6\" }\n" +
+        "{ \"badge\": 7007, \"id\": 7, \"dep\": \"emp-id-7\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT * ", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1001, "emp-id-1"), // updated
+        row(6, 6006, "emp-id-6"), // updated
+        row(7, 7007, "emp-id-7")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows,
+        sql("SELECT id, badge, dep FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() {
+    // ensures that MERGE INTO will resolve into the correct action even if no columns
+    // or otherwise unresolved expressions exist in the query (testing SPARK-34962)
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON 1 != 1 " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNonExistingColumns() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve t.invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid nested column",
+        AnalysisException.class, "No such struct field invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, invalid_col) VALUES (s.c1, null)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidColumnsInInsert() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the nested column",
+        AnalysisException.class, "Nested fields are not supported inside INSERT clauses",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, c.n2) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about duplicate columns",
+        AnalysisException.class, "Duplicate column names inside INSERT clause",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, id) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about missing columns",
+        AnalysisException.class, "must provide values for all columns of the target table",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id) VALUES (s.c1)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidUpdates() {
+    createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about updating an array column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.a.c1 = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about updating a map column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.m.key = 'new_key'", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithConflictingUpdates() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a top-level column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict for these columns",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", tableName);
+        });
+  }
+
+  // revisit
+  @Ignore
+  @Test
+  public void testMergeWithInvalidAssignments() {
+    createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL");
+    createOrReplaceView(
+        "source",
+        "c1 INT, c2 STRUCT<n1:INT NOT NULL> NOT NULL, c3 STRING NOT NULL, c4 STRUCT<dn2:INT,dn1:INT>",
+        "{ \"c1\": -100, \"c2\": { \"n1\" : 1 }, \"c3\" : 'str', \"c4\": { \"dn2\": 1, \"dn2\": 2 } }");
+
+    for (String policy : new String[]{"ansi", "strict"}) {
+      withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy), () -> {
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a top-level column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.id = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a nested column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing missing fields in structs",
+            AnalysisException.class, "missing fields",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s = s.c2", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing invalid data types",
+            AnalysisException.class, "Cannot safely cast",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = s.c3", tableName);
+            });

Review Comment:
   These tests were failing because the AnalysisException is raised by 
   https://github.com/apache/iceberg/blob/13db47e2755d8a885043a7ec62eee2dcab43a9bc/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala#L174-L176
   
   While debugging, I found that `s.c2`, despite being `NOT NULL` in the table definition, has `nullable=true` by the time the assignments are resolved. 
   
   This started happening as of Spark 3.3, due to a behaviour change introduced in https://github.com/apache/spark/commit/6689b97ec76abe5bab27f02869f8f16b32530d1a: by default, Spark no longer respects the nullability of a user-provided schema passed via 
   
   ```
   DataFrameReader.schema(schema).json(jsonDataset), DataFrameReader.schema(schema).csv(csvDataset)
   ```
   
   I set the corresponding legacy conf to `true` in https://github.com/apache/iceberg/blob/master/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java, 
   since our test utils use these readers to create the temp views.
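   
   For illustration, a minimal sketch of opting back into the old behaviour when building the test session (assuming the legacy flag added by that commit is `spark.sql.legacy.respectNullabilityInTextDatasetConversion`; the class and app name here are made up):
   
   ```
   import org.apache.spark.sql.SparkSession;
   
   public class LegacyNullabilitySketch {
     public static void main(String[] args) {
       // Re-enable nullability propagation for Dataset-based json/csv readers so
       // that NOT NULL columns in a user-supplied schema stay non-nullable when
       // temp views are created from JSON test data.
       SparkSession spark = SparkSession.builder()
           .master("local[2]")
           .appName("legacy-nullability-sketch")
           .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
           .getOrCreate();
   
       spark.stop();
     }
   }
   ```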





[GitHub] [iceberg] singhpk234 closed pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 closed pull request #5056: Spark: Support Spark 3.3
URL: https://github.com/apache/iceberg/pull/5056




[GitHub] [iceberg] singhpk234 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1158641432

   UPDATE: 
   Spark 3.3 was published to the Maven repo yesterday: https://mvnrepository.com/artifact/org.apache.spark/spark-core
   I have removed the PR's dependency on the snapshot repo.
   [1] Re-enabled the previously ignored `TestAddFilesProcedure` UTs as well, since the upstream fix is now published.
   [2] Re-enabled the failing `TestMerge` UTs as well; Spark hid the behaviour we were relying on behind a conf, which is now enabled in our test session.
   
   
   I am investigating the root cause of the two remaining UT failures in `TestDeleteFrom` and will push the fixes shortly.




[GitHub] [iceberg] singhpk234 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1169075208

   Superseded by https://github.com/apache/iceberg/pull/5094
   
   Thank you everyone for your awesome reviews :) !!




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#discussion_r900040404


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+  public TestDeleteFrom(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  // revisit
+  @Ignore
+  @Test
+  public void testDeleteFromUnpartitionedTable() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
+        AnalysisException.class, "Cannot delete from",
+        () -> sql("DELETE FROM %s WHERE id < 2", tableName));
+
+    sql("DELETE FROM %s WHERE id < 4", tableName);
+
+    assertEquals("Should have no rows after successful delete",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDeleteFromTableAtSnapshot() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    String prefix = "snapshot_id_";
+    AssertHelpers.assertThrows("Should not be able to delete from a table at a specific snapshot",
+        IllegalArgumentException.class, "Cannot delete from table at a specific snapshot",
+        () -> sql("DELETE FROM %s.%s WHERE id < 4", tableName, prefix + snapshotId));
+  }
+
+  // revisit
+  @Ignore
+  @Test

Review Comment:
   These tests were failing with: 
   ```
   Field _file not found in source schema
   java.lang.IllegalArgumentException: Field _file not found in source schema
   	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:104)
   	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
   	....
   	at org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:192)
   	at org.apache.iceberg.spark.source.SparkWriteBuilder.validateOrMergeWriteSchema(SparkWriteBuilder.java:247)
   ```
   
   `_file` was not found because upstream Spark now relies on the `__metadata_col` flag in the attribute metadata to filter these metadata columns out: 
   https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L221-L225
   
   But our Spark schema conversion was not adding this flag to the attribute metadata, so the `_file` column was not filtered out, and validation against the source schema failed. 
   
   Added a fix for this (for [3.2](https://github.com/apache/iceberg/pull/5075) as well) and adapted the UTs to respect the upstream changes.
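   
   For illustration, a minimal sketch of tagging a converted field with that flag (`METADATA_COL_ATTR_KEY` matches the key used in the linked v2Commands code; `asMetadataColumn` is a hypothetical helper, not the exact Iceberg fix):
   
   ```
   import org.apache.spark.sql.types.Metadata;
   import org.apache.spark.sql.types.MetadataBuilder;
   import org.apache.spark.sql.types.StructField;
   
   public class MetadataColumnFlagSketch {
     // Key that Spark 3.3 checks to decide whether an attribute is a metadata column.
     private static final String METADATA_COL_ATTR_KEY = "__metadata_col";
   
     // Returns a copy of the field whose metadata carries the __metadata_col flag,
     // so plans like ReplaceData can filter it out before write-schema validation.
     public static StructField asMetadataColumn(StructField field) {
       Metadata withFlag = new MetadataBuilder()
           .withMetadata(field.metadata())
           .putBoolean(METADATA_COL_ATTR_KEY, true)
           .build();
       return new StructField(field.name(), field.dataType(), field.nullable(), withFlag);
     }
   }
   ```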





[GitHub] [iceberg] pan3793 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
pan3793 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1157515292

   "copy 3.2 files from 3.3" should be in another PR, otherwise after squash, the git will not keep commit history.




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#discussion_r899896356


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -0,0 +1,1816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.spark.sql.functions.lit;
+
+public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
+
+  public TestMerge(String catalogName, String implementation, Map<String, String> config,
+                   String fileFormat, boolean vectorized, String distributionMode) {
+    super(catalogName, implementation, config, fileFormat, vectorized, distributionMode);
+  }
+
+  @BeforeClass
+  public static void setupSparkConf() {
+    spark.conf().set("spark.sql.shuffle.partitions", "4");
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS source");
+  }
+
+  @Test
+  public void testMergeWithStaticPredicatePushDown() {
+    createAndInitTable("id BIGINT, dep STRING");
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    // add a data file to the 'software' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"software\" }");
+
+    // add a data file to the 'hr' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snapshot = table.currentSnapshot();
+    String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
+    Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"dep\": \"finance\" }\n" +
+        "{ \"id\": 2, \"dep\": \"hardware\" }");
+
+    // remove the data file from the 'hr' partition to ensure it is not scanned
+    withUnavailableFiles(snapshot.addedFiles(table.io()), () -> {
+      // disable dynamic pruning and rely only on static predicate pushdown
+      withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
+        sql("MERGE INTO %s t USING source " +
+            "ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 " +
+            "WHEN MATCHED AND source.id = 1 THEN " +
+            "  UPDATE SET dep = source.dep " +
+            "WHEN NOT MATCHED THEN " +
+            "  INSERT (dep, id) VALUES (source.dep, source.id)", tableName);
+      });
+    });
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1L, "finance"),  // updated
+        row(1L, "hr"),       // kept
+        row(2L, "hardware")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+  }
+
+  @Test
+  public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND (s.id >=2) THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-six\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"),   // updated
+        row(6, "emp-id-six")  // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-one") // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllCauses() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllCausesWithExplicitColumnSpecification() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET t.id = s.id, t.dep = s.dep " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT (t.id, t.dep) VALUES (s.id, s.dep)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceCTE() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 5, \"dep\": \"emp-id-6\" }");
+
+    sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) " +
+        "MERGE INTO %s AS t USING cte1 AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 2 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 3 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // updated
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceFromSetOps() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String derivedSource =
+        "SELECT * FROM source WHERE id = 2 " +
+        "UNION ALL " +
+        "SELECT * FROM source WHERE id = 1 OR id = 6";
+
+    sql("MERGE INTO %s AS t USING (%s) AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName, derivedSource);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.value = 2 THEN " +
+              "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceEnabledHashShuffleJoin() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
+      String errorMsg = "a single row from the target table with multiple rows of the source table";
+      AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+          SparkException.class, errorMsg,
+          () -> {
+            sql("MERGE INTO %s AS t USING source AS s " +
+                "ON t.id == s.value " +
+                "WHEN MATCHED AND t.id = 1 THEN " +
+                "  UPDATE SET id = 10 " +
+                "WHEN MATCHED AND t.id = 6 THEN " +
+                "  DELETE " +
+                "WHEN NOT MATCHED AND s.value = 2 THEN " +
+                "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+          });
+    });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoEqualityCondition() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    withSQLConf(ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"), () -> {
+      String errorMsg = "a single row from the target table with multiple rows of the source table";
+      AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+          SparkException.class, errorMsg,
+          () -> {
+            sql("MERGE INTO %s AS t USING source AS s " +
+                "ON t.id > s.value " +
+                "WHEN MATCHED AND t.id = 1 THEN " +
+                "  UPDATE SET id = 10 " +
+                "WHEN MATCHED AND t.id = 6 THEN " +
+                "  DELETE " +
+                "WHEN NOT MATCHED AND s.value = 2 THEN " +
+                "  INSERT (id, dep) VALUES (s.value, null)", tableName);
+          });
+    });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActionsNoEqualityCondition() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }");
+
+    List<Integer> sourceIds = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i++) {
+      sourceIds.add(i);
+    }
+    Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
+    ds.union(ds).createOrReplaceTempView("source");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id > s.value " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET id = 10 " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRow() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET * " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithUnconditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSingleConditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrowsCause("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged",
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithIdentityTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithDaysTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, ts TIMESTAMP");
+      sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName, "id INT, ts TIMESTAMP",
+          "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
+
+      createOrReplaceView("source", "id INT, ts TIMESTAMP",
+          "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n" +
+          "{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "2001-01-01 00:00:00"), // updated
+          row(2, "2001-01-02 00:00:00")  // new
+      );
+      assertEquals("Should have expected rows",
+          expectedRows,
+          sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithBucketTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithTruncateTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeIntoPartitionedAndOrderedTable() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+      sql("ALTER TABLE %s WRITE ORDERED BY (id)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testSelfMerge() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSelfMergeWithCaching() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("CACHE TABLE %s", tableName);
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceAsSelfSubquery() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+    sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+        "ON t.id == s.value " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+      Assert.fail("Expected a validation exception");
+    } catch (ExecutionException e) {
+      Throwable sparkException = e.getCause();
+      Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+      Throwable validationException = sparkException.getCause();
+      Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+      String errMsg = validationException.getMessage();
+      Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public void testMergeWithExtraColumnsInSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+        "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated
+        row(2, "v2"),   // kept
+        row(3, "v3"),   // new
+        row(4, "v4")    // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullsInTargetAndSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullSafeEquals() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id <=> source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1_1"), // updated
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullCondition() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id AND NULL " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(2, "v2_2")     // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullActionConditions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }\n" +
+        "{ \"id\": 3, \"v\": \"v3_3\" }");
+
+    // all conditions are NULL and will never match any rows
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows1 = ImmutableList.of(
+        row(1, "v1"), // kept
+        row(2, "v2")  // kept
+    );
+    assertEquals("Output should match", expectedRows1, sql("SELECT * FROM %s ORDER BY v", tableName));
+
+    // only the update and insert conditions are NULL
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows2 = ImmutableList.of(
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleMatchingActions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    // the order of match actions is important in this case
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated (also matches the delete cond but update is first)
+        row(2, "v2")    // kept (matches neither the update nor the delete cond)
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
+    Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
+
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
+
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    List<Integer> ids = Lists.newArrayListWithCapacity(200);
+    for (int id = 1; id <= 200; id++) {
+      ids.add(id);
+    }
+    Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
+        .withColumnRenamed("value", "id")
+        .withColumn("dep", lit("hr"));
+    df.coalesce(1).writeTo(tableName).append();
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+
+    // update a record from one of two row groups and copy over the second one
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.value " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET dep = 'x'", tableName);
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+  }
+
+  @Test
+  public void testMergeInsertOnly() {
+    createAndInitTable("id STRING, v STRING",
+        "{ \"id\": \"a\", \"v\": \"v1\" }\n" +
+        "{ \"id\": \"b\", \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": \"a\", \"v\": \"v1_1\" }\n" +
+        "{ \"id\": \"a\", \"v\": \"v1_2\" }\n" +
+        "{ \"id\": \"c\", \"v\": \"v3\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_1\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row("a", "v1"),   // kept
+        row("b", "v2"),   // kept
+        row("c", "v3"),   // new
+        row("d", "v4_1"), // new
+        row("d", "v4_2")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeInsertOnlyWithCondition() {
+    createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 21, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 22, \"is_new\": false }");
+
+    // validate assignments are reordered to match the table attrs
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND is_new = TRUE THEN " +
+        "  INSERT (v, id) VALUES (s.v + 100, s.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1),  // kept
+        row(2, 121) // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET b = c2, a = c1, t.id = source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, a, id) VALUES (c2, c1, id)", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeMixedCaseAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView(
+        "source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.iD == source.Id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET B = c2, A = c1, t.Id = source.ID " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, A, iD) VALUES (c2, c1, id)", tableName);
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1")),
+        sql("SELECT * FROM %s WHERE id = 1 ORDER BY id", tableName));
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeUpdatesNestedStructFields() {
+    createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>",
+        "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2 }");
+
+    // update primitive, array, map columns inside a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = source.c1, t.s.c2.a = array(-1, -2), t.s.c2.m = map('k', 'v')", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-2, row(ImmutableList.of(-1, -2), ImmutableMap.of("k", "v"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // set primitive, array, map columns to NULL (proper casts should be in place)
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = NULL, t.s.c2 = NULL", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(null, null))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // update all fields in a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = named_struct('c1', 100, 'c2', named_struct('a', array(1), 'm', map('x', 'y')))", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(100, row(ImmutableList.of(1), ImmutableMap.of("x", "y"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithInferredCasts() {
+    createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}");
+
+    // -2 in source should be cast to "-2" in target
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = source.c1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, "-2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeModifiesNullStruct() {
+    createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
+    createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }");
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.n1 = s.n1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-10, null))),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testMergeRefreshesRelationCache() {
+    createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
+
+    Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n1")),
+        sql("SELECT * FROM tmp"));
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.name = s.name", tableName);
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n2")),
+        sql("SELECT * FROM tmp"));
+
+    spark.sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testMergeWithMultipleNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2"),  // new
+        row(3, "emp-id-3")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleConditionalNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeResolvesColumnsByName() {
+    createAndInitTable("id INT, badge INT, dep STRING",
+        "{ \"id\": 1, \"badge\": 1000, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"badge\": 6000, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "badge INT, id INT, dep STRING",
+        "{ \"badge\": 1001, \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"badge\": 6006, \"id\": 6, \"dep\": \"emp-id-6\" }\n" +
+        "{ \"badge\": 7007, \"id\": 7, \"dep\": \"emp-id-7\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT * ", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1001, "emp-id-1"), // updated
+        row(6, 6006, "emp-id-6"), // updated
+        row(7, 7007, "emp-id-7")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows,
+        sql("SELECT id, badge, dep FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() {
+    // ensures that MERGE INTO will resolve into the correct action even if no columns
+    // or otherwise unresolved expressions exist in the query (testing SPARK-34962)
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON 1 != 1 " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNonExistingColumns() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve t.invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid nested column",
+        AnalysisException.class, "No such struct field invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, invalid_col) VALUES (s.c1, null)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidColumnsInInsert() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the nested column",
+        AnalysisException.class, "Nested fields are not supported inside INSERT clauses",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, c.n2) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about duplicate columns",
+        AnalysisException.class, "Duplicate column names inside INSERT clause",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, id) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about missing columns",
+        AnalysisException.class, "must provide values for all columns of the target table",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id) VALUES (s.c1)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidUpdates() {
+    createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about updating an array column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.a.c1 = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about updating a map column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.m.key = 'new_key'", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithConflictingUpdates() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a top-level column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict for these columns",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", tableName);
+        });
+  }
+
+  // revisit
+  @Ignore
+  @Test
+  public void testMergeWithInvalidAssignments() {
+    createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL");
+    createOrReplaceView(
+        "source",
+        "c1 INT, c2 STRUCT<n1:INT NOT NULL> NOT NULL, c3 STRING NOT NULL, c4 STRUCT<dn2:INT,dn1:INT>",
+        "{ \"c1\": -100, \"c2\": { \"n1\" : 1 }, \"c3\" : 'str', \"c4\": { \"dn2\": 1, \"dn2\": 2 } }");
+
+    for (String policy : new String[]{"ansi", "strict"}) {
+      withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy), () -> {
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a top-level column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.id = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a nested column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing missing fields in structs",
+            AnalysisException.class, "missing fields",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s = s.c2", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing invalid data types",
+            AnalysisException.class, "Cannot safely cast",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = s.c3", tableName);
+            });

Review Comment:
   These tests are failing because the AnalysisException is raised by
   https://github.com/apache/iceberg/blob/13db47e2755d8a885043a7ec62eee2dcab43a9bc/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala#L174-L176
   
   While debugging I found that `s.c2`, despite being declared `NOT NULL` in the view definition, has `nullable=true` by the time the assignments are resolved.
   
   This behavior changed in Spark 3.3: as of https://github.com/apache/spark/commit/6689b97ec76abe5bab27f02869f8f16b32530d1a, Spark by default no longer respects the nullability of a user-provided schema in
   
   ```
   DataFrameReader.schema(schema).json(jsonDataset)
   DataFrameReader.schema(schema).csv(csvDataset)
   ```
   
   I set the corresponding legacy conf to true in https://github.com/apache/iceberg/blob/master/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
   
   because our test utils rely on this code path to create the temp views used as MERGE sources.
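
   For illustration, a minimal sketch of what opting back into the old behavior in a test base might look like. The conf key `spark.sql.legacy.respectNullabilityInTextDatasetConversion` is my reading of the linked Spark commit, so treat both it and the class/method names as assumptions:
   
   ```java
   import org.apache.spark.sql.SparkSession;
   
   public class NullabilityAwareTestBase {
     // Hypothetical sketch: re-enable the pre-3.3 behavior so the nullability of
     // a user-supplied schema is respected by DataFrameReader.schema(...).json(...)
     // and .csv(...), which the test utils use to build temp views.
     static SparkSession startTestSession() {
       return SparkSession.builder()
           .master("local[2]")
           .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
           .getOrCreate();
     }
   }
   ```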





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#discussion_r900040404


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+  public TestDeleteFrom(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  // revisit
+  @Ignore
+  @Test
+  public void testDeleteFromUnpartitionedTable() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
+        AnalysisException.class, "Cannot delete from",
+        () -> sql("DELETE FROM %s WHERE id < 2", tableName));
+
+    sql("DELETE FROM %s WHERE id < 4", tableName);
+
+    assertEquals("Should have no rows after successful delete",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDeleteFromTableAtSnapshot() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    String prefix = "snapshot_id_";
+    AssertHelpers.assertThrows("Should not be able to delete from a table at a specific snapshot",
+        IllegalArgumentException.class, "Cannot delete from table at a specific snapshot",
+        () -> sql("DELETE FROM %s.%s WHERE id < 4", tableName, prefix + snapshotId));
+  }
+
+  // revisit
+  @Ignore
+  @Test

Review Comment:
   These tests were failing with:
   ```
   Field _file not found in source schema
   java.lang.IllegalArgumentException: Field _file not found in source schema
   	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:104)
   	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
   	....
   	at org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:192)
   	at org.apache.iceberg.spark.source.SparkWriteBuilder.validateOrMergeWriteSchema(SparkWriteBuilder.java:247)
   ```
   
   `_file` was not found because upstream Spark relies on the `__metadata_col` flag in an attribute's metadata to filter metadata columns out:
   https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L221-L225
   
   But our Spark schema conversion was not setting this flag in the attribute metadata, so the write schema validation against the source schema failed on the `_file` column.
   
   Added a fix for this (for [3.2](https://github.com/apache/iceberg/pull/5075) as well) and adapted the UTs to match the upstream changes.
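
   For illustration, a rough sketch of the kind of fix described, using Spark's `MetadataBuilder` API; the `__metadata_col` key is taken from the linked `v2Commands` code, and the class and helper names are hypothetical:
   
   ```java
   import org.apache.spark.sql.types.Metadata;
   import org.apache.spark.sql.types.MetadataBuilder;
   
   public class MetadataColumnFlag {
     // Hypothetical helper: attach the flag Spark's v2Commands checks when it
     // filters metadata columns (such as _file) out of the write schema before
     // validating it against the source schema.
     static Metadata metadataColumnFlag() {
       return new MetadataBuilder()
           .putBoolean("__metadata_col", true) // key assumed from the linked Spark code
           .build();
     }
   }
   ```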





[GitHub] [iceberg] singhpk234 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1157600488

   makes sense, thanks @pan3793, will do it in a separate PR.




[GitHub] [iceberg] singhpk234 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1168747201

   updated this PR with the 3.3-only changes as well, just in case we decide to merge the 3.3-only version first.




+
+      createOrReplaceView("source", "id INT, ts TIMESTAMP",
+          "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n" +
+          "{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "2001-01-01 00:00:00"), // updated
+          row(2, "2001-01-02 00:00:00")  // new
+      );
+      assertEquals("Should have expected rows",
+          expectedRows,
+          sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithBucketTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithTruncateTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeIntoPartitionedAndOrderedTable() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+      sql("ALTER TABLE %s WRITE ORDERED BY (id)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testSelfMerge() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSelfMergeWithCaching() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("CACHE TABLE %s", tableName);
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceAsSelfSubquery() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+    sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+        "ON t.id == s.value " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
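+    // lockstep barrier: each thread increments it after an operation and waits
+    // until both threads have finished the previous round before starting the next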
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+      Assert.fail("Expected a validation exception");
+    } catch (ExecutionException e) {
+      Throwable sparkException = e.getCause();
+      Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+      Throwable validationException = sparkException.getCause();
+      Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+      String errMsg = validationException.getMessage();
+      Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public void testMergeWithExtraColumnsInSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+        "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // new
+        row(2, "v2"),   // kept
+        row(3, "v3"),   // new
+        row(4, "v4")    // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullsInTargetAndSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullSafeEquals() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
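+    // <=> is null-safe equality, so the NULL id in the target matches the NULL id in the source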
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id <=> source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1_1"), // updated
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullCondition() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
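+    // appending AND NULL makes the ON condition never evaluate to true, so no rows
+    // match: target rows are kept and all source rows are inserted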
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id AND NULL " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(2, "v2_2")     // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullActionConditions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }\n" +
+        "{ \"id\": 3, \"v\": \"v3_3\" }");
+
+    // all conditions are NULL and will never match any rows
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows1 = ImmutableList.of(
+        row(1, "v1"), // kept
+        row(2, "v2")  // kept
+    );
+    assertEquals("Output should match", expectedRows1, sql("SELECT * FROM %s ORDER BY v", tableName));
+
+    // only the update and insert conditions are NULL
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows2 = ImmutableList.of(
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleMatchingActions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    // the order of match actions is important in this case
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated (also matches the delete cond but update is first)
+        row(2, "v2")    // kept (matches neither the update nor the delete cond)
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
+    Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
+
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
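+    // use tiny row group and split sizes so the 200 appended rows span multiple row groups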
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
+
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    List<Integer> ids = Lists.newArrayListWithCapacity(200);
+    for (int id = 1; id <= 200; id++) {
+      ids.add(id);
+    }
+    Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
+        .withColumnRenamed("value", "id")
+        .withColumn("dep", lit("hr"));
+    df.coalesce(1).writeTo(tableName).append();
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+
+    // update a record from one of two row groups and copy over the second one
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.value " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET dep = 'x'", tableName);
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+  }
+
+  @Test
+  public void testMergeInsertOnly() {
+    createAndInitTable("id STRING, v STRING",
+        "{ \"id\": \"a\", \"v\": \"v1\" }\n" +
+        "{ \"id\": \"b\", \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": \"a\", \"v\": \"v1_1\" }\n" +
+        "{ \"id\": \"a\", \"v\": \"v1_2\" }\n" +
+        "{ \"id\": \"c\", \"v\": \"v3\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_1\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row("a", "v1"),   // kept
+        row("b", "v2"),   // kept
+        row("c", "v3"),   // new
+        row("d", "v4_1"), // new
+        row("d", "v4_2")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeInsertOnlyWithCondition() {
+    createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 21, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 22, \"is_new\": false }");
+
+    // validate assignments are reordered to match the table attrs
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND is_new = TRUE THEN " +
+        "  INSERT (v, id) VALUES (s.v + 100, s.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1),  // kept
+        row(2, 121) // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET b = c2, a = c1, t.id = source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, a, id) VALUES (c2, c1, id)", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeMixedCaseAlignsUpdateAndInsertActions() {
+    createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
+    createOrReplaceView(
+        "source",
+        "{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
+        "{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.iD == source.Id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET B = c2, A = c1, t.Id = source.ID " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (b, A, iD) VALUES (c2, c1, id)", tableName);
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(1, -2, "new_str_1")),
+        sql("SELECT * FROM %s WHERE id = 1 ORDER BY id", tableName));
+    assertEquals(
+        "Output should match",
+        ImmutableList.of(row(2, -20, "new_str_2")),
+        sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeUpdatesNestedStructFields() {
+    createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>",
+        "{ \"id\": 1, \"s\": { \"c1\": 2, \"c2\": { \"a\": [1,2], \"m\": { \"a\": \"b\"} } } } }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2 }");
+
+    // update primitive, array, map columns inside a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = source.c1, t.s.c2.a = array(-1, -2), t.s.c2.m = map('k', 'v')", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-2, row(ImmutableList.of(-1, -2), ImmutableMap.of("k", "v"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // set primitive, array, map columns to NULL (proper casts should be in place)
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.c1 = NULL, t.s.c2 = NULL", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(null, null))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    // update all fields in a struct
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = named_struct('c1', 100, 'c2', named_struct('a', array(1), 'm', map('x', 'y')))", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(100, row(ImmutableList.of(1), ImmutableMap.of("x", "y"))))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithInferredCasts() {
+    createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}");
+
+    // -2 in the source should be cast to "-2" in the target
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s = source.c1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, "-2")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeModifiesNullStruct() {
+    createAndInitTable("id INT, s STRUCT<n1:INT,n2:INT>", "{ \"id\": 1, \"s\": null }");
+    createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }");
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.s.n1 = s.n1", tableName);
+
+    assertEquals("Output should match",
+        ImmutableList.of(row(1, row(-10, null))),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testMergeRefreshesRelationCache() {
+    createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
+
+    Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n1")),
+        sql("SELECT * FROM tmp"));
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.name = s.name", tableName);
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n2")),
+        sql("SELECT * FROM tmp"));
+
+    spark.sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testMergeWithMultipleNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2"),  // new
+        row(3, "emp-id-3")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleConditionalNotMatchedActions() {
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND s.id = 1 THEN " +
+        "  INSERT (dep, id) VALUES (s.dep, -1)" +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(-1, "emp-id-1"), // new
+        row(0, "emp-id-0"),  // kept
+        row(2, "emp-id-2")   // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeResolvesColumnsByName() {
+    createAndInitTable("id INT, badge INT, dep STRING",
+        "{ \"id\": 1, \"badge\": 1000, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"badge\": 6000, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "badge INT, id INT, dep STRING",
+        "{ \"badge\": 1001, \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"badge\": 6006, \"id\": 6, \"dep\": \"emp-id-6\" }\n" +
+        "{ \"badge\": 7007, \"id\": 7, \"dep\": \"emp-id-7\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT * ", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, 1001, "emp-id-1"), // updated
+        row(6, 6006, "emp-id-6"), // updated
+        row(7, 7007, "emp-id-7")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows,
+        sql("SELECT id, badge, dep FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() {
+    // ensures that MERGE INTO will resolve into the correct action even if no columns
+    // or otherwise unresolved expressions exist in the query (testing SPARK-34962)
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON 1 != 1 " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNonExistingColumns() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve t.invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid nested column",
+        AnalysisException.class, "No such struct field invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.invalid_col = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about the invalid top-level column",
+        AnalysisException.class, "cannot resolve invalid_col",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, invalid_col) VALUES (s.c1, null)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidColumnsInInsert() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about the nested column",
+        AnalysisException.class, "Nested fields are not supported inside INSERT clauses",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, c.n2) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about duplicate columns",
+        AnalysisException.class, "Duplicate column names inside INSERT clause",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n2.dn1 = s.c2 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id, id) VALUES (s.c1, null)", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about missing columns",
+        AnalysisException.class, "must provide values for all columns of the target table",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN NOT MATCHED THEN " +
+              "  INSERT (id) VALUES (s.c1)", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithInvalidUpdates() {
+    createAndInitTable("id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about updating an array column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.a.c1 = s.c2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about updating a map column",
+        AnalysisException.class, "Updating nested fields is only supported for structs",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.m.key = 'new_key'", tableName);
+        });
+  }
+
+  @Test
+  public void testMergeWithConflictingUpdates() {
+    createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a top-level column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict for these columns",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", tableName);
+        });
+
+    AssertHelpers.assertThrows("Should complain about conflicting updates to a nested column",
+        AnalysisException.class, "Updates are in conflict",
+        () -> {
+          sql("MERGE INTO %s t USING source s " +
+              "ON t.id == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))", tableName);
+        });
+  }
+
+  // revisit
+  @Ignore
+  @Test
+  public void testMergeWithInvalidAssignments() {
+    createAndInitTable("id INT NOT NULL, s STRUCT<n1:INT NOT NULL,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL");
+    createOrReplaceView(
+        "source",
+        "c1 INT, c2 STRUCT<n1:INT NOT NULL> NOT NULL, c3 STRING NOT NULL, c4 STRUCT<dn2:INT,dn1:INT>",
+        "{ \"c1\": -100, \"c2\": { \"n1\" : 1 }, \"c3\" : 'str', \"c4\": { \"dn2\": 1, \"dn2\": 2 } }");
+
+    for (String policy : new String[]{"ansi", "strict"}) {
+      withSQLConf(ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy), () -> {
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a top-level column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.id = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing nulls to a nested column",
+            AnalysisException.class, "Cannot write nullable values to non-null column",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = NULL", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing missing fields in structs",
+            AnalysisException.class, "missing fields",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s = s.c2", tableName);
+            });
+
+        AssertHelpers.assertThrows("Should complain about writing invalid data types",
+            AnalysisException.class, "Cannot safely cast",
+            () -> {
+              sql("MERGE INTO %s t USING source s " +
+                  "ON t.id == s.c1 " +
+                  "WHEN MATCHED THEN " +
+                  "  UPDATE SET t.s.n1 = s.c3", tableName);
+            });

Review Comment:
   These tests were failing because the AnalysisException is raised from
   https://github.com/apache/iceberg/blob/13db47e2755d8a885043a7ec62eee2dcab43a9bc/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala#L174-L176
   
   While debugging I found that `s.c2`, despite being declared `NOT NULL` in the schema, has `nullable=true` by the time the assignments are resolved.
   
   This happens as of Spark 3.3: after the behaviour change introduced in https://github.com/apache/spark/commit/6689b97ec76abe5bab27f02869f8f16b32530d1a, Spark no longer respects the nullability of a user-supplied schema in
   
   ```
   DataFrameReader.schema(schema).json(jsonDataset)
   DataFrameReader.schema(schema).csv(csvDataset)
   ```
   
   Since our test utils create the temp views through this path, I set the corresponding legacy conf to `true` in https://github.com/apache/iceberg/blob/master/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
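   
   For reference, a minimal sketch of what flipping that behaviour back could look like in a test session. The builder settings here are illustrative only; the flag name is the legacy conf added by the Spark commit linked above:
   
   ```java
   import org.apache.spark.sql.SparkSession;
   
   public class NullabilityConfExample {
     public static void main(String[] args) {
       // hypothetical session setup; the legacy flag is the only relevant piece
       SparkSession spark = SparkSession.builder()
           .master("local[2]")
           .appName("nullability-conf-example")
           // respect the nullability of a user-supplied schema again when
           // converting a Dataset<String> of JSON/CSV into a DataFrame
           .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
           .getOrCreate();
   
       spark.stop();
     }
   }
   ```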





[GitHub] [iceberg] singhpk234 commented on pull request #5056: Spark: Support Spark 3.3

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #5056:
URL: https://github.com/apache/iceberg/pull/5056#issuecomment-1160446745

   > "copy 3.2 files from 3.3" should be in another PR, otherwise after squash, the git will not keep commit history.
   
   Acknowledged; opened a new [PR](https://github.com/apache/iceberg/pull/5094) for "copy 3.2 files from 3.3".

