You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/08 18:04:57 UTC
[iceberg] branch master updated: Spark: Consolidate MERGE tests
(#2222)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 23df220 Spark: Consolidate MERGE tests (#2222)
23df220 is described below
commit 23df22020b7d095e2cc9593d4d05af2bdcb89ba4
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Feb 8 10:04:44 2021 -0800
Spark: Consolidate MERGE tests (#2222)
---
.../spark/extensions/TestIcebergExpressions.java | 70 +++
.../apache/iceberg/spark/extensions/TestMerge.java | 501 ++++++++++++++++++++
.../spark/extensions/TestMergeIntoTable.java | 526 ---------------------
3 files changed, 571 insertions(+), 526 deletions(-)
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
new file mode 100644
index 0000000..ce88814
--- /dev/null
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.math.BigDecimal;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestIcebergExpressions extends SparkExtensionsTestBase {
+ // Exercises Iceberg-specific Catalyst expressions (currently IcebergTruncateTransform).
+ public TestIcebergExpressions(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP VIEW IF EXISTS emp");
+ sql("DROP VIEW IF EXISTS v");
+ }
+
+ @Test
+ public void testTruncateExpressions() {
+ sql("CREATE TABLE %s ( " +
+ " int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING, binary_c BINARY " +
+ ") USING iceberg", tableName);
+ // one source row covering every column type the transform is applied to
+ sql("CREATE TEMPORARY VIEW emp " +
+ "AS SELECT * FROM VALUES (101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) " +
+ "AS EMP(int_c, long_c, dec_c, str_c, binary_c)");
+
+ sql("INSERT INTO %s SELECT * FROM emp", tableName);
+ // wrap each column in IcebergTruncateTransform: width 2, except dec_c which uses width 50
+ Dataset<Row> df = spark.sql("SELECT * FROM " + tableName);
+ df.select(
+ new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
+ new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
+ new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
+ new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
+ new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c")
+ ).createOrReplaceTempView("v");
+ // expected: 101 -> 100, 10001 -> 10000, 10.65 -> 10.50, 101-Employee -> 10, 1234 -> 12
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
+ sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v"));
+ }
+}
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index ee68048..6c370e6 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -49,9 +50,11 @@ import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.spark.sql.functions.lit;
public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
@@ -75,6 +78,504 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
// TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
@Test
+ public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
+ createAndInitTable("id INT, dep STRING");
+ // no initial rows are passed for the target, so every source row is unmatched
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+ // the unconditional NOT MATCHED clause inserts every unmatched source row
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // new
+ row(2, "emp-id-2"), // new
+ row(3, "emp-id-3") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
+ createAndInitTable("id INT, dep STRING");
+ // no initial rows are passed for the target, so every source row is unmatched
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
+ // the NOT MATCHED condition (s.id >= 2) keeps id 1 out of the target
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN NOT MATCHED AND (s.id >=2) THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(2, "emp-id-2"), // new
+ row(3, "emp-id-3") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithOnlyUpdateClause() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-six\" }");
+ // source matches target ids 1 and 6; id 2 has no target row
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // only id 1 is updated; matched id 6 is left as-is and unmatched id 2 is not inserted
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(6, "emp-id-six") // kept
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithOnlyDeleteClause() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source matches target ids 1 and 6; id 2 has no target row
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // only id 6 is deleted; matched id 1 is left as-is and unmatched id 2 is not inserted
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-one") // kept
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithAllCauses() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source matches target ids 1 and 6; id 2 has no target row
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // exercises UPDATE, DELETE and INSERT clauses in a single MERGE
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithAllCausesWithExplicitColumnSpecification() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source matches target ids 1 and 6; id 2 has no target row
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // same scenario as UPDATE SET * / INSERT *, but with explicit column assignments
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET t.id = s.id, t.dep = s.dep " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT (t.id, t.dep) VALUES (s.id, s.dep)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithSourceCTE() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-two\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // cte1 below shifts each source id by one, yielding ids 3, 2 and 6
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-3\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 5, \"dep\": \"emp-id-6\" }");
+ // the MERGE reads its source from a CTE declared before MERGE INTO
+ sql("WITH cte1 AS (SELECT id + 1 AS id, dep FROM source) " +
+ "MERGE INTO %s AS t USING cte1 AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 2 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 3 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(2, "emp-id-2"), // updated
+ row(3, "emp-id-3") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithSourceFromSetOps() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source matches target ids 1 and 6; id 2 has no target row
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // derived source: UNION ALL of id 2 and ids 1/6, i.e. every row of the source view
+ String derivedSource =
+ "SELECT * FROM source WHERE id = 2 " +
+ "UNION ALL " +
+ "SELECT * FROM source WHERE id = 1 OR id = 6";
+
+ sql("MERGE INTO %s AS t USING (%s) AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName, derivedSource);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithMultipleUpdatesForTargetRow() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source contains id 1 twice, so target row 1 is matched by two source rows
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ String errorMsg = "a single row from the target table with multiple rows of the source table";
+ AssertHelpers.assertThrows("Should complain about multiple source rows matching one target row",
+ SparkException.class, errorMsg,
+ () -> {
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+ });
+ // the failed MERGE must not have committed any change to the target
+ assertEquals("Target should be unchanged",
+ ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+ sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+ }
+
+ @Test
+ public void testMergeWithDisabledCardinalityCheck() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source contains id 1 twice; with the check disabled the MERGE is not rejected
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // the finally block re-enables the check so the property change does not outlive the MERGE
+ try {
+ // disable the cardinality check
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, MERGE_CARDINALITY_CHECK_ENABLED, false);
+
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+ } finally {
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, MERGE_CARDINALITY_CHECK_ENABLED, true);
+ }
+ // both duplicate source rows for id 1 were applied, producing two id 1 rows
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1, "emp-id-1"), row(1, "emp-id-1"), row(2, "emp-id-2")),
+ sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+ }
+
+ @Test
+ public void testMergeWithUnconditionalDelete() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source contains id 1 twice; this unconditional DELETE is expected to succeed anyway
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // every matched target row (ids 1 and 6) is deleted and unmatched id 2 is inserted
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithSingleConditionalDelete() {
+ createAndInitTable("id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // source contains id 1 twice, so target row 1 is matched by two source rows
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // unlike the unconditional DELETE case, a conditional DELETE is expected to be rejected
+ String errorMsg = "a single row from the target table with multiple rows of the source table";
+ AssertHelpers.assertThrows("Should complain about multiple source rows matching one target row",
+ SparkException.class, errorMsg,
+ () -> {
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+ });
+ // the failed MERGE must not have committed any change to the target
+ assertEquals("Target should be unchanged",
+ ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
+ sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
+ }
+
+ @Test
+ public void testMergeWithIdentityTransform() {
+ for (DistributionMode mode : DistributionMode.values()) {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD identity(dep)", tableName);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+ // the scenario below is repeated once for every DistributionMode value
+ append(tableName,
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ // removeTables() is defined outside this view; presumably it drops the table for the next mode
+ removeTables();
+ }
+ }
+
+ @Test
+ public void testMergeWithDaysTransform() {
+ for (DistributionMode mode : DistributionMode.values()) {
+ createAndInitTable("id INT, ts TIMESTAMP");
+ sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+ // the scenario below is repeated once for every DistributionMode value
+ append(tableName, "id INT, ts TIMESTAMP",
+ "{ \"id\": 1, \"ts\": \"2000-01-01 00:00:00\" }\n" +
+ "{ \"id\": 6, \"ts\": \"2000-01-06 00:00:00\" }");
+ // updated row 1 moves from day 2000-01-01 to day 2001-01-01, i.e. to another days(ts) value
+ createOrReplaceView("source", "id INT, ts TIMESTAMP",
+ "{ \"id\": 2, \"ts\": \"2001-01-02 00:00:00\" }\n" +
+ "{ \"id\": 1, \"ts\": \"2001-01-01 00:00:00\" }\n" +
+ "{ \"id\": 6, \"ts\": \"2001-01-06 00:00:00\" }");
+
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+ // ts is cast to STRING below so the expected values can be written as plain strings
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "2001-01-01 00:00:00"), // updated
+ row(2, "2001-01-02 00:00:00") // new
+ );
+ assertEquals("Should have expected rows",
+ expectedRows,
+ sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id", tableName));
+ // removeTables() is defined outside this view; presumably it drops the table for the next mode
+ removeTables();
+ }
+ }
+
+ @Test
+ public void testMergeWithBucketTransform() {
+ for (DistributionMode mode : DistributionMode.values()) {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD bucket(2, dep)", tableName);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+ // the scenario below is repeated once for every DistributionMode value
+ append(tableName,
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // dep changes for id 1 (emp-id-one -> emp-id-1), so its bucket(2, dep) value may change
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ // removeTables() is defined outside this view; presumably it drops the table for the next mode
+ removeTables();
+ }
+ }
+
+ @Test
+ public void testMergeWithTruncateTransform() {
+ for (DistributionMode mode : DistributionMode.values()) {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD truncate(dep, 2)", tableName);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+ // the scenario below is repeated once for every DistributionMode value
+ append(tableName,
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // every dep value shares the prefix em, so all rows get the same truncate(dep, 2) value
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ // removeTables() is defined outside this view; presumably it drops the table for the next mode
+ removeTables();
+ }
+ }
+
+ @Test
+ public void testMergeIntoPartitionedAndOrderedTable() {
+ for (DistributionMode mode : DistributionMode.values()) {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+ sql("ALTER TABLE %s WRITE ORDERED BY (id)", tableName);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, WRITE_DISTRIBUTION_MODE, mode.modeName());
+ // repeated for every DistributionMode; the table is partitioned by dep and ordered by id
+ append(tableName,
+ "{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+
+ createOrReplaceView("source", "id INT, dep STRING",
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n" +
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n" +
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
+ // updating id 1 changes dep (emp-id-one -> emp-id-1), moving the row to another dep partition
+ sql("MERGE INTO %s AS t USING source AS s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET * " +
+ "WHEN MATCHED AND t.id = 6 THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND s.id = 2 THEN " +
+ " INSERT *", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "emp-id-1"), // updated
+ row(2, "emp-id-2") // new
+ );
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ // removeTables() is defined outside this view; presumably it drops the table for the next mode
+ removeTables();
+ }
+ }
+
+ @Test
public void testSelfMerge() {
createAndInitTable("id INT, v STRING",
"{ \"id\": 1, \"v\": \"v1\" }\n" +
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
deleted file mode 100644
index 360d6c0..0000000
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.extensions;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.spark.SparkCatalog;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.spark.SparkException;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED;
-import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
-
-public class TestMergeIntoTable extends SparkRowLevelOperationsTestBase {
- private final String sourceName;
- private final String targetName;
- private final List<String> writeModes = new ArrayList<>(Arrays.asList("none", "hash", "range"));
-
- @Parameterized.Parameters(
- name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, vectorized = {4}")
- public static Object[][] parameters() {
- return new Object[][] {
- { "testhive", SparkCatalog.class.getName(),
- ImmutableMap.of(
- "type", "hive",
- "default-namespace", "default"
- ),
- "parquet",
- true
- },
- { "spark_catalog", SparkSessionCatalog.class.getName(),
- ImmutableMap.of(
- "type", "hive",
- "default-namespace", "default",
- "clients", "1",
- "parquet-enabled", "false",
- "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
- ),
- "parquet",
- false
- }
- };
- }
-
- public TestMergeIntoTable(String catalogName, String implementation, Map<String, String> config,
- String fileFormat, Boolean vectorized) {
- super(catalogName, implementation, config, fileFormat, vectorized);
- this.sourceName = tableName("source");
- this.targetName = tableName("target");
- }
-
- @BeforeClass
- public static void setupSparkConf() {
- spark.conf().set("spark.sql.shuffle.partitions", "4");
- }
-
- protected Map<String, String> extraTableProperties() {
- return ImmutableMap.of(TableProperties.MERGE_MODE, TableProperties.MERGE_MODE_DEFAULT);
- }
-
- @Before
- public void createTables() {
- createAndInitUnPartitionedTargetTable(targetName);
- createAndInitSourceTable(sourceName);
- }
-
- @After
- public void removeTables() {
- sql("DROP TABLE IF EXISTS %s", targetName);
- sql("DROP TABLE IF EXISTS %s", sourceName);
- }
-
- @Test
- public void testEmptyTargetInsertAllNonMatchingRows() throws NoSuchTableException {
- append(sourceName, new Employee(1, "emp-id-1"), new Employee(2, "emp-id-2"), new Employee(3, "emp-id-3"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN NOT MATCHED THEN INSERT * ";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2"), row(3, "emp-id-3")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testEmptyTargetInsertOnlyMatchingRows() throws NoSuchTableException {
- append(sourceName, new Employee(1, "emp-id-1"), new Employee(2, "emp-id-2"), new Employee(3, "emp-id-3"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN NOT MATCHED AND (source.id >= 2) THEN INSERT * ";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(2, "emp-id-2"), row(3, "emp-id-3")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testOnlyUpdate() throws NoSuchTableException {
- append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-six"));
- append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * ";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(1, "emp-id-1"), row(6, "emp-id-six")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testOnlyDelete() throws NoSuchTableException {
- append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
- append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED AND target.id = 6 THEN DELETE";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(1, "emp-id-one")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testAllCauses() throws NoSuchTableException {
- append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
- append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
- "WHEN MATCHED AND target.id = 6 THEN DELETE " +
- "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testAllCausesWithExplicitColumnSpecification() throws NoSuchTableException {
- append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
- append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
- String sqlText = "MERGE INTO %s AS target " +
- "USING %s AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED AND target.id = 1 THEN UPDATE SET target.id = source.id, target.dep = source.dep " +
- "WHEN MATCHED AND target.id = 6 THEN DELETE " +
- "WHEN NOT MATCHED AND source.id = 2 THEN INSERT (target.id, target.dep) VALUES (source.id, source.dep) ";
-
- sql(sqlText, targetName, sourceName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- @Test
- public void testSourceCTE() throws NoSuchTableException {
- Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
- Assume.assumeFalse(catalogName.equalsIgnoreCase("testhive"));
-
- append(targetName, new Employee(2, "emp-id-two"), new Employee(6, "emp-id-6"));
- append(sourceName, new Employee(2, "emp-id-3"), new Employee(1, "emp-id-2"), new Employee(5, "emp-id-6"));
- String sourceCTE = "WITH cte1 AS (SELECT id + 1 AS id, dep FROM source)";
- String sqlText = sourceCTE + " MERGE INTO %s AS target " +
- "USING cte1" + " AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED AND target.id = 2 THEN UPDATE SET * " +
- "WHEN MATCHED AND target.id = 6 THEN DELETE " +
- "WHEN NOT MATCHED AND source.id = 3 THEN INSERT * ";
-
- sql(sqlText, targetName);
- assertEquals("Should have expected rows",
- ImmutableList.of(row(2, "emp-id-2"), row(3, "emp-id-3")),
- sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * Verifies MERGE INTO when the source is a derived table built with UNION ALL over the source
-  * fixture.
-  */
- @Test
- public void testSourceFromSetOps() throws NoSuchTableException {
-   // NOTE(review): the reason these catalogs are skipped is not visible here; the derived query
-   // used to hard-code the "source" identifier, which may have been the cause — confirm.
-   Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
-   Assume.assumeFalse(catalogName.equalsIgnoreCase("testhive"));
-
-   append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-   append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-   // Both UNION ALL branches read the fixture through sourceName rather than a hard-coded name.
-   // Together they return every source row (id 2, then ids 1 and 6).
-   String derivedSource = " ( SELECT * FROM %s WHERE id = 2 " +
-       " UNION ALL " +
-       " SELECT * FROM %s WHERE id = 1 OR id = 6)";
-   String sqlText = "MERGE INTO %s AS target " +
-       "USING " + derivedSource + " AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   // Placeholders: MERGE target, then the two UNION ALL branches.
-   sql(sqlText, targetName, sourceName, sourceName);
-   assertEquals("Should have expected rows",
-       ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-       sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * Target id 1 matches two identical source rows. Without changing any table properties, the
-  * MERGE is expected to fail with an ambiguity error and leave the target untouched.
-  */
- @Test
- public void testMultipleUpdatesForTargetRow() throws NoSuchTableException {
-   String expectedError =
-       "statement matched a single row from the target table with multiple rows of the source table";
-   String mergeStmt =
-       "MERGE INTO %s AS target " +
-       "USING %s AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-   // Source id 1 is duplicated, giving target id 1 two candidate matches.
-   append(sourceName,
-       new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-       new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-   AssertHelpers.assertThrows("Should complain ambiguous row in target",
-       SparkException.class, expectedError,
-       () -> sql(mergeStmt, targetName, sourceName));
-
-   // The failed MERGE must not have modified the original target rows.
-   assertEquals("Target should be unchanged",
-       ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-       sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * Same duplicate-match scenario as the ambiguity test, but with MERGE_CARDINALITY_CHECK_ENABLED
-  * set to false on the target, so the MERGE is expected to succeed.
-  */
- @Test
- public void testIgnoreMultipleUpdatesForTargetRow() throws NoSuchTableException {
-   append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-   append(sourceName,
-       new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-       new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-   // Turn off the cardinality (count) check on the target table.
-   sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", targetName, MERGE_CARDINALITY_CHECK_ENABLED, false);
-
-   String mergeStmt =
-       "MERGE INTO %s AS target " +
-       "USING %s AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-   sql(mergeStmt, targetName, sourceName);
-
-   // Both matching source rows for id 1 end up in the target, id 6 is deleted, and id 2 is inserted.
-   assertEquals("Should have expected rows",
-       ImmutableList.of(row(1, "emp-id-1"), row(1, "emp-id-1"), row(2, "emp-id-2")),
-       sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * An unconditional WHEN MATCHED THEN DELETE succeeds even though target id 1 matches two source
-  * rows. Despite the method name, no table property is changed here.
-  */
- @Test
- public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableException {
-   String mergeStmt =
-       "MERGE INTO %s AS target " +
-       "USING %s AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-   append(sourceName,
-       new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-       new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-   sql(mergeStmt, targetName, sourceName);
-
-   // Every matched target row is removed; only the inserted source row remains.
-   assertEquals("Should have expected rows",
-       ImmutableList.of(row(2, "emp-id-2")),
-       sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * A conditional WHEN MATCHED ... THEN DELETE on a target row with two source matches is expected
-  * to fail with the ambiguity error and leave the target unchanged.
-  */
- @Test
- public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-   append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-   append(sourceName,
-       new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-       new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
-
-   String mergeStmt =
-       "MERGE INTO %s AS target " +
-       "USING %s AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-   String expectedError =
-       "statement matched a single row from the target table with multiple rows of the source table";
-
-   AssertHelpers.assertThrows("Should complain ambiguous row in target",
-       SparkException.class, expectedError,
-       () -> sql(mergeStmt, targetName, sourceName));
-
-   assertEquals("Target should be unchanged",
-       ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-       sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
- }
-
- /**
-  * Runs the standard update/delete/insert MERGE against a target partitioned by identity(dep),
-  * recreating the tables for each entry in writeModes.
-  */
- @Test
- public void testIdentityPartition() {
-   // The statement does not depend on the distribution mode, so it is built once.
-   String mergeStmt =
-       "MERGE INTO %s AS target " +
-       "USING %s AS source " +
-       "ON target.id = source.id " +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE " +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   writeModes.forEach(distributionMode -> {
-     // Start from fresh tables for every mode.
-     removeTables();
-     sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
-     initTable(targetName);
-     setDistributionMode(targetName, distributionMode);
-     createAndInitSourceTable(sourceName);
-
-     append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-     append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-     sql(mergeStmt, targetName, sourceName);
-     assertEquals("Should have expected rows",
-         ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-   });
- }
-
- /**
-  * Runs the standard update/delete/insert MERGE against a target partitioned by days(ts),
-  * recreating the tables for each entry in writeModes.
-  */
- @Test
- public void testDaysTransform() {
-   writeModes.forEach(mode -> {
-     removeTables();
-     sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
-     initTable(targetName);
-     setDistributionMode(targetName, mode);
-     sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
-     initTable(sourceName);
-
-     // Pass table names as format arguments, like the other sql(...) calls here, instead of
-     // concatenating them into the statement text.
-     sql("INSERT INTO %s VALUES (1, timestamp('2001-01-01 00:00:00'))," +
-         "(6, timestamp('2001-01-06 00:00:00'))", targetName);
-     sql("INSERT INTO %s VALUES (2, timestamp('2001-01-02 00:00:00'))," +
-         "(1, timestamp('2001-01-01 00:00:00'))," +
-         "(6, timestamp('2001-01-06 00:00:00'))", sourceName);
-
-     String sqlText = "MERGE INTO %s AS target \n" +
-         "USING %s AS source \n" +
-         "ON target.id = source.id \n" +
-         "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
-         "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
-         "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-     sql(sqlText, targetName, sourceName);
-     // ts is cast to STRING so the expected values can be written as plain literals.
-     assertEquals("Should have expected rows",
-         ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
-         sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
-   });
- }
-
- /**
-  * Runs the standard MERGE against a target declared with CLUSTERED BY (dep) INTO 2 BUCKETS.
-  * Presumably this maps to an Iceberg bucket transform (confirm). The tables are recreated for
-  * each entry in writeModes.
-  */
- @Test
- public void testBucketExpression() {
-   String mergeStmt =
-       "MERGE INTO %s AS target \n" +
-       "USING %s AS source \n" +
-       "ON target.id = source.id \n" +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   writeModes.forEach(distributionMode -> {
-     removeTables();
-     String createTarget = "CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
-         " CLUSTERED BY (dep) INTO 2 BUCKETS";
-     sql(createTarget, targetName);
-     initTable(targetName);
-     setDistributionMode(targetName, distributionMode);
-     createAndInitSourceTable(sourceName);
-
-     append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-     append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-     sql(mergeStmt, targetName, sourceName);
-     assertEquals("Should have expected rows",
-         ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-   });
- }
-
- /**
-  * Runs a MERGE against a target whose partition spec is truncate(id, 2), added after table
-  * creation. The tables are recreated for each entry in writeModes.
-  */
- @Test
- public void testTruncateExpressionInMerge() {
-   String mergeStmt =
-       "MERGE INTO %s AS target \n" +
-       "USING %s AS source \n" +
-       "ON target.id = source.id \n" +
-       "WHEN MATCHED AND target.id = 101 THEN UPDATE SET * \n" +
-       "WHEN MATCHED AND target.id = 601 THEN DELETE \n" +
-       "WHEN NOT MATCHED AND source.id = 201 THEN INSERT * ";
-
-   writeModes.forEach(distributionMode -> {
-     removeTables();
-     sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", targetName);
-     // The partition field is added through ALTER TABLE rather than in the CREATE statement.
-     sql("ALTER TABLE %s ADD PARTITION FIELD truncate(id, 2)", targetName);
-     initTable(targetName);
-     setDistributionMode(targetName, distributionMode);
-     createAndInitSourceTable(sourceName);
-
-     append(targetName, new Employee(101, "id-101"), new Employee(601, "id-601"));
-     append(sourceName, new Employee(201, "id-201"), new Employee(101, "id-101"), new Employee(601, "id-601"));
-
-     sql(mergeStmt, targetName, sourceName);
-     assertEquals("Should have expected rows",
-         ImmutableList.of(row(101, "id-101"), row(201, "id-201")),
-         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-   });
- }
-
- /**
-  * Applies IcebergTruncateTransform directly to int, long, decimal, string and binary columns.
-  * It checks the truncated values: 101 -> 100, 10001 -> 10000, 10.65 (width 50) -> 10.50,
-  * "101-Employee" -> "10" and binary "1234" -> "12".
-  */
- @Test
- public void testTruncateExpressions() {
-   removeTables();
-   sql("DROP VIEW IF EXISTS EMP");
-   String viewText = "CREATE TEMPORARY VIEW EMP AS SELECT * FROM VALUES " +
-       "(101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) " +
-       "AS EMP(int_c, long_c, dec_c, str_c, binary_c)";
-   sql(viewText);
-   sql("CREATE TABLE %s (int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING," +
-       " binary_c BINARY) USING iceberg", targetName);
-   sql("INSERT INTO %s SELECT * FROM EMP", targetName);
-
-   // Parameterized type instead of the raw Dataset type.
-   Dataset<Row> df = spark.sql("SELECT * FROM " + targetName);
-   df.select(new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
-       new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
-       new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
-       new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
-       new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c")
-   ).createOrReplaceTempView("v1");
-
-   // The leftover debug show(false) was removed: it ran an extra Spark job and printed the view
-   // to stdout without contributing to the assertion.
-   assertEquals("Should have expected rows",
-       ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
-       sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v1"));
- }
-
- /**
-  * Runs the standard MERGE against a target that is both partitioned by id and has a write sort
-  * order on dep. The tables are recreated for each entry in writeModes.
-  */
- @Test
- public void testPartitionedAndOrderedTable() {
-   String mergeStmt =
-       "MERGE INTO %s AS target \n" +
-       "USING %s AS source \n" +
-       "ON target.id = source.id \n" +
-       "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
-       "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
-       "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
-
-   writeModes.forEach(distributionMode -> {
-     removeTables();
-     String createTarget = "CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
-         " PARTITIONED BY (id)";
-     sql(createTarget, targetName);
-     sql("ALTER TABLE %s WRITE ORDERED BY (dep)", targetName);
-     initTable(targetName);
-     setDistributionMode(targetName, distributionMode);
-     createAndInitSourceTable(sourceName);
-
-     append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-     append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
-
-     sql(mergeStmt, targetName, sourceName);
-     assertEquals("Should have expected rows",
-         ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
-         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
-   });
- }
-
- /**
-  * Creates an unpartitioned (id INT, dep STRING) Iceberg table and applies the shared settings.
-  *
-  * @param tabName name of the table to create
-  */
- protected void createAndInitUnPartitionedTargetTable(String tabName) {
-   String ddl = "CREATE TABLE %s (id INT, dep STRING) USING iceberg";
-   sql(ddl, tabName);
-   initTable(tabName);
- }
-
- /**
-  * Creates an (id INT, dep STRING) Iceberg table partitioned by dep and applies the shared settings.
-  *
-  * @param tabName name of the table to create
-  */
- protected void createAndInitSourceTable(String tabName) {
-   String ddl = "CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (dep)";
-   sql(ddl, tabName);
-   initTable(tabName);
- }
-
- /**
-  * Applies the parameterized file format to a table, checks or sets vectorization per format, and
-  * copies every entry of extraTableProperties() onto the table.
-  *
-  * @param tabName table to configure
-  */
- private void initTable(String tabName) {
-   sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tabName, DEFAULT_FILE_FORMAT, fileFormat);
-
-   if (fileFormat.equals("parquet")) {
-     // Parquet vectorization is controlled through a table property.
-     sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tabName, PARQUET_VECTORIZATION_ENABLED, vectorized);
-   } else if (fileFormat.equals("orc")) {
-     // ORC runs are only expected with vectorized == true.
-     Assert.assertTrue(vectorized);
-   } else if (fileFormat.equals("avro")) {
-     // Avro runs are only expected with vectorized == false.
-     Assert.assertFalse(vectorized);
-   }
-
-   for (Map.Entry<String, String> property : extraTableProperties().entrySet()) {
-     sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tabName, property.getKey(), property.getValue());
-   }
- }
-
- /**
-  * Sets TableProperties.WRITE_DISTRIBUTION_MODE on a table.
-  *
-  * @param tabName table to alter
-  * @param mode distribution mode value written into the property
-  */
- private void setDistributionMode(String tabName, String mode) {
-   String propertyKey = TableProperties.WRITE_DISTRIBUTION_MODE;
-   sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tabName, propertyKey, mode);
- }
-
- /**
-  * Appends the given employees to a table with a single DataFrameWriterV2 append.
-  *
-  * @param tabName table to append to
-  * @param employees rows to append
-  * @throws RuntimeException wrapping the NoSuchTableException if the table does not exist
-  */
- protected void append(String tabName, Employee... employees) {
-   try {
-     List<Employee> input = Arrays.asList(employees);
-     Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
-     // coalesce(1) collapses the input to one partition, presumably so each append writes a single
-     // data file — confirm whether any test depends on the file count.
-     inputDF.coalesce(1).writeTo(tabName).append();
-   } catch (NoSuchTableException e) {
-     // Keep the original message, and chain the cause so its stack trace is not lost.
-     throw new RuntimeException(e.getMessage(), e);
-   }
- }
-}