You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/05 15:29:54 UTC

[GitHub] [flink] XComp commented on a diff in pull request #19351: [FLINK-27045][tests] Remove shared executor

XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r842782174


##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java:
##########
@@ -73,6 +76,10 @@
 @ExtendWith(TestLoggerExtension.class)
 public class ApplicationDispatcherBootstrapITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
   ```



##########
flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java:
##########
@@ -47,16 +45,12 @@ public static Duration infiniteDuration() {
         return Duration.ofDays(365L);
     }
 
-    public static synchronized ScheduledExecutorService defaultExecutor() {
-        if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown()) {
-            sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
-        }
-
-        return sharedExecutorInstance;
+    public static TestExecutorExtension<ScheduledExecutorService> defaultExecutor() {

Review Comment:
   ```suggestion
       public static TestExecutorExtension<ScheduledExecutorService> defaultExecutorExtension() {
   ```



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java:
##########
@@ -24,22 +24,30 @@
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 /** Tests for the parameter handling of the {@link JarPlanHandler}. */
 public class JarPlanHandlerParameterTest
         extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> {
     private static JarPlanHandler handler;
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+
     @BeforeClass
     public static void setup() throws Exception {
-        init();
+        // init();

Review Comment:
   ```suggestion
           init();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java:
##########
@@ -54,6 +57,10 @@
 @ExtendWith(TestLoggerExtension.class)
 public abstract class AbstractHAJobRunITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java:
##########
@@ -180,7 +172,7 @@ public TaskManagerServices build() {
                 taskStateManager,
                 taskChangelogStoragesManager,
                 taskEventDispatcher,
-                ioExecutor,
+                Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   Shouldn't we rely on the rule/resource here as well?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java:
##########
@@ -71,11 +77,8 @@ public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {
         return result;
     }
 
-    public static TimerService<AllocationID> createDefaultTimerService() {
-        return createDefaultTimerService(DEFAULT_SLOT_TIMEOUT);
-    }
-
     public static TimerService<AllocationID> createDefaultTimerService(long shutdownTimeout) {
-        return new DefaultTimerService<>(TestingUtils.defaultExecutor(), shutdownTimeout);
+        return new DefaultTimerService<>(
+                Executors.newSingleThreadScheduledExecutor(), shutdownTimeout);

Review Comment:
   rule/resource?



##########
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java:
##########
@@ -50,13 +52,18 @@
 import java.lang.management.ManagementFactory;
 import java.time.Duration;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests to verify JMX reporter functionality on the JobManager. */
 class JMXJobManagerMetricTest {
 
+    @RegisterExtension
+    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java:
##########
@@ -63,6 +66,10 @@
 @ExtendWith({TestLoggerExtension.class})
 public class TaskCancelAsyncProducerConsumerITCase {
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -57,6 +58,10 @@
 /** A collection of utility methods for testing the ExecutionGraph and its related classes. */
 public class ExecutionGraphTestUtils {
 
+    private static final ScheduledExecutorService executor =

Review Comment:
   Why do we add this here instead of utilizing the resource in the corresponding threads? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java:
##########
@@ -112,9 +111,15 @@ private TaskSubmissionTestEnvironment(
                     taskManagerActionListeners,
             @Nullable String metricQueryServiceAddress,
             TestingRpcService testingRpcService,
-            ShuffleEnvironment<?, ?> shuffleEnvironment)
+            ShuffleEnvironment<?, ?> shuffleEnvironment,
+            ScheduledExecutorService executor)
             throws Exception {
 
+        this.timerService =
+                new DefaultTimerService<>(
+                        java.util.concurrent.Executors.newSingleThreadScheduledExecutor(),

Review Comment:
   Rule/Resource?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -56,12 +57,23 @@
     private static final Logger LOG =
             LoggerFactory.getLogger(TestingDefaultExecutionGraphBuilder.class);
 
+    private static final ScheduledExecutorService FUTURE_EXECUTOR =
+            Executors.newScheduledThreadPool(
+                    0,
+                    new ExecutorThreadFactory(
+                            "flink-future-" + TestingDefaultExecutionGraphBuilder.class));
+
+    private static final Executor IO_EXECUTOR =
+            Executors.newCachedThreadPool(
+                    new ExecutorThreadFactory(
+                            "flink-io-" + TestingDefaultExecutionGraphBuilder.class));

Review Comment:
   Again, why not use the resource here? 🤔



##########
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java:
##########
@@ -61,6 +65,10 @@
     /** Container for local objects to keep them from gc runs. */
     private final List<Object> referencedObjects = new ArrayList<>();
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+

Review Comment:
   This is never used anywhere?!



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java:
##########
@@ -66,13 +66,8 @@ private DeclarativeSlotManagerBuilder() {
         this.slotTracker = new DefaultSlotTracker();
     }
 
-    public static DeclarativeSlotManagerBuilder newBuilder() {
-        return new DeclarativeSlotManagerBuilder();
-    }
-
-    public DeclarativeSlotManagerBuilder setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
-        this.scheduledExecutor = scheduledExecutor;
-        return this;
+    public static DeclarativeSlotManagerBuilder newBuilder(ScheduledExecutor scheduledExecutor) {

Review Comment:
   Hm, I've always added the required parameters as part of the `build()` method call rather than the `newBuilder()` call. But I think you're right to do it this way, because it's still part of the initialization rather than the creation of the instance. 👍



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java:
##########
@@ -198,7 +198,8 @@ private SchedulerNG createScheduler(
             Time slotRequestTimeout,
             JobStatusListener jobStatusListener)
             throws Exception {
-        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor)
+        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, singleThreadScheduledExecutorService)

Review Comment:
   What about using the resource/extension here as well?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -296,10 +296,10 @@ private static IntermediateResult createResult(
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
 
         SchedulerBase scheduler =
-                SchedulerTestingUtils.newSchedulerBuilder(
-                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                        .setIoExecutor(executorService)
-                        .setFutureExecutor(executorService)
+                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                executorService)

Review Comment:
   the executorService is instantiated but never shut down here... Or am I missing something? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java:
##########
@@ -18,16 +18,19 @@
 
 package org.apache.flink.runtime.scheduler.benchmark;
 
-import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 /** Base class of all scheduler benchmarks. */
 public class SchedulerBenchmarkBase {
     public ScheduledExecutorService scheduledExecutorService;
 
     public void setup() {
-        scheduledExecutorService = TestingUtils.defaultExecutor();
+        scheduledExecutorService =

Review Comment:
   Why not use the resource/extension here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java:
##########
@@ -77,6 +78,10 @@
                             Prio1OutboundChannelHandlerFactory.class.getCanonicalName())
                     .build();
 
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java:
##########
@@ -70,6 +73,9 @@
 /** An integration test which recovers from checkpoint after regaining the leadership. */
 @ExtendWith(TestLoggerExtension.class)
 public class JobDispatcherITCase {
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =

Review Comment:
   ```suggestion
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org