You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/04 20:52:33 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #2213: Spark: Add more tests for MERGE

aokolnychyi opened a new pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213


   This PR brings more tests for MERGE operations in Spark. In particular, tests with self subqueries, isolation level tests, null handling, updating a single row group in a file with multiple row groups. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dilipbiswal commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773759475


   @aokolnychyi Nice !!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773596909






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773596909


   cc @rdblue @dilipbiswal @RussellSpitzer @mehtaashish23 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dilipbiswal commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773759475


   @aokolnychyi Nice !!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#discussion_r571340161



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -50,6 +74,361 @@ public void removeTables() {
 
   // TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
 
+  @Test
+  public void testSelfMerge() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    sql("MERGE INTO %s t USING %s s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithSourceAsSelfSubquery() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+    sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+        "ON t.id == s.value " +
+        "WHEN MATCHED AND t.id = 1 THEN " +
+        "  UPDATE SET v = 'x' " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "x"), // updated
+        row(2, "v2") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+      Assert.fail("Expected a validation exception");
+    } catch (ExecutionException e) {
+      Throwable sparkException = e.getCause();
+      Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+      Throwable validationException = sparkException.getCause();
+      Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+      String errMsg = validationException.getMessage();
+      Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+    createAndInitTable("id INT, dep STRING");
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+    AtomicInteger barrier = new AtomicInteger(0);
+
+    // merge thread
+    Future<?> mergeFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("MERGE INTO %s t USING source s " +
+            "ON t.id == s.value " +
+            "WHEN MATCHED THEN " +
+            "  UPDATE SET dep = 'x'", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    // append thread
+    Future<?> appendFuture = executorService.submit(() -> {
+      for (int numOperations = 0; numOperations < 20; numOperations++) {
+        while (barrier.get() < numOperations * 2) {
+          sleep(10);
+        }
+        sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+        barrier.incrementAndGet();
+      }
+    });
+
+    try {
+      mergeFuture.get();
+    } finally {
+      appendFuture.cancel(true);
+    }
+
+    executorService.shutdown();
+    Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public void testMergeWithExtraColumnsInSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": 1, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+        "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET v = source.v " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1, "v1_1"), // new
+        row(2, "v2"),   // kept
+        row(3, "v3"),   // new
+        row(4, "v4")    // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeWithNullsInTargetAndSource() {
+    createAndInitTable("id INT, v STRING",
+        "{ \"id\": null, \"v\": \"v1\" }\n" +
+        "{ \"id\": 2, \"v\": \"v2\" }");
+
+    createOrReplaceView("source",
+        "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+        "{ \"id\": 4, \"v\": \"v4\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +

Review comment:
       I didn't know you could use `==`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi edited a comment on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773680667


   This PR helped to detect an issue fixed in #2215. I'll rebase once merged.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi edited a comment on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773680667


   This PR helped to detect an issue fixed in #2215. I'll rebase once merged.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773680667


   This PR helped to detect the issue fixed in #2215. I'll rebase once merged.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #2213: Spark: Add more tests for MERGE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#issuecomment-773730670


   The underlying issue was merged, I rebased this PR as well. It is ready for review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org