You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/08 18:04:57 UTC

[iceberg] branch master updated: Spark: Consolidate MERGE tests (#2222)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 23df220  Spark: Consolidate MERGE tests (#2222)
23df220 is described below

commit 23df22020b7d095e2cc9593d4d05af2bdcb89ba4
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Feb 8 10:04:44 2021 -0800

    Spark: Consolidate MERGE tests (#2222)
---
 .../spark/extensions/TestIcebergExpressions.java   |  70 +++
 .../apache/iceberg/spark/extensions/TestMerge.java | 501 ++++++++++++++++++++
 .../spark/extensions/TestMergeIntoTable.java       | 526 ---------------------
 3 files changed, 571 insertions(+), 526 deletions(-)

diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
new file mode 100644
index 0000000..ce88814
--- /dev/null
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.math.BigDecimal;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestIcebergExpressions extends SparkExtensionsTestBase {
+
+  public TestIcebergExpressions(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP VIEW IF EXISTS emp");
+    sql("DROP VIEW IF EXISTS v");
+  }
+
+  @Test
+  public void testTruncateExpressions() {
+    sql("CREATE TABLE %s ( " +
+        "  int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING, binary_c BINARY " +
+        ") USING iceberg", tableName);
+
+    sql("CREATE TEMPORARY VIEW emp " +
+        "AS SELECT * FROM VALUES (101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) " +
+        "AS EMP(int_c, long_c, dec_c, str_c, binary_c)");
+
+    sql("INSERT INTO %s SELECT * FROM emp", tableName);
+
+    Dataset<Row> df = spark.sql("SELECT * FROM " + tableName);
+    df.select(
+        new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
+        new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
+        new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
+        new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
+        new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c")
+    ).createOrReplaceTempView("v");
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
+        sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v"));
+  }
+}
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index ee68048..6c370e6 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -49,9 +50,11 @@ import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED;
 import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.spark.sql.functions.lit;
 
 public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
@@ -75,6 +78,504 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
   // TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
 
   @Test
+  public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // new
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
+    createAndInitTable("id INT, dep STRING");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND (s.id >=2) THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // new
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-six\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"),   // updated
+        row(6, "emp-id-six")  // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteClause() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-one") // kept
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllCauses() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithAllCausesWithExplicitColumnSpecification() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET t.id = s.id, t.dep = s.dep " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT (t.id, t.dep) VALUES (s.id, s.dep)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceCTE() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 5, \"dep\": \"emp-id-6\" }");
+
+    sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) " +
+        "MERGE INTO %s AS t USING cte1 AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 2 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 3 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2"), // updated
+        row(3, "emp-id-3")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceFromSetOps() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String derivedSource =
+        "SELECT * FROM source WHERE id = 2 " +
+        "UNION ALL " +
+        "SELECT * FROM source WHERE id = 1 OR id = 6";
+
+    sql("MERGE INTO %s AS t USING (%s) AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET * " +
+        "WHEN MATCHED AND t.id = 6 THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName, derivedSource);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "emp-id-1"), // updated
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleUpdatesForTargetRow() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" + // duplicate id: target row 1 matches two source rows
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrows("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  UPDATE SET * " +
+              "WHEN MATCHED AND t.id = 6 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged", // the failed MERGE must not modify the table
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithDisabledCardinalityCheck() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" + // duplicate id: target row 1 matches two source rows
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    try {
+      // disable the cardinality check
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, MERGE_CARDINALITY_CHECK_ENABLED, false);
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+    } finally { // re-enable the check even if the MERGE above throws
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, MERGE_CARDINALITY_CHECK_ENABLED, true);
+    }
+
+    assertEquals("Should have expected rows", // id = 1 appears twice: one output row per matching source row
+        ImmutableList.of(row(1, "emp-id-1"), row(1, "emp-id-1"), row(2, "emp-id-2")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithUnconditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    sql("MERGE INTO %s AS t USING source AS s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND s.id = 2 THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(2, "emp-id-2")  // new
+    );
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSingleConditionalDelete() {
+    createAndInitTable("id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    createOrReplaceView("source", "id INT, dep STRING",
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+        "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" + // duplicate id: target row 1 matches two source rows
+        "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+        "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    AssertHelpers.assertThrows("Should complain about multiple matches",
+        SparkException.class, errorMsg,
+        () -> {
+          sql("MERGE INTO %s AS t USING source AS s " +
+              "ON t.id == s.id " +
+              "WHEN MATCHED AND t.id = 1 THEN " +
+              "  DELETE " +
+              "WHEN NOT MATCHED AND s.id = 2 THEN " +
+              "  INSERT *", tableName);
+        });
+
+    assertEquals("Target should be unchanged", // the failed MERGE must not modify the table
+        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+  }
+
+  @Test
+  public void testMergeWithIdentityTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithDaysTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, ts TIMESTAMP");
+      sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName, "id INT, ts TIMESTAMP",
+          "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
+
+      createOrReplaceView("source", "id INT, ts TIMESTAMP",
+          "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n" +
+          "{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n" +
+          "{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "2001-01-01 00:00:00"), // updated
+          row(2, "2001-01-02 00:00:00")  // new
+      );
+      assertEquals("Should have expected rows",
+          expectedRows,
+          sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithBucketTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeWithTruncateTransform() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
+  public void testMergeIntoPartitionedAndOrderedTable() {
+    for (DistributionMode mode : DistributionMode.values()) {
+      createAndInitTable("id INT, dep STRING");
+      sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+      sql("ALTER TABLE %s WRITE ORDERED BY (id)", tableName);
+      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+
+      append(tableName,
+          "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      createOrReplaceView("source", "id INT, dep STRING",
+          "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+          "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+          "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+      sql("MERGE INTO %s AS t USING source AS s " +
+          "ON t.id == s.id " +
+          "WHEN MATCHED AND t.id = 1 THEN " +
+          "  UPDATE SET * " +
+          "WHEN MATCHED AND t.id = 6 THEN " +
+          "  DELETE " +
+          "WHEN NOT MATCHED AND s.id = 2 THEN " +
+          "  INSERT *", tableName);
+
+      ImmutableList<Object[]> expectedRows = ImmutableList.of(
+          row(1, "emp-id-1"), // updated
+          row(2, "emp-id-2")  // new
+      );
+      assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      removeTables();
+    }
+  }
+
+  @Test
   public void testSelfMerge() {
     createAndInitTable("id INT, v STRING",
         "{ \"id\": 1, \"v\": \"v1\" }\n" +
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
deleted file mode 100644
index 360d6c0..0000000
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.extensions;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.spark.SparkCatalog;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.spark.SparkException;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED;
-import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
-
-public class TestMergeIntoTable extends SparkRowLevelOperationsTestBase {
-  private final String sourceName;
-  private final String targetName;
-  private final List<String> writeModes = new ArrayList<>(Arrays.asList("none", "hash", "range"));
-
-  @Parameterized.Parameters(
-      name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, vectorized = {4}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-        { "testhive", SparkCatalog.class.getName(),
-            ImmutableMap.of(
-                "type", "hive",
-                "default-namespace", "default"
-            ),
-            "parquet",
-            true
-        },
-        { "spark_catalog", SparkSessionCatalog.class.getName(),
-            ImmutableMap.of(
-                "type", "hive",
-                "default-namespace", "default",
-                "clients", "1",
-                "parquet-enabled", "false",
-                "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
-            ),
-            "parquet",
-            false
-        }
-    };
-  }
-
-  public TestMergeIntoTable(String catalogName, String implementation, Map<String, String> config,
-                            String fileFormat, Boolean vectorized) {
-    super(catalogName, implementation, config, fileFormat, vectorized);
-    this.sourceName = tableName("source");
-    this.targetName = tableName("target");
-  }
-
-  @BeforeClass
-  public static void setupSparkConf() {
-    spark.conf().set("spark.sql.shuffle.partitions", "4");
-  }
-
-  protected Map<String, String> extraTableProperties() {
-    return ImmutableMap.of(TableProperties.MERGE_MODE, TableProperties.MERGE_MODE_DEFAULT);
-  }
-
-  @Before
-  public void createTables() {
-    createAndInitUnPartitionedTargetTable(targetName);
-    createAndInitSourceTable(sourceName);
-  }
-
-  @After
-  public void removeTables() {
-    sql("DROP TABLE IF EXISTS %s", targetName);
-    sql("DROP TABLE IF EXISTS %s", sourceName);
-  }
-
-  @Test
-  public void testEmptyTargetInsertAllNonMatchingRows() throws NoSuchTableException {
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(2, "emp-id-2"), new Employee(3, "emp-id-3"));
-    String sqlText = "MERGE INTO %s AS target " +
-                     "USING %s AS source " +
-                     "ON target.id = source.id " +
-                     "WHEN NOT MATCHED THEN INSERT * ";
-
-    sql(sqlText, targetName, sourceName);
-    assertEquals("Should have expected rows",
-            ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2"), row(3, "emp-id-3")),
-            sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testEmptyTargetInsertOnlyMatchingRows() throws NoSuchTableException {
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(2, "emp-id-2"), new Employee(3, "emp-id-3"));
-    String sqlText = "MERGE INTO %s AS target " +
-                     "USING %s AS source " +
-                     "ON target.id = source.id " +
-                     "WHEN NOT MATCHED AND (source.id >= 2) THEN INSERT * ";
-
-    sql(sqlText, targetName, sourceName);
-    assertEquals("Should have expected rows",
-            ImmutableList.of(row(2, "emp-id-2"), row(3, "emp-id-3")),
-            sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testOnlyUpdate() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-six"));
-    append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-    String sqlText = "MERGE INTO %s AS target " +
-            "USING %s AS source " +
-            "ON target.id = source.id " +
-            "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * ";
-
-    sql(sqlText, targetName, sourceName);
-    assertEquals("Should have expected rows",
-            ImmutableList.of(row(1, "emp-id-1"), row(6, "emp-id-six")),
-            sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testOnlyDelete() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-    // A single conditional DELETE clause: only the matched target row with id 6 is removed.
-    String deleteOnlyMerge = String.join(" ",
-        "MERGE INTO %s AS target",
-        "USING %s AS source",
-        "ON target.id = source.id",
-        "WHEN MATCHED AND target.id = 6 THEN DELETE");
-    sql(deleteOnlyMerge, targetName, sourceName);
-
-    // Target id 1 also matches, but no clause applies to it, so it is expected to stay as written.
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(1, "emp-id-one")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testAllCauses() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-    // One statement exercising all three clause kinds: update id 1, delete id 6, insert id 2.
-    String mergeStatement =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-    sql(mergeStatement, targetName, sourceName);
-
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testAllCausesWithExplicitColumnSpecification() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-    // Same update/delete/insert mix as the star form, but every column is spelled out explicitly.
-    String explicitColumnMerge =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN UPDATE SET target.id = source.id, target.dep = source.dep " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN " +
-        "INSERT (target.id, target.dep) VALUES (source.id, source.dep) ";
-    sql(explicitColumnMerge, targetName, sourceName);
-
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testSourceCTE() throws NoSuchTableException {
-    // Skipped for the "testhadoop" and "testhive" catalogs.
-    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
-    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhive"));
-
-    append(targetName, new Employee(2, "emp-id-two"), new Employee(6, "emp-id-6"));
-    // The CTE below adds 1 to every source id, so these rows are merged as ids 3, 2 and 6.
-    append(sourceName, new Employee(2, "emp-id-3"), new Employee(1, "emp-id-2"), new Employee(5, "emp-id-6"));
-
-    String cteMerge =
-        "WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) " +
-        "MERGE INTO %s AS target USING cte1 AS source " +
-        "ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 2 THEN UPDATE SET * " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 3 THEN INSERT * ";
-    sql(cteMerge, targetName);
-
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(2, "emp-id-2"), row(3, "emp-id-3")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testSourceFromSetOps() throws NoSuchTableException {
-    // Skipped for the "testhadoop" and "testhive" catalogs.
-    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
-    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhive"));
-
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-    // The merge source is a UNION ALL of two filtered scans instead of a plain table reference.
-    String unionOfSourceRows =
-        " ( SELECT * FROM source WHERE id = 2 " +
-        "   UNION ALL " +
-        "   SELECT * FROM source WHERE id = 1 OR id = 6)";
-    String setOpsMerge =
-        "MERGE INTO %s AS target USING " + unionOfSourceRows + " AS source " +
-        "ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-    sql(setOpsMerge, targetName);
-
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testMultipleUpdatesForTargetRow() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    // The source carries id 1 twice, so target row 1 is matched by two source rows.
-    append(sourceName,
-        new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-        new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-    String ambiguousMerge =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-        SparkException.class,
-        "statement matched a single row from the target table with multiple rows of the source table",
-        () -> sql(ambiguousMerge, targetName, sourceName));
-
-    // The failed statement must not have modified the target.
-    assertEquals("Target should be unchanged",
-        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testIgnoreMultipleUpdatesForTargetRow() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName,
-        new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-        new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-    // Turn the cardinality check off on the target before merging.
-    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", targetName, MERGE_CARDINALITY_CHECK_ENABLED, false);
-
-    String uncheckedMerge =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-        "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-    sql(uncheckedMerge, targetName, sourceName);
-
-    // With the check off, the duplicated source id 1 is expected to yield two updated rows.
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(1, "emp-id-1"), row(1, "emp-id-1"), row(2, "emp-id-2")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    // Source id 1 is duplicated; no table property is changed in this test.
-    append(sourceName,
-        new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-        new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-    // The only MATCHED clause is an unconditional DELETE; the statement is expected to succeed.
-    String deleteAllMatched =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-    sql(deleteAllMatched, targetName, sourceName);
-
-    assertEquals("Should have expected rows",
-        ImmutableList.of(row(2, "emp-id-2")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    // Source id 1 is duplicated, so target row 1 is matched by two source rows.
-    append(sourceName,
-        new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-        new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-    // Here the DELETE clause carries a condition, and the statement is expected to be rejected.
-    String conditionalDeleteMerge =
-        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-        "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-        "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-        SparkException.class,
-        "statement matched a single row from the target table with multiple rows of the source table",
-        () -> sql(conditionalDeleteMerge, targetName, sourceName));
-
-    assertEquals("Target should be unchanged",
-        ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-        sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-  }
-
-  @Test
-  public void testIdentityPartition()  {
-    // Runs the same scenario once per entry in writeModes, rebuilding both tables each time.
-    writeModes.forEach(distributionMode -> {
-      removeTables();
-
-      // Target partitioned by identity(dep).
-      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
-      initTable(targetName);
-      setDistributionMode(targetName, distributionMode);
-      createAndInitSourceTable(sourceName);
-
-      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-      String mergeStatement =
-          "MERGE INTO %s AS target USING %s AS source ON target.id = source.id " +
-          "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-          "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-          "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-      sql(mergeStatement, targetName, sourceName);
-
-      assertEquals("Should have expected rows",
-          ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-          sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-    });
-  }
-
-  @Test
-  public void testDaysTransform() {
-    // Runs the same scenario once per entry in writeModes, rebuilding both tables each time.
-    writeModes.forEach(distributionMode -> {
-      removeTables();
-
-      // Target partitioned by days(ts); the source has the same columns and no partitioning clause.
-      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
-      initTable(targetName);
-      setDistributionMode(targetName, distributionMode);
-      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
-      initTable(sourceName);
-
-      String seedTarget = "INSERT INTO " + targetName +
-          " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
-          "(6, timestamp('2001-01-06 00:00:00'))";
-      sql(seedTarget);
-      String seedSource = "INSERT INto " + sourceName +
-          " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
-          "(1, timestamp('2001-01-01 00:00:00'))," +
-          "(6, timestamp('2001-01-06 00:00:00'))";
-      sql(seedSource);
-
-      String mergeStatement = String.join(" \n",
-          "MERGE INTO %s AS target",
-          "USING %s AS source",
-          "ON target.id = source.id",
-          "WHEN MATCHED AND target.id = 1 THEN UPDATE SET *",
-          "WHEN MATCHED AND target.id = 6 THEN DELETE",
-          "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ");
-      sql(mergeStatement, targetName, sourceName);
-
-      // Timestamps are cast to strings in the query so they can be compared against literals.
-      assertEquals("Should have expected rows",
-          ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
-          sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
-    });
-  }
-
-  @Test
-  public void testBucketExpression() {
-    // Runs the same scenario once per entry in writeModes, rebuilding both tables each time.
-    writeModes.forEach(distributionMode -> {
-      removeTables();
-
-      // Target clustered by dep into 2 buckets.
-      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
-      initTable(targetName);
-      setDistributionMode(targetName, distributionMode);
-      createAndInitSourceTable(sourceName);
-
-      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-      String mergeStatement = String.join(" \n",
-          "MERGE INTO %s AS target",
-          "USING %s AS source",
-          "ON target.id = source.id",
-          "WHEN MATCHED AND target.id = 1 THEN UPDATE SET *",
-          "WHEN MATCHED AND target.id = 6 THEN DELETE",
-          "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ");
-      sql(mergeStatement, targetName, sourceName);
-
-      assertEquals("Should have expected rows",
-          ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-          sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-    });
-  }
-
-  @Test
-  public void testTruncateExpressionInMerge() {
-    // Runs the same scenario once per entry in writeModes, rebuilding both tables each time.
-    writeModes.forEach(distributionMode -> {
-      removeTables();
-
-      // The truncate(id, 2) partition field is added after the table is created.
-      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", targetName);
-      sql("ALTER TABLE %s ADD PARTITION FIELD truncate(id, 2)", targetName);
-      initTable(targetName);
-      setDistributionMode(targetName, distributionMode);
-      createAndInitSourceTable(sourceName);
-
-      append(targetName, new Employee(101, "id-101"), new Employee(601, "id-601"));
-      append(sourceName, new Employee(201, "id-201"), new Employee(101, "id-101"), new Employee(601, "id-601"));
-
-      String mergeStatement = String.join(" \n",
-          "MERGE INTO %s AS target",
-          "USING %s AS source",
-          "ON target.id = source.id",
-          "WHEN MATCHED AND target.id = 101 THEN UPDATE SET *",
-          "WHEN MATCHED AND target.id = 601 THEN DELETE",
-          "WHEN NOT MATCHED AND source.id = 201 THEN INSERT * ");
-      sql(mergeStatement, targetName, sourceName);
-
-      assertEquals("Should have expected rows",
-          ImmutableList.of(row(101, "id-101"), row(201, "id-201")),
-          sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-    });
-  }
-
-  @Test
-  public void testTruncateExpressions() {
-    removeTables();
-    sql("DROP VIEW IF EXISTS EMP");
-
-    // One row covering each column type the truncate expression is applied to below.
-    String viewText = "CREATE TEMPORARY VIEW EMP AS SELECT * FROM VALUES " +
-        "(101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) " +
-        "AS EMP(int_c, long_c, dec_c, str_c, binary_c)";
-    sql(viewText);
-    sql("CREATE TABLE %s (int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING," +
-        " binary_c BINARY) USING iceberg", targetName);
-    sql("INSERT INTO %s SELECT * FROM EMP", targetName);
-
-    // Typed as Dataset<Row> rather than the raw Dataset to avoid unchecked usage.
-    Dataset<Row> df = spark.sql("SELECT * FROM " + targetName);
-
-    // Wrap every column in IcebergTruncateTransform (width 2, except 50 for the decimal) and
-    // expose the result as temp view v1 so it can be queried through sql(...).
-    df.select(new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
-      new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
-      new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
-      new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
-      new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c")
-    ).createOrReplaceTempView("v1");
-
-    // No show(false) debug dump here: the assertion below already evaluates the same query.
-    assertEquals("Should have expected rows",
-           ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
-           sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v1"));
-  }
-
-  @Test
-  public void testPartitionedAndOrderedTable() {
-    // Runs the same scenario once per entry in writeModes, rebuilding both tables each time.
-    writeModes.forEach(distributionMode -> {
-      removeTables();
-
-      // Target partitioned by id, with a write order on dep.
-      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (id)", targetName);
-      sql("ALTER TABLE %s  WRITE ORDERED BY (dep)", targetName);
-      initTable(targetName);
-      setDistributionMode(targetName, distributionMode);
-      createAndInitSourceTable(sourceName);
-
-      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-      String mergeStatement = String.join(" \n",
-          "MERGE INTO %s AS target",
-          "USING %s AS source",
-          "ON target.id = source.id",
-          "WHEN MATCHED AND target.id = 1 THEN UPDATE SET *",
-          "WHEN MATCHED AND target.id = 6 THEN DELETE",
-          "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ");
-      sql(mergeStatement, targetName, sourceName);
-
-      assertEquals("Should have expected rows",
-          ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-          sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-    });
-  }
-
-  /**
-   * Creates an Iceberg table with columns (id INT, dep STRING) and no partitioning clause,
-   * then applies the shared table properties via {@code initTable}.
-   */
-  protected void createAndInitUnPartitionedTargetTable(String tabName) {
-    String createDdl = "CREATE TABLE %s (id INT, dep STRING) USING iceberg";
-    sql(createDdl, tabName);
-    initTable(tabName);
-  }
-
-  /**
-   * Creates an Iceberg table with columns (id INT, dep STRING) partitioned by dep,
-   * then applies the shared table properties via {@code initTable}.
-   */
-  protected void createAndInitSourceTable(String tabName) {
-    String createDdl = "CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (dep)";
-    sql(createDdl, tabName);
-    initTable(tabName);
-  }
-
-  /**
-   * Applies the file format, the format-specific vectorization handling and any extra
-   * table properties to the given table.
-   */
-  private void initTable(String tabName) {
-    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tabName, DEFAULT_FILE_FORMAT, fileFormat);
-
-    if (fileFormat.equals("parquet")) {
-      // Parquet is the only format where the vectorization flag is written as a table property.
-      sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tabName, PARQUET_VECTORIZATION_ENABLED, vectorized);
-    } else if (fileFormat.equals("orc")) {
-      // For ORC the flag is only asserted to be on.
-      Assert.assertTrue(vectorized);
-    } else if (fileFormat.equals("avro")) {
-      // For Avro the flag is only asserted to be off.
-      Assert.assertFalse(vectorized);
-    }
-
-    // Each extra property is applied with its own ALTER TABLE statement.
-    extraTableProperties().forEach((key, val) ->
-        sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tabName, key, val));
-  }
-
-  /** Sets the {@code TableProperties.WRITE_DISTRIBUTION_MODE} property of the table to {@code mode}. */
-  private void setDistributionMode(String tabName, String mode) {
-    String alterDdl = "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')";
-    sql(alterDdl, tabName, TableProperties.WRITE_DISTRIBUTION_MODE, mode);
-  }
-
-  /**
-   * Appends the given employees to the named table as a single-partition DataFrame write.
-   *
-   * @param tabName table to append to
-   * @param employees rows to write
-   * @throws RuntimeException if the table does not exist; the original
-   *         {@link NoSuchTableException} is kept as the cause
-   */
-  protected void append(String tabName, Employee... employees) {
-    try {
-      List<Employee> input = Arrays.asList(employees);
-      Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
-      // coalesce(1) collapses the input into one partition before the append.
-      inputDF.coalesce(1).writeTo(tabName).append();
-    } catch (NoSuchTableException e) {
-      // Same exception type and message as before, but chain the cause so the stack trace survives.
-      throw new RuntimeException(e.getMessage(), e);
-    }
-  }
-}