You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:12 UTC
[hudi] 04/11: Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 23b0dfc337737e2d1e5b792f999c311df62f9498
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Apr 18 08:08:33 2022 -0400
Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)
---
.../functional/TestHoodieDeltaStreamer.java | 25 ++++++++++++++--------
1 file changed, 16 insertions(+), 9 deletions(-)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 2db72cbd41..804676f0ff 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -104,6 +104,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
@@ -123,6 +124,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -380,7 +382,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
ret = false;
}
}
- return true;
+ return ret;
});
res.get(timeoutInSecs, TimeUnit.SECONDS);
}
@@ -1028,17 +1030,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
});
}
- @Disabled("HUDI-3710 to fix the ConcurrentModificationException")
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClusteringJob";
-
- HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
+ HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
+ CountDownLatch countDownLatch = new CountDownLatch(1);
deltaStreamerTestRunner(ds, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ countDownLatch.countDown();
+ return true;
+ });
+ if (countDownLatch.await(2, TimeUnit.MINUTES)) {
Option<String> scheduleClusteringInstantTime = Option.empty();
try {
HoodieClusteringJob scheduleClusteringJob =
@@ -1046,7 +1051,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
} catch (Exception e) {
LOG.warn("Schedule clustering failed", e);
- return false;
+ Assertions.fail("Schedule clustering failed", e);
}
if (scheduleClusteringInstantTime.isPresent()) {
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
@@ -1054,13 +1059,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
LOG.info("Cluster success");
} else {
- LOG.warn("Schedule clustering failed");
+ LOG.warn("Clustering execution failed");
+ Assertions.fail("Clustering execution failed");
}
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
- return true;
- });
+ } else {
+ Assertions.fail("Deltastreamer should have completed 2 commits.");
+ }
}
@Test