You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/06 02:13:29 UTC
[iceberg] branch master updated: Spark: Add more tests for MERGE
(#2213)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2aba2 Spark: Add more tests for MERGE (#2213)
1f2aba2 is described below
commit 1f2aba29c41cc0937729ddfb79012098422c2d85
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Feb 5 18:13:22 2021 -0800
Spark: Add more tests for MERGE (#2213)
---
.../apache/iceberg/spark/extensions/TestMerge.java | 379 +++++++++++++++++++++
1 file changed, 379 insertions(+)
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index d1f3374..ee68048 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,17 +19,41 @@
package org.apache.iceberg.spark.extensions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.hamcrest.CoreMatchers;
import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.spark.sql.functions.lit;
+
public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
public TestMerge(String catalogName, String implementation, Map<String, String> config,
@@ -51,6 +75,361 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
// TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
@Test
+ // Verifies MERGE works when the target table is also used as the source
+ // (tableName is passed twice: once for the target, once for the source).
+ public void testSelfMerge() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ // every target row matches itself, so only the conditional UPDATE fires;
+ // the NOT MATCHED branch can never apply in a self-merge
+ sql("MERGE INTO %s t USING %s s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET v = 'x' " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *", tableName, tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "x"), // updated
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ // Verifies MERGE works when the source is a subquery that joins the target
+ // table back against an auxiliary view (a "self" subquery source).
+ public void testMergeWithSourceAsSelfSubquery() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ // view with one non-null value (1) and one null; the inner join below
+ // drops the null, leaving only id = 1 in the source
+ createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+ // the NOT MATCHED branch inserts a sentinel row that must never appear
+ // in the output, since every source row matches a target row
+ sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET v = 'x' " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "x"), // updated
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ // Verifies that under "serializable" isolation a concurrent append that adds
+ // files matching the MERGE condition makes the MERGE fail with a
+ // ValidationException. `synchronized` keeps this test from interleaving with
+ // the snapshot-isolation test, which shares the same table/view names.
+ public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+ // cannot run tests with concurrency for Hadoop tables without atomic renames
+ Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+ createAndInitTable("id INT, dep STRING");
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+ // exiting executor so stray threads do not keep the JVM alive on failure
+ ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ // barrier counter: each thread waits until both threads have finished
+ // round N (barrier >= 2 * N) before starting round N + 1, so a merge and
+ // an append run concurrently in every round
+ AtomicInteger barrier = new AtomicInteger(0);
+
+ // merge thread
+ Future<?> mergeFuture = executorService.submit(() -> {
+ // effectively unbounded: the loop is expected to terminate by throwing
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("MERGE INTO %s t USING source s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET dep = 'x'", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ // appends a row that matches the merge condition (id = 1), creating
+ // a conflicting file for the concurrent MERGE to detect
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ mergeFuture.get();
+ Assert.fail("Expected a validation exception");
+ } catch (ExecutionException e) {
+ // expected failure chain: ExecutionException -> SparkException -> ValidationException
+ Throwable sparkException = e.getCause();
+ Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+ Throwable validationException = sparkException.getCause();
+ Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+ String errMsg = validationException.getMessage();
+ Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+ } finally {
+ // the append thread loops forever on its own; stop it explicitly
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
+
+ @Test
+ // Verifies that under "snapshot" isolation a MERGE succeeds despite concurrent
+ // appends: 20 rounds of merge + append run concurrently and mergeFuture.get()
+ // is expected to complete without throwing. `synchronized` prevents
+ // interleaving with the serializable-isolation test above.
+ public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+ // cannot run tests with concurrency for Hadoop tables without atomic renames
+ Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+ createAndInitTable("id INT, dep STRING");
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+ // exiting executor so stray threads do not keep the JVM alive on failure
+ ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ // barrier counter: both threads must finish round N (barrier >= 2 * N)
+ // before either starts round N + 1, keeping the operations concurrent
+ AtomicInteger barrier = new AtomicInteger(0);
+
+ // merge thread
+ Future<?> mergeFuture = executorService.submit(() -> {
+ // bounded at 20 rounds — unlike the serializable test, no failure is expected
+ for (int numOperations = 0; numOperations < 20; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("MERGE INTO %s t USING source s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET dep = 'x'", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < 20; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ // must complete normally; any ExecutionException propagates and fails the test
+ mergeFuture.get();
+ } finally {
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
+
+ @Test
+ // Verifies MERGE tolerates a source with columns the target does not have
+ // (extra_col) as long as the UPDATE/INSERT clauses reference only valid columns.
+ public void testMergeWithExtraColumnsInSource() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+ // source has an extra_col column absent from the target schema
+ createOrReplaceView("source",
+ "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+ "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "v1_1"), // new
+ row(2, "v2"), // kept
+ row(3, "v3"), // new
+ row(4, "v4") // new
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ // Verifies NULL join-key handling with the regular == condition: NULL == NULL
+ // is not true in SQL, so the null rows do not match and the source's null row
+ // is inserted rather than used to update the target's null row.
+ public void testMergeWithNullsInTargetAndSource() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": null, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 4, \"v\": \"v4\" }");
+
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(null, "v1"), // kept
+ row(null, "v1_1"), // new
+ row(2, "v2"), // kept
+ row(4, "v4") // new
+ );
+ // ordered by v because id contains nulls
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+ }
+
+ @Test
+ // Verifies the null-safe equality operator (<=>) in the merge condition:
+ // unlike ==, NULL <=> NULL is true, so the null rows match and are updated.
+ public void testMergeWithNullSafeEquals() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": null, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 4, \"v\": \"v4\" }");
+
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id <=> source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(null, "v1_1"), // updated
+ row(2, "v2"), // kept
+ row(4, "v4") // new
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+ }
+
+ @Test
+ // Verifies a merge condition that always evaluates to NULL ("... AND NULL"):
+ // no rows match, so nothing is updated and every source row is inserted.
+ public void testMergeWithNullCondition() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": null, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+ // "AND NULL" forces the whole ON condition to NULL (treated as not matched)
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id AND NULL " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(null, "v1"), // kept
+ row(null, "v1_1"), // new
+ row(2, "v2"), // kept
+ row(2, "v2_2") // new
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+ }
+
+ @Test
+ // Verifies per-action conditions that evaluate to NULL: a NULL action
+ // condition is treated as false, so that action is skipped for every row.
+ public void testMergeWithNullActionConditions() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2_2\" }\n" +
+ "{ \"id\": 3, \"v\": \"v3_3\" }");
+
+ // all conditions are NULL and will never match any rows
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ // no action fired: table is unchanged
+ ImmutableList<Object[]> expectedRows1 = ImmutableList.of(
+ row(1, "v1"), // kept
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows1, sql("SELECT * FROM %s ORDER BY v", tableName));
+
+ // only the update and insert conditions are NULL
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ // only the DELETE condition was non-NULL, so row (1, "v1") was removed
+ ImmutableList<Object[]> expectedRows2 = ImmutableList.of(
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", tableName));
+ }
+
+ @Test
+ // Verifies that when a row satisfies several MATCHED actions, only the first
+ // listed action is applied (clause order determines precedence).
+ public void testMergeWithMultipleMatchingActions() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+ // the order of match actions is important in this case
+ // row id = 1 satisfies both the UPDATE (source.id = 1) and, after the
+ // update would set v to 'v1_1', the DELETE condition (source.v = 'v1_1')
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED AND source.id = 1 THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+ " DELETE " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "v1_1"), // updated (also matches the delete cond but update is first)
+ row(2, "v2") // kept (matches neither the update nor the delete cond)
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+ }
+
+ @Test
+ // Verifies MERGE on a Parquet table whose data file spans multiple row groups:
+ // tiny row-group and split sizes (100 bytes) force 200 rows in one file into
+ // several row groups, and the merge must rewrite the file without losing rows.
+ public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
+ // only meaningful for Parquet, which has row groups
+ Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
+
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+ // force many small row groups and splits so one file yields multiple tasks
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
+
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ // write 200 rows into a single data file (coalesce(1))
+ List<Integer> ids = new ArrayList<>();
+ for (int id = 1; id <= 200; id++) {
+ ids.add(id);
+ }
+ Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
+ .withColumnRenamed("value", "id")
+ .withColumn("dep", lit("hr"));
+ df.coalesce(1).writeTo(tableName).append();
+
+ Assert.assertEquals(200, spark.table(tableName).count());
+
+ // update a record from one of two row groups and copy over the second one
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.value " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET dep = 'x'", tableName);
+
+ // row count must be preserved: one row updated, the rest copied over
+ Assert.assertEquals(200, spark.table(tableName).count());
+ }
+
+ @Test
public void testMergeInsertOnly() {
createAndInitTable("id STRING, v STRING",
"{ \"id\": \"a\", \"v\": \"v1\" }\n" +