You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2024/01/04 08:01:59 UTC

(flink-kubernetes-operator) branch main updated: [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 394e5291 [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)
394e5291 is described below

commit 394e5291f260a5e9877aee45519e282ed65d580d
Author: Rui Fan <19...@gmail.com>
AuthorDate: Thu Jan 4 16:01:54 2024 +0800

    [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition (#746)
    
    * [hotfix][autoscaler] Fix the StandaloneAutoscalerExecutorTest.testScaling fails occasionally due to race condition
    
    * Update flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
    
    Co-authored-by: Yuepeng Pan <fl...@126.com>
    
    ---------
    
    Co-authored-by: Yuepeng Pan <fl...@126.com>
---
 .../standalone/StandaloneAutoscalerExecutor.java   |  3 +-
 .../StandaloneAutoscalerExecutorTest.java          | 50 +++++++++++++---------
 2 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index 03129034..286c78ca 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -133,7 +133,8 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
         }
     }
 
-    private void scalingSingleJob(Context jobContext) {
+    @VisibleForTesting
+    protected void scalingSingleJob(Context jobContext) {
         try {
             autoScaler.scale(jobContext);
         } catch (Throwable e) {
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index 54abf604..ca905071 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -50,10 +50,10 @@ class StandaloneAutoscalerExecutorTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testScaling(boolean throwExceptionWhileScale) throws Exception {
-        JobAutoScalerContext<JobID> jobContext1 = createJobAutoScalerContext();
-        JobAutoScalerContext<JobID> jobContext2 = createJobAutoScalerContext();
+        var jobContext1 = createJobAutoScalerContext();
+        var jobContext2 = createJobAutoScalerContext();
         var jobList = List.of(jobContext1, jobContext2);
-        Set<JobID> exceptionKeys =
+        var exceptionKeys =
                 throwExceptionWhileScale
                         ? Set.of(jobContext1.getJobKey(), jobContext2.getJobKey())
                         : Set.of();
@@ -62,30 +62,38 @@ class StandaloneAutoscalerExecutorTest {
                 Collections.synchronizedList(new ArrayList<JobAutoScalerContext<JobID>>());
 
         var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
-        final Configuration conf = new Configuration();
+        final var conf = new Configuration();
         conf.set(CONTROL_LOOP_PARALLELISM, 1);
         var countDownLatch = new CountDownLatch(jobList.size());
+
+        final var jobAutoScaler =
+                new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
+                    @Override
+                    public void scale(JobAutoScalerContext<JobID> context) {
+                        actualScaleContexts.add(context);
+                        if (exceptionKeys.contains(context.getJobKey())) {
+                            throw new RuntimeException("Excepted exception.");
+                        }
+                    }
+
+                    @Override
+                    public void cleanup(JobID jobKey) {
+                        fail("Should be called.");
+                    }
+                };
+
         try (var autoscalerExecutor =
                 new StandaloneAutoscalerExecutor<>(
-                        conf,
-                        () -> jobList,
-                        eventCollector,
-                        new JobAutoScaler<>() {
-                            @Override
-                            public void scale(JobAutoScalerContext<JobID> context) {
-                                actualScaleContexts.add(context);
-                                countDownLatch.countDown();
-                                if (exceptionKeys.contains(context.getJobKey())) {
-                                    throw new RuntimeException("Excepted exception.");
-                                }
-                            }
+                        conf, () -> jobList, eventCollector, jobAutoScaler) {
+                    @Override
+                    protected void scalingSingleJob(JobAutoScalerContext<JobID> jobContext) {
+                        super.scalingSingleJob(jobContext);
+                        countDownLatch.countDown();
+                    }
+                }) {
 
-                            @Override
-                            public void cleanup(JobID jobKey) {
-                                fail("Should be called.");
-                            }
-                        })) {
             autoscalerExecutor.scaling();
+            // Wait for all scalings to go finished.
             countDownLatch.await();
 
             assertThat(actualScaleContexts).isEqualTo(jobList);