You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/21 14:03:26 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4046: [HUDI-2527] Multi writer test with conflicting async table services

nsivabalan commented on a change in pull request #4046:
URL: https://github.com/apache/hudi/pull/4046#discussion_r753804804



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -228,78 +231,88 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
     validInstants.add("002");
     validInstants.add("003");
-    ExecutorService executors = Executors.newFixedThreadPool(2);
+
+    final int threadCount = 2;
+    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+    ExecutorService executors = Executors.newFixedThreadPool(threadCount);
     SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
     SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
-      try {
-        createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          Assertions.fail("Conflicts not handled correctly");
-        }
-        validInstants.add("004");
-      } catch (Exception e1) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
-        }
+      final String newCommitTime = "004";
+      final int numRecords = 100;
+      final String commitTimeBetweenPrevAndNew = "002";
+
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertThrows(HoodieWriteConflictException.class, () -> {
+          createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          countDownLatch.countDown();
+          try {
+            countDownLatch.await();

Review comment:
       should we add `countDownLatch.await()` at L284 just after all futures are complete instead of here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org