You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/06 02:13:29 UTC

[iceberg] branch master updated: Spark: Add more tests for MERGE (#2213)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2aba2  Spark: Add more tests for MERGE (#2213)
1f2aba2 is described below

commit 1f2aba29c41cc0937729ddfb79012098422c2d85
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Feb 5 18:13:22 2021 -0800

    Spark: Add more tests for MERGE (#2213)
---
 .../apache/iceberg/spark/extensions/TestMerge.java | 379 +++++++++++++++++++++
 1 file changed, 379 insertions(+)

diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index d1f3374..ee68048 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,17 +19,41 @@
 
 package org.apache.iceberg.spark.extensions;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.spark.sql.functions.lit;
+
 public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
 
   public TestMerge(String catalogName, String implementation, Map<String, String> config,
@@ -51,6 +75,361 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
   // TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
 
   @Test
+  public void testSelfMerge() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceAsSelfSubquery() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+    sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+        "ON t.id == s.value " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  /**
+   * Verifies that a MERGE under serializable isolation fails when a concurrent append commits
+   * a row that the MERGE condition could match.
+   *
+   * <p>A merge thread and an append thread run in lockstep rounds. Each thread waits until the
+   * shared counter reaches {@code 2 * round} and then does its work, so one MERGE and one
+   * INSERT of (1, 'hr') run concurrently in every round. The loops are effectively unbounded.
+   * The test expects the merge thread to eventually fail with a SparkException caused by a
+   * ValidationException reporting conflicting files.
+   *
+   * <p>NOTE(review): {@code mergeFuture.get()} has no timeout. If no conflict is ever detected,
+   * this test would not terminate on its own.
+   */
+  @Test
+  public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    // the single source value 1 matches every row the append thread inserts
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    // one pool thread per concurrent writer; per Guava's docs the "exiting" wrapper keeps
+    // these threads from blocking JVM shutdown
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    // counts completed operations across both threads; round k may start once it reaches 2 * k
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        // wait until both threads have finished the previous round
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        // wait until both threads have finished the previous round
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      // the merge loop only exits by throwing, so normal completion means no conflict was detected
+      mergeFuture.get();
+      Assert.fail("Expected a validation exception");
+    } catch (ExecutionException e) {
+      // expected chain: ExecutionException -> SparkException -> ValidationException
+      Throwable sparkException = e.getCause();
+      Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+      Throwable validationException = sparkException.getCause();
+      Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+      String errMsg = validationException.getMessage();
+      Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+    } finally {
+      // once the merge thread stops incrementing the counter, the append thread would wait
+      // forever; interrupt it (presumably the sleep helper surfaces the interrupt)
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  /**
+   * Verifies that a MERGE under snapshot isolation tolerates concurrent appends.
+   *
+   * <p>A merge thread and an append thread run 20 lockstep rounds. Each thread waits until the
+   * shared counter reaches {@code 2 * round}, so one MERGE and one INSERT of (1, 'hr') run
+   * concurrently in every round. Any MERGE failure is rethrown by {@code mergeFuture.get()} as
+   * an ExecutionException and fails the test.
+   */
+  @Test
+  public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    // the single source value 1 matches every row the append thread inserts
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    // one pool thread per concurrent writer; per Guava's docs the "exiting" wrapper keeps
+    // these threads from blocking JVM shutdown
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    // counts completed operations across both threads; round k may start once it reaches 2 * k
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        // wait until both threads have finished the previous round
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        // wait until both threads have finished the previous round
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      // all 20 merges must complete; a failure propagates as ExecutionException
+      mergeFuture.get();
+    } finally {
+      // the append thread may still be running or waiting; stop it either way.
+      // NOTE(review): appendFuture's own outcome is never checked here.
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  /**
+   * Verifies MERGE when the source has a column ({@code extra_col}) that the target lacks.
+   * The actions reference only {@code id} and {@code v} explicitly, so the extra column must
+   * not affect the result.
+   */
+  @Test
+  public void testMergeWithExtraColumnsInSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+        "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated (id 1 is in both target and source)
+        row(2, "v2"),   // kept
+        row(3, "v3"),   // new
+        row(4, "v4")    // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullsInTargetAndSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullSafeEquals() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id <=> source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1_1"), // updated
+        row(2, "v2"),      // kept
+        row(4, "v4")       // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullCondition() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id AND NULL " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(null, "v1"),   // kept
+        row(null, "v1_1"), // new
+        row(2, "v2"),      // kept
+        row(2, "v2_2")     // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullActionConditions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }\n" +
+        "{ \"id\": 3, \"v\": \"v3_3\" }");
+
+    // all conditions are NULL and will never match any rows
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' AND NULL THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows1 = ImmutableList.of(
+        row(1, "v1"), // kept
+        row(2, "v2")  // kept
+    );
+    assertEquals("Output should match", expectedRows1, sql("SELECT * FROM %s ORDER BY v", tableName));
+
+    // only the update and insert conditions are NULL
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 AND NULL THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED AND source.id = 3 AND NULL THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows2 = ImmutableList.of(
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  @Test
+  public void testMergeWithMultipleMatchingActions() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2_2\" }");
+
+    // the order of match actions is important in this case
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED AND source.id = 1 THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN MATCHED AND source.v = 'v1_1' THEN " +
+        "  DELETE " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // updated (also matches the delete cond but update is first)
+        row(2, "v2")    // kept (matches neither the update nor the delete cond)
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", tableName));
+  }
+
+  /**
+   * Verifies MERGE against Parquet data spread over several row groups.
+   *
+   * <p>The tiny row-group and split sizes are meant to spread the 200 appended rows across
+   * more than one row group and split. The MERGE must rewrite the single matched row and
+   * copy every other row over unchanged. The row count alone cannot detect a MERGE that
+   * silently did nothing or touched the wrong rows, so the table contents are checked as well.
+   */
+  @Test
+  public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException {
+    Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
+
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
+
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    List<Integer> ids = new ArrayList<>();
+    for (int id = 1; id <= 200; id++) {
+      ids.add(id);
+    }
+    Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
+        .withColumnRenamed("value", "id")
+        .withColumn("dep", lit("hr"));
+    // coalesce(1) writes through a single task, intended to put all rows into one data file
+    df.coalesce(1).writeTo(tableName).append();
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+
+    // update a record in one of the row groups; the remaining rows must be copied over
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.value " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET dep = 'x'", tableName);
+
+    Assert.assertEquals(200, spark.table(tableName).count());
+    assertEquals("Only the matched row should be updated",
+        ImmutableList.of(row(1, "x")),
+        sql("SELECT * FROM %s WHERE dep = 'x'", tableName));
+    Assert.assertEquals("All other rows should be kept unchanged",
+        199, spark.table(tableName).where("dep = 'hr'").count());
+  }
+
+  @Test
   public void testMergeInsertOnly() {
     createAndInitTable("id STRING, v STRING",
         "{ \"id\": \"a\", \"v\": \"v1\" }\n" +