You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:12 UTC

[hudi] 04/11: Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 23b0dfc337737e2d1e5b792f999c311df62f9498
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Apr 18 08:08:33 2022 -0400

    Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)
---
 .../functional/TestHoodieDeltaStreamer.java        | 25 ++++++++++++++--------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 2db72cbd41..804676f0ff 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -104,6 +104,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
@@ -123,6 +124,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -380,7 +382,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
             ret = false;
           }
         }
-        return true;
+        return ret;
       });
       res.get(timeoutInSecs, TimeUnit.SECONDS);
     }
@@ -1028,17 +1030,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     });
   }
 
-  @Disabled("HUDI-3710 to fix the ConcurrentModificationException")
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClusteringJob";
-
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
+    CountDownLatch countDownLatch = new CountDownLatch(1);
 
     deltaStreamerTestRunner(ds, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      countDownLatch.countDown();
+      return true;
+    });
 
+    if (countDownLatch.await(2, TimeUnit.MINUTES)) {
       Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
         HoodieClusteringJob scheduleClusteringJob =
@@ -1046,7 +1051,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
       } catch (Exception e) {
         LOG.warn("Schedule clustering failed", e);
-        return false;
+        Assertions.fail("Schedule clustering failed", e);
       }
       if (scheduleClusteringInstantTime.isPresent()) {
         LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
@@ -1054,13 +1059,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
             shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
         HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
         LOG.info("Cluster success");
       } else {
-        LOG.warn("Schedule clustering failed");
+        LOG.warn("Clustering execution failed");
+        Assertions.fail("Clustering execution failed");
       }
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
-      return true;
-    });
+    } else {
+      Assertions.fail("Deltastreamer should have completed 2 commits.");
+    }
   }
 
   @Test