Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/04 10:44:23 UTC

[GitHub] [flink] zentol opened a new pull request, #19351: [FLINK-27045][tests] Remove shared executor

zentol opened a new pull request, #19351:
URL: https://github.com/apache/flink/pull/19351

   Sharing an executor can lead to surprising behaviors, especially since the ownership of the executor isn't well-defined.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844069571


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -132,19 +139,28 @@ public void teardown() throws Exception {
 
     @Test
     public void testCleanupWhenRestoreFails() throws Exception {
-        createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).build().run();
+        createTaskBuilder()
+                .setInvokable(InvokableWithExceptionInRestore.class)
+                .build(Executors.directExecutor())

Review Comment:
   I'd rather not, because we shouldn't _really_ encourage the use of direct executors. If they are used incorrectly (which we generally do), the test no longer runs under the same conditions as in production because the threading behavior is radically different.
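
To illustrate the threading difference, here is a minimal sketch that uses only the JDK. `Runnable::run` stands in for a direct executor (an assumption for illustration, not Flink's `Executors.directExecutor()` itself), and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class DirectExecutorThreading {

    /** Runs a task on the given executor and returns the thread that executed it. */
    static Thread executingThread(Executor executor) throws InterruptedException {
        AtomicReference<Thread> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(
                () -> {
                    seen.set(Thread.currentThread());
                    done.countDown();
                });
        done.await();
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a plain JDK stand-in that runs the task on the calling thread.
        Executor direct = Runnable::run;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Direct executor: the task runs on the caller's thread.
            System.out.println(executingThread(direct) == Thread.currentThread()); // true
            // Pool-backed executor: the task runs on a separate worker thread.
            System.out.println(executingThread(pool) == Thread.currentThread()); // false
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With the direct executor there is no hand-off between threads at all, so a test built on it never exercises the asynchronous path that production code goes through.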





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843115118


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -296,10 +296,10 @@ private static IntermediateResult createResult(
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
 
         SchedulerBase scheduler =
-                SchedulerTestingUtils.newSchedulerBuilder(
-                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                        .setIoExecutor(executorService)
-                        .setFutureExecutor(executorService)
+                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                executorService)

Review Comment:
   Direct executor services don't need to be shut down since they aren't backed by a thread pool.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843083347


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java:
##########
@@ -18,16 +18,19 @@
 
 package org.apache.flink.runtime.scheduler.benchmark;
 
-import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 /** Base class of all scheduler benchmarks. */
 public class SchedulerBenchmarkBase {
     public ScheduledExecutorService scheduledExecutorService;
 
     public void setup() {
-        scheduledExecutorService = TestingUtils.defaultExecutor();
+        scheduledExecutorService =

Review Comment:
   I'm not sure if that works because benchmarks aren't tests.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844077415


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
                             Time.milliseconds(50),
                             deadline,
                             (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
-                            TestingUtils.defaultScheduledExecutor());
+                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Review Comment:
   Although we couldn't put it into our test util modules.
   
   ~~Maybe we should just amend the FutureUtils to work against a plain ScheduledExecutorService~~. Meh, that wouldn't quite work since they are also used with a MainThreadExecutor.
   
   Or add variants that also accept a ScheduledExecutorService.
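
A rough sketch of what such a variant could look like, using only the JDK. `SimpleScheduler` and `completeLater` are hypothetical names invented for this illustration; they are not Flink's `ScheduledExecutor` or any `FutureUtils` method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SchedulerOverloadSketch {

    /** Hypothetical minimal scheduling abstraction that the utility is written against. */
    interface SimpleScheduler {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Core variant: works with any implementation of the abstraction. */
    static CompletableFuture<String> completeLater(
            String value, long delayMillis, SimpleScheduler scheduler) {
        CompletableFuture<String> result = new CompletableFuture<>();
        scheduler.schedule(() -> result.complete(value), delayMillis, TimeUnit.MILLISECONDS);
        return result;
    }

    /** Convenience variant: callers pass a plain ScheduledExecutorService, no adapter needed. */
    static CompletableFuture<String> completeLater(
            String value, long delayMillis, ScheduledExecutorService executor) {
        // The adaptation happens once here instead of at every call site.
        return completeLater(value, delayMillis, (SimpleScheduler) executor::schedule);
    }
}
```

The point of the overload is that the wrapping lives in one place, while callers that need a different scheduler implementation can still use the core variant.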





[GitHub] [flink] zentol merged pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19351:
URL: https://github.com/apache/flink/pull/19351




[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844824972


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -416,22 +410,18 @@ public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Excep
 
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
-        final JobManagerSharedServices jobManagerSharedServices =

Review Comment:
   That's a weird one. It was never actually used even when the test was added... 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java:
##########
@@ -65,29 +65,37 @@ public void testIfStartSchedulingFailsJobMasterFails() throws Exception {
         final SchedulerNGFactory schedulerFactory = new FailingSchedulerFactory();
         final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
                 new JobMasterBuilder.TestingOnCompletionActions();
-        final JobMaster jobMaster =
-                new JobMasterBuilder(
-                                JobGraphTestUtils.emptyJobGraph(),
-                                TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
-                        .withSlotPoolServiceSchedulerFactory(
-                                DefaultSlotPoolServiceSchedulerFactory.create(
-                                        TestingSlotPoolServiceBuilder.newBuilder(),
-                                        schedulerFactory))
-                        .withOnCompletionActions(onCompletionActions)
-                        .createJobMaster();
+        final JobManagerSharedServices jobManagerSharedServices =
+                new TestingJobManagerSharedServicesBuilder().build();
+        try {
+            final JobMaster jobMaster =
+                    new JobMasterBuilder(
+                                    JobGraphTestUtils.emptyJobGraph(),
+                                    TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
+                            .withSlotPoolServiceSchedulerFactory(
+                                    DefaultSlotPoolServiceSchedulerFactory.create(
+                                            TestingSlotPoolServiceBuilder.newBuilder(),
+                                            schedulerFactory))
+                            .withOnCompletionActions(onCompletionActions)
+                            .withJobManagerSharedServices(jobManagerSharedServices)
+                            .createJobMaster();
 
-        jobMaster.start();
+            jobMaster.start();
 
-        assertThat(
-                onCompletionActions.getJobMasterFailedFuture().join(),
-                is(instanceOf(JobMasterException.class)));
+            assertThat(
+                    onCompletionActions.getJobMasterFailedFuture().join(),
+                    is(instanceOf(JobMasterException.class)));
 
-        // close the jobMaster to remove it from the testing rpc service so that it can shut down
-        // cleanly
-        try {
-            jobMaster.close();
-        } catch (Exception expected) {
-            // expected
+            // close the jobMaster to remove it from the testing rpc service so that it can shut
+            // down
+            // cleanly

Review Comment:
   ```suggestion
               // down cleanly
   ```



##########
flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java:
##########
@@ -415,42 +324,6 @@ public void testOrTimeout() throws Exception {
         }
     }
 
-    @Test
-    public void testRetryWithDelayAndPredicate() throws Exception {

Review Comment:
   I couldn't find a test where we verify the error Predicate condition like we do here. Can you point me to it?



##########
flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java:
##########
@@ -316,38 +257,6 @@ public void testRetryWithDelayRetryStrategy() throws Exception {
                 completionTime >= (2 + 4 + 5 + 5));
     }
 
-    /** Tests that all scheduled tasks are canceled if the retry future is being cancelled. */
-    @Test
-    public void testRetryWithDelayCancellation() {
-        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
-                new ManuallyTriggeredScheduledExecutor();
-
-        CompletableFuture<?> retryFuture =
-                FutureUtils.retryWithDelay(
-                        () ->
-                                FutureUtils.completedExceptionally(
-                                        new FlinkException("Test exception")),
-                        1,
-                        TestingUtils.infiniteTime(),
-                        scheduledExecutor);
-
-        assertFalse(retryFuture.isDone());
-
-        final Collection<ScheduledFuture<?>> scheduledTasks =
-                scheduledExecutor.getActiveScheduledTasks();
-
-        assertFalse(scheduledTasks.isEmpty());
-
-        final ScheduledFuture<?> scheduledFuture = scheduledTasks.iterator().next();
-
-        assertFalse(scheduledFuture.isDone());

Review Comment:
   I saw `testScheduleWithDelayCancellation` covering this functionality. Therefore, removing the test is fine. I'm just wondering whether we should move this part into `testScheduleWithDelayCancellation` to have it still covered (or maybe, create a separate test that checks that the task is scheduled).



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java:
##########
@@ -555,10 +562,11 @@ public void testSavepointRescalingPartitionedOperatorState(
                             () ->
                                     client.triggerSavepoint(
                                             jobID, null, SavepointFormatType.CANONICAL),
-                            (int) deadline.timeLeft().getSeconds() / 10,
-                            Time.seconds(10),
+                            new FixedRetryStrategy(
+                                    (int) deadline.timeLeft().getSeconds() / 10,
+                                    Duration.ofSeconds(60)),

Review Comment:
   Did you change the delay on purpose from 10 to 60 seconds?
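
For context on why the delay matters, a small worked example, assuming the first argument is the number of retries and the second the fixed delay between them. The 120-second deadline is a made-up value and `RetryBudgetSketch` is a hypothetical helper.

```java
import java.time.Duration;

class RetryBudgetSketch {

    /** Worst-case time spent waiting between attempts for a fixed number of retries. */
    static Duration totalDelay(int retries, Duration delay) {
        return delay.multipliedBy(retries);
    }

    public static void main(String[] args) {
        Duration timeLeft = Duration.ofSeconds(120); // hypothetical remaining deadline
        int retries = (int) timeLeft.getSeconds() / 10; // 12, computed as in the diff

        // 10s delay: the retries roughly fill the remaining deadline.
        System.out.println(totalDelay(retries, Duration.ofSeconds(10))); // PT2M
        // 60s delay: the same retry count can wait six times longer than the deadline.
        System.out.println(totalDelay(retries, Duration.ofSeconds(60))); // PT12M
    }
}
```

Under these assumptions, keeping the retry count derived from `timeLeft / 10` while raising the delay to 60 seconds no longer ties the retry budget to the deadline.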



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java:
##########
@@ -157,46 +160,49 @@ public void testExecutionDeploymentReconciliationForPendingExecution() throws Ex
         TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper =
                 new TestingExecutionDeploymentTrackerWrapper();
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
-        JobMaster jobMaster = createAndStartJobMaster(deploymentTrackerWrapper, jobGraph);
-        JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
-        RPC_SERVICE_RESOURCE
-                .getTestingRpcService()
-                .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
-
-        final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
-                new CompletableFuture<>();
-        final CompletableFuture<ExecutionAttemptID> taskCancellationFuture =
-                new CompletableFuture<>();
-        final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture =
-                new CompletableFuture<>();
-        TaskExecutorGateway taskExecutorGateway =
-                createTaskExecutorGateway(
-                        taskCancellationFuture,
-                        taskSubmissionFuture,
-                        taskSubmissionAcknowledgeFuture);
-        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation =
-                new LocalUnresolvedTaskManagerLocation();
-
-        registerTaskExecutorAndOfferSlots(
-                jobMasterGateway,
-                jobGraph.getJobID(),
-                taskExecutorGateway,
-                localUnresolvedTaskManagerLocation);
-
-        ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
-
-        // the execution has not been acknowledged yet by the TaskExecutor, but we already allow the
-        // ID to be in the heartbeat payload
-        jobMasterGateway.heartbeatFromTaskManager(
-                localUnresolvedTaskManagerLocation.getResourceID(),
-                new TaskExecutorToJobManagerHeartbeatPayload(
-                        new AccumulatorReport(Collections.emptyList()),
-                        new ExecutionDeploymentReport(Collections.singleton(pendingExecutionId))));
-
-        taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
-
-        deploymentTrackerWrapper.getTaskDeploymentFuture().get();
-        assertFalse(taskCancellationFuture.isDone());
+        try (JobMaster jobMaster = createAndStartJobMaster(deploymentTrackerWrapper, jobGraph)) {
+            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+            RPC_SERVICE_RESOURCE
+                    .getTestingRpcService()
+                    .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+            final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
+                    new CompletableFuture<>();
+            final CompletableFuture<ExecutionAttemptID> taskCancellationFuture =
+                    new CompletableFuture<>();
+            final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture =
+                    new CompletableFuture<>();
+            TaskExecutorGateway taskExecutorGateway =
+                    createTaskExecutorGateway(
+                            taskCancellationFuture,
+                            taskSubmissionFuture,
+                            taskSubmissionAcknowledgeFuture);
+            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation =
+                    new LocalUnresolvedTaskManagerLocation();
+
+            registerTaskExecutorAndOfferSlots(
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway,
+                    localUnresolvedTaskManagerLocation);
+
+            ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
+
+            // the execution has not been acknowledged yet by the TaskExecutor, but we already allow
+            // the
+            // ID to be in the heartbeat payload

Review Comment:
   ```suggestion
               // the ID to be in the heartbeat payload
   ```





[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843921237


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -132,19 +139,28 @@ public void teardown() throws Exception {
 
     @Test
     public void testCleanupWhenRestoreFails() throws Exception {
-        createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).build().run();
+        createTaskBuilder()
+                .setInvokable(InvokableWithExceptionInRestore.class)
+                .build(Executors.directExecutor())

Review Comment:
   I'm wondering whether we should also add a `buildWithDirectExecutor()` method that uses `Executors.directExecutor` as a default.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -315,7 +334,7 @@ private void testExecutionFailsInNetworkRegistration(
                         .setPartitionProducerStateChecker(partitionProducerStateChecker)
                         .setResultPartitions(resultPartitions)
                         .setInputGates(inputGates)
-                        .build();
+                        .build(EXECUTOR_RESOURCE.getExecutor());

Review Comment:
   Could we add a comment explaining why we can't use the `directExecutor` here? It's still not clear to me why we need to handle this test case differently... 🤔 



##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
                             Time.milliseconds(50),
                             deadline,
                             (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
-                            TestingUtils.defaultScheduledExecutor());
+                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Review Comment:
   We have so many locations where we put a `ScheduledExecutorServiceAdapter`. Couldn't we move this somehow into `TestingUtils`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -78,8 +74,6 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() {
             (execution, previousState, newState) -> {};
     private VertexParallelismStore vertexParallelismStore;
 
-    private TestingDefaultExecutionGraphBuilder() {}

Review Comment:
   Why do we remove that one?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java:
##########
@@ -76,7 +59,10 @@ public void setBlobWriter(BlobWriter blobWriter) {
     }
 
     public JobManagerSharedServices build() {
+        final ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();

Review Comment:
   As far as I can see, the executor is closed in the `JobMaster.shutdown()` method which is not necessarily called in each test (e.g. `JobMasterExecutionDeploymentReconciliationTest.testExecutionDeploymentReconciliationForPendingExecution`). Or am I missing something? 🤔 Hence, we would have an open thread pool here...
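
The concern can be sketched with plain JDK types. `OwningService` is a hypothetical stand-in for a component that creates its own executor; it does not mirror the actual `JobMaster` or `JobManagerSharedServices` code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorCleanupSketch {

    /** Hypothetical component that owns an executor for its whole lifetime. */
    static final class OwningService implements AutoCloseable {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        ScheduledExecutorService executor() {
            return executor;
        }

        @Override
        public void close() throws InterruptedException {
            // Without this call the worker thread outlives the test.
            executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /** Uses the service inside try-with-resources and reports whether its executor terminated. */
    static boolean runAndClose() throws Exception {
        OwningService service = new OwningService();
        Runnable noop = () -> {};
        try (OwningService s = service) {
            // Scheduling a task starts the worker thread.
            s.executor().schedule(noop, 0, TimeUnit.MILLISECONDS).get();
        }
        return service.executor().isTerminated();
    }
}
```

If a test creates such a component and never closes it, nothing else shuts the executor down, which is the open thread pool described above.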



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:
##########
@@ -188,7 +187,9 @@ public JobMaster createJobMaster() throws Exception {
                         ? slotPoolServiceSchedulerFactory
                         : DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                                 configuration, jobGraph.getJobType()),
-                jobManagerSharedServices,
+                jobManagerSharedServices != null
+                        ? jobManagerSharedServices
+                        : new TestingJobManagerSharedServicesBuilder().build(),

Review Comment:
   This is added lazily so that the internally used thread pool is only started if it is actually needed, right? (I'm not sure whether we can actually instantiate the thread pool in `TestingJobManagerSharedServicesBuilder.build`, as mentioned in my previous comment...)
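
The lazy-default pattern in question, reduced to a self-contained sketch. `SharedServices`, `ComponentBuilder` and the counter are hypothetical names for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LazyDefaultSketch {

    /** Hypothetical bundle of shared services that owns an executor. */
    static final class SharedServices implements AutoCloseable {
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    static final class ComponentBuilder {
        /** Counts how often the fallback was created (only for demonstration). */
        static int defaultsCreated = 0;

        private SharedServices sharedServices; // stays null unless a test sets it

        ComponentBuilder withSharedServices(SharedServices sharedServices) {
            this.sharedServices = sharedServices;
            return this;
        }

        /** Creates the default only when no instance was supplied. */
        SharedServices resolveSharedServices() {
            if (sharedServices != null) {
                return sharedServices;
            }
            defaultsCreated++;
            return new SharedServices();
        }
    }
}
```

Tests that pass their own instance never create the default. A test that relies on the default still has to make sure something closes it, which is the open question raised in the previous comment.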





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r845012997


##########
flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java:
##########
@@ -415,42 +324,6 @@ public void testOrTimeout() throws Exception {
         }
     }
 
-    @Test
-    public void testRetryWithDelayAndPredicate() throws Exception {

Review Comment:
   True; I migrated this test and another one to use a RetryStrategy in #19391.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843084714


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java:
##########
@@ -18,16 +18,19 @@
 
 package org.apache.flink.runtime.scheduler.benchmark;
 
-import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 /** Base class of all scheduler benchmarks. */
 public class SchedulerBenchmarkBase {
     public ScheduledExecutorService scheduledExecutorService;
 
     public void setup() {
-        scheduledExecutorService = TestingUtils.defaultExecutor();
+        scheduledExecutorService =

Review Comment:
   At least I couldn't find another benchmark that relies on JUnit resources.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843175972


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -57,6 +58,10 @@
 /** A collection of utility methods for testing the ExecutionGraph and its related classes. */
 public class ExecutionGraphTestUtils {
 
+    private static final ScheduledExecutorService executor =

Review Comment:
   fixed





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844033876


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -78,8 +74,6 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() {
             (execution, previousState, newState) -> {};
     private VertexParallelismStore vertexParallelismStore;
 
-    private TestingDefaultExecutionGraphBuilder() {}

Review Comment:
   This was an accident.





[GitHub] [flink] XComp commented on pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19351:
URL: https://github.com/apache/flink/pull/19351#issuecomment-1088873839

   Hm, I shouldn't have approved the build yet, considering the CI failure and that CI is not done yet.




[GitHub] [flink] zentol commented on pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19351:
URL: https://github.com/apache/flink/pull/19351#issuecomment-1090127716

   @flinkbot run azure




[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844077415


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
                             Time.milliseconds(50),
                             deadline,
                             (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
-                            TestingUtils.defaultScheduledExecutor());
+                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Review Comment:
   Although we couldn't put it into our test util modules.
   Maybe we should just amend the FutureUtils to work against a plain ScheduledExecutorService.
   Or add variants that also accept a ScheduledExecutorService.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842959738


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java:
##########
@@ -112,9 +111,15 @@ private TaskSubmissionTestEnvironment(
                     taskManagerActionListeners,
             @Nullable String metricQueryServiceAddress,
             TestingRpcService testingRpcService,
-            ShuffleEnvironment<?, ?> shuffleEnvironment)
+            ShuffleEnvironment<?, ?> shuffleEnvironment,
+            ScheduledExecutorService executor)
             throws Exception {
 
+        this.timerService =
+                new DefaultTimerService<>(
+                        java.util.concurrent.Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   see https://github.com/apache/flink/pull/19351#discussion_r842959149





[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844035571


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -315,7 +334,7 @@ private void testExecutionFailsInNetworkRegistration(
                         .setPartitionProducerStateChecker(partitionProducerStateChecker)
                         .setResultPartitions(resultPartitions)
                         .setInputGates(inputGates)
-                        .build();
+                        .build(EXECUTOR_RESOURCE.getExecutor());

Review Comment:
   🤦 I didn't see that when looking through the 10000 `createTaskBuilder` (in contrast to `new TestTaskBuilder`). Fair enough... 😇 





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843010096


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -56,12 +57,23 @@
     private static final Logger LOG =
             LoggerFactory.getLogger(TestingDefaultExecutionGraphBuilder.class);
 
+    private static final ScheduledExecutorService FUTURE_EXECUTOR =
+            Executors.newScheduledThreadPool(
+                    0,
+                    new ExecutorThreadFactory(
+                            "flink-future-" + TestingDefaultExecutionGraphBuilder.class));
+
+    private static final Executor IO_EXECUTOR =
+            Executors.newCachedThreadPool(
+                    new ExecutorThreadFactory(
+                            "flink-io-" + TestingDefaultExecutionGraphBuilder.class));

Review Comment:
   My thinking was that in this case the executors aren't doing anything interesting, and since they are static we aren't leaking threads. Ultimately I just wanted to reduce the number of tests I had to touch.





[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844016207


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java:
##########
@@ -180,7 +172,7 @@ public TaskManagerServices build() {
                 taskStateManager,
                 taskChangelogStoragesManager,
                 taskEventDispatcher,
-                ioExecutor,
+                Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   `MetricUtilsTest.testManagedMemoryMetricsInitialization` instantiates `TaskManagerServices` but never calls shutdown on that instance. Additionally, `TaskSubmissionTestEnvironment` does not expose shutdown logic (as far as I can see). All the other tests instantiating `TaskManagerServices` seem to also instantiate the `TaskExecutor` and call close or stop on that one (i.e. these tests should be fine).





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844067543


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java:
##########
@@ -76,7 +59,10 @@ public void setBlobWriter(BlobWriter blobWriter) {
     }
 
     public JobManagerSharedServices build() {
+        final ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();

Review Comment:
   I will fix those tests.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:
##########
@@ -188,7 +187,9 @@ public JobMaster createJobMaster() throws Exception {
                         ? slotPoolServiceSchedulerFactory
                         : DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                                 configuration, jobGraph.getJobType()),
-                jobManagerSharedServices,
+                jobManagerSharedServices != null
+                        ? jobManagerSharedServices
+                        : new TestingJobManagerSharedServicesBuilder().build(),

Review Comment:
   yes; and see above.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844077415


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
                             Time.milliseconds(50),
                             deadline,
                             (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
-                            TestingUtils.defaultScheduledExecutor());
+                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Review Comment:
   Although we couldn't put it into our test util modules.
   Maybe we should just amend the FutureUtils to work against a plain ScheduledExecutorService. Meh, that wouldn't quite work since they are also used with a MainThreadExecutor.
   
   Or add variants that also accept a ScheduledExecutorService.
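The "add variants" idea above can be sketched roughly as follows. This is only an illustration under stated assumptions: `SimpleScheduledExecutor`, `DelayedRunSketch` and `runAfterDelay` are hypothetical names invented here, not Flink's `ScheduledExecutor` or `FutureUtils` API. The point is that an overload taking a plain `ScheduledExecutorService` can wrap it and delegate to the existing method, so callers no longer need to create an adapter themselves.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical minimal scheduling abstraction (an assumption, not Flink's interface). */
interface SimpleScheduledExecutor {
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
}

final class DelayedRunSketch {

    /** Existing-style method that works against the abstraction. */
    static CompletableFuture<Void> runAfterDelay(
            Runnable action, long delayMillis, SimpleScheduledExecutor executor) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        executor.schedule(
                () -> {
                    try {
                        action.run();
                        result.complete(null);
                    } catch (Throwable t) {
                        result.completeExceptionally(t);
                    }
                },
                delayMillis,
                TimeUnit.MILLISECONDS);
        return result;
    }

    /** Added variant: accepts a plain ScheduledExecutorService and adapts it internally. */
    static CompletableFuture<Void> runAfterDelay(
            Runnable action, long delayMillis, ScheduledExecutorService executor) {
        SimpleScheduledExecutor adapter =
                (command, delay, unit) -> executor.schedule(command, delay, unit);
        return runAfterDelay(action, delayMillis, adapter);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            runAfterDelay(() -> System.out.println("ran"), 10L, executor)
                    .get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The first overload stays usable with other implementations of the abstraction (for example a main-thread executor), which is the concern raised in the comment.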





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844134619


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java:
##########
@@ -180,7 +172,7 @@ public TaskManagerServices build() {
                 taskStateManager,
                 taskChangelogStoragesManager,
                 taskEventDispatcher,
-                ioExecutor,
+                Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   I'll fix the `MetricUtilsTest`. `TaskSubmissionTestEnvironment` does implement `AutoCloseable`, and we could have it close the task executor as well. As far as I can tell all tests call `close()` on the environment.





[GitHub] [flink] flinkbot commented on pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19351:
URL: https://github.com/apache/flink/pull/19351#issuecomment-1087407592

   ## CI report:
   
   * 1580f63c4fd8e395ed41542e6ee322082cc6b23f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] XComp commented on pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19351:
URL: https://github.com/apache/flink/pull/19351#issuecomment-1087575899

   We might want to switch to an `ExecutorServiceResource` that takes care of shutting the thread pool down at the end of the test.
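A minimal sketch of what such a resource could look like, using only the JDK. `ScopedExecutor` is a hypothetical name and this is not the `TestExecutorResource` or `TestExecutorExtension` from the PR; it only illustrates the idea that whoever creates the pool is also the one that shuts it down when the test scope ends.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical test resource: creates an executor up front and shuts it down on close. */
final class ScopedExecutor<E extends ExecutorService> implements AutoCloseable {
    private final E executor;

    ScopedExecutor(Supplier<E> factory) {
        this.executor = factory.get();
    }

    E getExecutor() {
        return executor;
    }

    @Override
    public void close() throws InterruptedException {
        // Graceful shutdown first; fall back to interrupting workers if that takes too long.
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }
}

final class ScopedExecutorDemo {

    /** Runs one task inside the scope and reports whether the pool is terminated afterwards. */
    static boolean executorIsTerminatedAfterScope() throws Exception {
        ScopedExecutor<ExecutorService> resource =
                new ScopedExecutor<ExecutorService>(() -> Executors.newSingleThreadExecutor());
        ExecutorService executor = resource.getExecutor();
        try (ScopedExecutor<ExecutorService> scope = resource) {
            executor.submit(() -> {}).get();
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executorIsTerminatedAfterScope());
    }
}
```

In a JUnit 4 or JUnit 5 setting the same shutdown logic would live in the rule's or extension's teardown callback instead of `close()`.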




[GitHub] [flink] XComp commented on pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19351:
URL: https://github.com/apache/flink/pull/19351#issuecomment-1088626016

   ...and `JMXJobManagerMetricTest` has a compilation error.




[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842936866


##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java:
##########
@@ -73,6 +76,10 @@
 @ExtendWith(TestLoggerExtension.class)
 public class ApplicationDispatcherBootstrapITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =

Review Comment:
   According to the docs, `@RegisterExtension` fields should be package-private. It does work for static fields, but that may be an undocumented feature.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844032126


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -315,7 +334,7 @@ private void testExecutionFailsInNetworkRegistration(
                         .setPartitionProducerStateChecker(partitionProducerStateChecker)
                         .setResultPartitions(resultPartitions)
                         .setInputGates(inputGates)
-                        .build();
+                        .build(EXECUTOR_RESOURCE.getExecutor());

Review Comment:
   It doesn't use a direct executor now because it wasn't using `createTaskBuilder()`. I don't know why that is, and hence can't add a comment.
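For readers unfamiliar with the distinction, the following JDK-only sketch shows why the choice between a direct executor and a pool matters for a test. `DirectExecutorDemo` and `threadUsedBy` are hypothetical names; the direct executor is modelled here as `Runnable::run`, which is an assumption about how such an executor behaves rather than Flink's `Executors.directExecutor()` implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DirectExecutorDemo {

    /** Returns the name of the thread on which the given executor runs a task. */
    static String threadUsedBy(Executor executor) throws Exception {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        return threadName.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        // A direct executor runs the task synchronously in the calling thread.
        Executor direct = Runnable::run;
        System.out.println("direct: " + threadUsedBy(direct));

        // A pool hands the task to a separate worker thread.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-pool-thread"));
        try {
            System.out.println("pool:   " + threadUsedBy(pool));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With the direct variant the task has already finished when `execute` returns, so ordering and visibility issues that exist with a real pool cannot show up in the test.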





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r845013733


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -416,22 +410,18 @@ public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Excep
 
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
-        final JobManagerSharedServices jobManagerSharedServices =

Review Comment:
   Surely just some leftover from development 🤷 





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844963328


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java:
##########
@@ -555,10 +562,11 @@ public void testSavepointRescalingPartitionedOperatorState(
                             () ->
                                     client.triggerSavepoint(
                                             jobID, null, SavepointFormatType.CANONICAL),
-                            (int) deadline.timeLeft().getSeconds() / 10,
-                            Time.seconds(10),
+                            new FixedRetryStrategy(
+                                    (int) deadline.timeLeft().getSeconds() / 10,
+                                    Duration.ofSeconds(60)),

Review Comment:
   nope





[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842782174


##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java:
##########
@@ -73,6 +76,10 @@
 @ExtendWith(TestLoggerExtension.class)
 public class ApplicationDispatcherBootstrapITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
   ```



##########
flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java:
##########
@@ -47,16 +45,12 @@ public static Duration infiniteDuration() {
         return Duration.ofDays(365L);
     }
 
-    public static synchronized ScheduledExecutorService defaultExecutor() {
-        if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown()) {
-            sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
-        }
-
-        return sharedExecutorInstance;
+    public static TestExecutorExtension<ScheduledExecutorService> defaultExecutor() {

Review Comment:
   ```suggestion
       public static TestExecutorExtension<ScheduledExecutorService> defaultExecutorExtension() {
   ```



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java:
##########
@@ -24,22 +24,30 @@
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 /** Tests for the parameter handling of the {@link JarPlanHandler}. */
 public class JarPlanHandlerParameterTest
         extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> {
     private static JarPlanHandler handler;
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+
     @BeforeClass
     public static void setup() throws Exception {
-        init();
+        // init();

Review Comment:
   ```suggestion
           init();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java:
##########
@@ -54,6 +57,10 @@
 @ExtendWith(TestLoggerExtension.class)
 public abstract class AbstractHAJobRunITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java:
##########
@@ -180,7 +172,7 @@ public TaskManagerServices build() {
                 taskStateManager,
                 taskChangelogStoragesManager,
                 taskEventDispatcher,
-                ioExecutor,
+                Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   Shouldn't we rely on the rule/resource here as well?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java:
##########
@@ -71,11 +77,8 @@ public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {
         return result;
     }
 
-    public static TimerService<AllocationID> createDefaultTimerService() {
-        return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
-    }
-
     public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
-        return new DefaultTimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
+        return new DefaultTimerService<>(
+                Executors.newSingleThreadScheduledExecutor(), shutdownTimeout);

Review Comment:
   rule/resource?



##########
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java:
##########
@@ -50,13 +52,18 @@
 import java.lang.management.ManagementFactory;
 import java.time.Duration;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests to verify JMX reporter functionality on the JobManager. */
 class JMXJobManagerMetricTest {
 
+    @RegisterExtension
+    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java:
##########
@@ -63,6 +66,10 @@
 @ExtendWith({TestLoggerExtension.class})
 public class TaskCancelAsyncProducerConsumerITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -57,6 +58,10 @@
 /** A collection of utility methods for testing the ExecutionGraph and its related classes. */
 public class ExecutionGraphTestUtils {
 
+    private static final ScheduledExecutorService executor =

Review Comment:
   Why do we add this here instead of utilizing the resource in the corresponding threads? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java:
##########
@@ -112,9 +111,15 @@ private TaskSubmissionTestEnvironment(
                     taskManagerActionListeners,
             @Nullable String metricQueryServiceAddress,
             TestingRpcService testingRpcService,
-            ShuffleEnvironment<?, ?> shuffleEnvironment)
+            ShuffleEnvironment<?, ?> shuffleEnvironment,
+            ScheduledExecutorService executor)
             throws Exception {
 
+        this.timerService =
+                new DefaultTimerService<>(
+                        java.util.concurrent.Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   Rule/Resource?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -56,12 +57,23 @@
     private static final Logger LOG =
             LoggerFactory.getLogger(TestingDefaultExecutionGraphBuilder.class);
 
+    private static final ScheduledExecutorService FUTURE_EXECUTOR =
+            Executors.newScheduledThreadPool(
+                    0,
+                    new ExecutorThreadFactory(
+                            "flink-future-" + TestingDefaultExecutionGraphBuilder.class));
+
+    private static final Executor IO_EXECUTOR =
+            Executors.newCachedThreadPool(
+                    new ExecutorThreadFactory(
+                            "flink-io-" + TestingDefaultExecutionGraphBuilder.class));

Review Comment:
   Again, why not use the resource here? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java:
##########
@@ -61,6 +65,10 @@
     /** Container for local objects to keep them from gc runs. */
     private final List<Object> referencedObjects = new ArrayList<>();
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+

Review Comment:
   This is never used anywhere?!



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java:
##########
@@ -66,13 +66,8 @@ private DeclarativeSlotManagerBuilder() {
         this.slotTracker = new DefaultSlotTracker();
     }
 
-    public static DeclarativeSlotManagerBuilder newBuilder() {
-        return new DeclarativeSlotManagerBuilder();
-    }
-
-    public DeclarativeSlotManagerBuilder setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
-        this.scheduledExecutor = scheduledExecutor;
-        return this;
+    public static DeclarativeSlotManagerBuilder newBuilder(ScheduledExecutor scheduledExecutor) {

Review Comment:
   hm, I always went for the required parameters being added as part of the `build()` method call rather than the `newBuilder()` call. But I think, you're right with doing it that way because it's still part of the initialization rather than the creation of the instance. 👍 
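The two placements for a required parameter can be compared with a small sketch. `SlotManagerSketch` and its fields are hypothetical and only loosely modelled on the builder in the diff; the sketch shows the `newBuilder(required)` style, where a builder cannot exist without its executor.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical component whose executor is mandatory and whose other settings are optional. */
final class SlotManagerSketch {
    final Executor executor;
    final int maxSlots;

    private SlotManagerSketch(Executor executor, int maxSlots) {
        this.executor = executor;
        this.maxSlots = maxSlots;
    }

    /** The required dependency is passed when the builder is created. */
    static Builder newBuilder(Executor executor) {
        return new Builder(executor);
    }

    static final class Builder {
        private final Executor executor;
        private int maxSlots = 1; // optional setting with a default

        private Builder(Executor executor) {
            this.executor = Objects.requireNonNull(executor, "executor is required");
        }

        Builder setMaxSlots(int maxSlots) {
            this.maxSlots = maxSlots;
            return this;
        }

        SlotManagerSketch build() {
            return new SlotManagerSketch(executor, maxSlots);
        }
    }

    public static void main(String[] args) {
        SlotManagerSketch manager =
                SlotManagerSketch.newBuilder(Runnable::run).setMaxSlots(3).build();
        System.out.println(manager.maxSlots);
    }
}
```

The alternative mentioned in the comment would move the `Executor` argument to `build(Executor)`; both make the parameter impossible to forget, they only differ in when the caller has to have the executor at hand.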



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java:
##########
@@ -198,7 +198,8 @@ private SchedulerNG createScheduler(
             Time slotRequestTimeout,
             JobStatusListener jobStatusListener)
             throws Exception {
-        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor)
+        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, singleThreadScheduledExecutorService)

Review Comment:
   What about using the resource/extension here as well?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -296,10 +296,10 @@ private static IntermediateResult createResult(
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
 
         SchedulerBase scheduler =
-                SchedulerTestingUtils.newSchedulerBuilder(
-                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                        .setIoExecutor(executorService)
-                        .setFutureExecutor(executorService)
+                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                executorService)

Review Comment:
   the executorService is instantiated but never shut down here... Or am I missing something? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java:
##########
@@ -18,16 +18,19 @@
 
 package org.apache.flink.runtime.scheduler.benchmark;
 
-import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 /** Base class of all scheduler benchmarks. */
 public class SchedulerBenchmarkBase {
     public ScheduledExecutorService scheduledExecutorService;
 
     public void setup() {
-        scheduledExecutorService = TestingUtils.defaultExecutor();
+        scheduledExecutorService =

Review Comment:
   Why not use the resource/extension here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java:
##########
@@ -77,6 +78,10 @@
                             Prio1OutboundChannelHandlerFactory.class.getCanonicalName())
                     .build();
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java:
##########
@@ -70,6 +73,9 @@
 /** An integration test which recovers from checkpoint after regaining the leadership. */
 @ExtendWith(TestLoggerExtension.class)
 public class JobDispatcherITCase {
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842959149


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java:
##########
@@ -71,11 +77,8 @@ public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {
         return result;
     }
 
-    public static TimerService<AllocationID> createDefaultTimerService() {
-        return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
-    }
-
     public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
-        return new DefaultTimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
+        return new DefaultTimerService<>(
+                Executors.newSingleThreadScheduledExecutor(), shutdownTimeout);

Review Comment:
   Not required in this case (and actually harmful) because the timer service expects to own the executor and is shutting it down.
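The ownership problem described above can be reproduced with plain JDK classes. In this sketch `OwningTimerService` is a hypothetical stand-in for a component that shuts down the executor it was given; it is not Flink's `DefaultTimerService`. If such a component is handed a shared or rule-managed executor, every other user of that executor is broken once the component closes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical stand-in for a service that assumes it owns its executor. */
final class OwningTimerService implements AutoCloseable {
    private final ScheduledExecutorService executor;

    OwningTimerService(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public void close() {
        // The service treats the executor as its own and shuts it down.
        executor.shutdownNow();
    }
}

final class ExecutorOwnershipDemo {

    /** Returns true if a second user of the same executor is rejected after the owner closed. */
    static boolean sharedExecutorBreaksAfterClose() {
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        new OwningTimerService(shared).close();
        try {
            shared.execute(() -> {});
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(sharedExecutorBreaksAfterClose());
    }
}
```

Giving each owning service its own freshly created executor, as the diff does, avoids this interaction; the cost is that the test must make sure the service itself is closed.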





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842964196


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java:
##########
@@ -66,13 +66,8 @@ private DeclarativeSlotManagerBuilder() {
         this.slotTracker = new DefaultSlotTracker();
     }
 
-    public static DeclarativeSlotManagerBuilder newBuilder() {
-        return new DeclarativeSlotManagerBuilder();
-    }
-
-    public DeclarativeSlotManagerBuilder setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
-        this.scheduledExecutor = scheduledExecutor;
-        return this;
+    public static DeclarativeSlotManagerBuilder newBuilder(ScheduledExecutor scheduledExecutor) {

Review Comment:
   I generally agree and am a bit inconsistent in this regard. There are some cases where passing it via the constructor required less refactoring.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842960738


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java:
##########
@@ -180,7 +172,7 @@ public TaskManagerServices build() {
                 taskStateManager,
                 taskChangelogStoragesManager,
                 taskEventDispatcher,
-                ioExecutor,
+                Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   TaskManagerServices assumes that it owns the executor and shuts it down on its own.





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843011335


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -57,6 +58,10 @@
 /** A collection of utility methods for testing the ExecutionGraph and its related classes. */
 public class ExecutionGraphTestUtils {
 
+    private static final ScheduledExecutorService executor =

Review Comment:
   Same thinking as https://github.com/apache/flink/pull/19351#discussion_r843010096





[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844028946


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java:
##########
@@ -71,11 +77,8 @@ public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {
         return result;
     }
 
-    public static TimerService<AllocationID> createDefaultTimerService() {
-        return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
-    }
-
     public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
-        return new DefaultTimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
+        return new DefaultTimerService<>(
+                Executors.newSingleThreadScheduledExecutor(), shutdownTimeout);

Review Comment:
   Ok, fair enough. I went over the calling tests to verify that either the taskSlotTable or the TaskExecutor is stopped, which would trigger the closing of the executor. 👍 (Only the `TaskSubmissionTestEnvironment` should be checked, as already mentioned in a [previous comment](https://github.com/apache/flink/pull/19351#discussion_r844016207).)





[GitHub] [flink] zentol commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844070820


##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
                             Time.milliseconds(50),
                             deadline,
                             (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
-                            TestingUtils.defaultScheduledExecutor());
+                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

Review Comment:
   We could add another resource/extension for this. We can't extend the executor resource because it may only contain a plain Executor.
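
A rough sketch of what such a separate resource might look like, written against plain `java.util.concurrent` so it stands alone. The class name and the `before()`/`after()` hooks are assumptions for illustration: they mimic a test-framework lifecycle but are not an existing Flink or JUnit class.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical test resource that always provides a ScheduledExecutorService. */
public class ScheduledExecutorResourceSketch {

    private ScheduledExecutorService executor;

    /** Lifecycle hook to run before a test: creates a fresh executor. */
    public void before() {
        executor = Executors.newSingleThreadScheduledExecutor();
    }

    /** Returns the executor owned by this resource. */
    public ScheduledExecutorService getExecutor() {
        if (executor == null) {
            throw new IllegalStateException("Resource was not set up.");
        }
        return executor;
    }

    /** Lifecycle hook to run after a test: the resource shuts down what it created. */
    public void after() throws InterruptedException {
        if (executor != null) {
            executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
            executor = null;
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorResourceSketch resource = new ScheduledExecutorResourceSketch();
        resource.before();
        ScheduledExecutorService scheduled = resource.getExecutor();
        try {
            // Scheduling works directly, without wrapping a plain Executor.
            int result = scheduled.schedule(() -> 42, 10, TimeUnit.MILLISECONDS).get();
            System.out.println("scheduled result: " + result);
        } finally {
            resource.after();
        }
        System.out.println("shut down by resource: " + scheduled.isShutdown());
    }
}
```

Because the resource creates the executor, it is also the only place that shuts it down, which keeps ownership unambiguous.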


