You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2024/01/04 08:01:59 UTC
(flink-kubernetes-operator) branch main updated: [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 394e5291 [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)
394e5291 is described below
commit 394e5291f260a5e9877aee45519e282ed65d580d
Author: Rui Fan <19...@gmail.com>
AuthorDate: Thu Jan 4 16:01:54 2024 +0800
[hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)
* [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition
* Update flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
Co-authored-by: Yuepeng Pan <fl...@126.com>
---------
Co-authored-by: Yuepeng Pan <fl...@126.com>
---
.../standalone/StandaloneAutoscalerExecutor.java | 3 +-
.../StandaloneAutoscalerExecutorTest.java | 50 +++++++++++++---------
2 files changed, 31 insertions(+), 22 deletions(-)
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index 03129034..286c78ca 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -133,7 +133,8 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
}
}
- private void scalingSingleJob(Context jobContext) {
+ @VisibleForTesting
+ protected void scalingSingleJob(Context jobContext) {
try {
autoScaler.scale(jobContext);
} catch (Throwable e) {
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index 54abf604..ca905071 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -50,10 +50,10 @@ class StandaloneAutoscalerExecutorTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testScaling(boolean throwExceptionWhileScale) throws Exception {
- JobAutoScalerContext<JobID> jobContext1 = createJobAutoScalerContext();
- JobAutoScalerContext<JobID> jobContext2 = createJobAutoScalerContext();
+ var jobContext1 = createJobAutoScalerContext();
+ var jobContext2 = createJobAutoScalerContext();
var jobList = List.of(jobContext1, jobContext2);
- Set<JobID> exceptionKeys =
+ var exceptionKeys =
throwExceptionWhileScale
? Set.of(jobContext1.getJobKey(), jobContext2.getJobKey())
: Set.of();
@@ -62,30 +62,38 @@ class StandaloneAutoscalerExecutorTest {
Collections.synchronizedList(new ArrayList<JobAutoScalerContext<JobID>>());
var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
- final Configuration conf = new Configuration();
+ final var conf = new Configuration();
conf.set(CONTROL_LOOP_PARALLELISM, 1);
var countDownLatch = new CountDownLatch(jobList.size());
+
+ final var jobAutoScaler =
+ new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
+ @Override
+ public void scale(JobAutoScalerContext<JobID> context) {
+ actualScaleContexts.add(context);
+ if (exceptionKeys.contains(context.getJobKey())) {
+ throw new RuntimeException("Excepted exception.");
+ }
+ }
+
+ @Override
+ public void cleanup(JobID jobKey) {
+ fail("Should be called.");
+ }
+ };
+
try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
- conf,
- () -> jobList,
- eventCollector,
- new JobAutoScaler<>() {
- @Override
- public void scale(JobAutoScalerContext<JobID> context) {
- actualScaleContexts.add(context);
- countDownLatch.countDown();
- if (exceptionKeys.contains(context.getJobKey())) {
- throw new RuntimeException("Excepted exception.");
- }
- }
+ conf, () -> jobList, eventCollector, jobAutoScaler) {
+ @Override
+ protected void scalingSingleJob(JobAutoScalerContext<JobID> jobContext) {
+ super.scalingSingleJob(jobContext);
+ countDownLatch.countDown();
+ }
+ }) {
- @Override
- public void cleanup(JobID jobKey) {
- fail("Should be called.");
- }
- })) {
autoscalerExecutor.scaling();
+ // Wait for all scalings to finish.
countDownLatch.await();
assertThat(actualScaleContexts).isEqualTo(jobList);