You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/03 13:49:37 UTC

[GitHub] [flink] XComp commented on a diff in pull request #21137: [FLINK-29234][runtime] JobMasterServiceLeadershipRunner handle leader event in a separate executor to avoid dead lock

XComp commented on code in PR #21137:
URL: https://github.com/apache/flink/pull/21137#discussion_r1011489421


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -46,45 +49,36 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nonnull;
 
-import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JobMasterServiceLeadershipRunner}. */
-public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Usually, we would create a file named `org.junit.jupiter.api.extension.Extension` in `src/test/resources/META-INF/services` containing `org.apache.flink.util.TestLoggerExtension`. This way, the extension applies to all JUnit5 tests in the module (see, e.g., the `flink-clients` module).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java:
##########
@@ -20,124 +20,114 @@
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of {@link JobMasterServiceProcess}. */
 public class TestingJobMasterServiceProcess implements JobMasterServiceProcess {
 
-    private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
-
-    private final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture;
-
-    private final CompletableFuture<String> leaderAddressFuture;
-
-    private final boolean isInitialized;
-
-    private final CompletableFuture<Void> terminationFuture;
-
-    private final boolean manualTerminationFutureCompletion;
-
-    private TestingJobMasterServiceProcess(
-            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture,
-            CompletableFuture<String> leaderAddressFuture,
-            boolean isInitialized,
-            CompletableFuture<Void> terminationFuture,
-            boolean manualTerminationFutureCompletion) {
-        this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-        this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
-        this.leaderAddressFuture = leaderAddressFuture;
-        this.isInitialized = isInitialized;
-        this.terminationFuture = terminationFuture;
-        this.manualTerminationFutureCompletion = manualTerminationFutureCompletion;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+    private final Supplier<Boolean> isInitializedAndRunningSupplier;
+    private final Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier;
+    private final Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier;
+    private final Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier;
+
+    public TestingJobMasterServiceProcess(
+            Supplier<CompletableFuture<Void>> closeAsyncSupplier,
+            Supplier<Boolean> isInitializedAndRunningSupplier,
+            Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier,
+            Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier,
+            Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier) {
+        this.closeAsyncSupplier = closeAsyncSupplier;
+        this.isInitializedAndRunningSupplier = isInitializedAndRunningSupplier;
+        this.getJobMasterGatewayFutureSupplier = getJobMasterGatewayFutureSupplier;
+        this.getResultFutureSupplier = getResultFutureSupplier;
+        this.getLeaderAddressFutureSupplier = getLeaderAddressFutureSupplier;
     }
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        if (!manualTerminationFutureCompletion) {
-            terminationFuture.complete(null);
-        }
-
-        return terminationFuture;
+        return closeAsyncSupplier.get();
     }
 
     @Override
     public boolean isInitializedAndRunning() {
-        return isInitialized && !terminationFuture.isDone();
+        return isInitializedAndRunningSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture() {
-        return jobMasterGatewayFuture;
+        return getJobMasterGatewayFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
-        return jobManagerRunnerResultFuture;
+        return getResultFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<String> getLeaderAddressFuture() {
-        return leaderAddressFuture;
+        return getLeaderAddressFutureSupplier.get();
     }
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
     public static final class Builder {
-        private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
-                CompletableFuture.completedFuture(new TestingJobMasterGatewayBuilder().build());
-        private CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture =
-                new CompletableFuture<>();
-        private CompletableFuture<String> leaderAddressFuture =
-                CompletableFuture.completedFuture("foobar");
-        private boolean isInitialized = true;
-        @Nullable private CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
-        private boolean manualTerminationFutureCompletion = false;
-
-        public Builder setJobMasterGatewayFuture(
-                CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) {
-            this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-            return this;
+        private Supplier<CompletableFuture<Void>> closeAsyncSupplier = unsupportedOperation();
+        private Supplier<Boolean> isInitializedAndRunningSupplier = unsupportedOperation();
+        private Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier =
+                () ->
+                        CompletableFuture.completedFuture(
+                                new TestingJobMasterGatewayBuilder().build());;

Review Comment:
   ```suggestion
                                   new TestingJobMasterGatewayBuilder().build());
   ```
   Checkstyle is complaining here again: the line ends with a double semicolon (`;;`).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -683,21 +686,148 @@ public void testJobAlreadyDone() throws Exception {
                     jobManagerRunner.getResultFuture();
 
             JobManagerRunnerResult result = resultFuture.get();
-            assertEquals(
-                    JobStatus.FAILED,
-                    result.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
+            assertThat(result.getExecutionGraphInfo().getArchivedExecutionGraph().getState())
+                    .isEqualTo(JobStatus.FAILED);
         }
     }
 
+    @Test
+    void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
+            throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
+                testingLeaderElectionDriverFactory =
+                        new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        // we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner
+        // in connection with the DefaultLeaderElectionService generates the nested locking
+        final LeaderElectionService defaultLeaderElectionService =
+                new DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+
+        // latch to detect when we reached the first synchronized section having a lock on the
+        // JobMasterServiceProcess#stop side
+        final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
+        // latch to halt the JobMasterServiceProcess#stop before calling stop on the
+        // DefaultLeaderElectionService instance (and entering the LeaderElectionService's
+        // synchronized block)
+        final OneShotLatch triggerClassLoaderLeaseRelease = new OneShotLatch();
+
+        final JobMasterServiceProcess jobMasterServiceProcess =
+                TestingJobMasterServiceProcess.newBuilder()
+                        .setGetJobMasterGatewayFutureSupplier(CompletableFuture::new)
+                        .setGetResultFutureSupplier(CompletableFuture::new)
+                        .setGetLeaderAddressFutureSupplier(
+                                () -> CompletableFuture.completedFuture("unused address"))
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    closeAsyncCalledTrigger.trigger();
+                                    // we have to return a completed future because we need the
+                                    // follow-up task to run in the calling thread to make the
+                                    // follow-up logic block be executed in the synchronized block

Review Comment:
   ```suggestion
                                       // follow-up code block being executed in the synchronized block
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -151,10 +155,12 @@ public CompletableFuture<Void> closeAsync() {
                 FutureUtils.forward(serviceTerminationFuture, terminationFuture);
 
                 terminationFuture.whenComplete(
-                        (unused, throwable) ->
-                                LOG.debug(
-                                        "Leadership runner for job {} has been terminated.",
-                                        getJobID()));
+                        (unused, throwable) -> {
+                            LOG.debug(
+                                    "Leadership runner for job {} has been terminated.",
+                                    getJobID());
+                            handleLeaderEventExecutor.shutdown();
+                        });

Review Comment:
   Could `closeAsync` return the `CompletableFuture` that results from this call? Shutting down the executor is part of the closing procedure and should therefore be reflected in the method's result.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java:
##########
@@ -20,124 +20,114 @@
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of {@link JobMasterServiceProcess}. */
 public class TestingJobMasterServiceProcess implements JobMasterServiceProcess {
 
-    private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
-
-    private final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture;
-
-    private final CompletableFuture<String> leaderAddressFuture;
-
-    private final boolean isInitialized;
-
-    private final CompletableFuture<Void> terminationFuture;
-
-    private final boolean manualTerminationFutureCompletion;
-
-    private TestingJobMasterServiceProcess(
-            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture,
-            CompletableFuture<String> leaderAddressFuture,
-            boolean isInitialized,
-            CompletableFuture<Void> terminationFuture,
-            boolean manualTerminationFutureCompletion) {
-        this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-        this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
-        this.leaderAddressFuture = leaderAddressFuture;
-        this.isInitialized = isInitialized;
-        this.terminationFuture = terminationFuture;
-        this.manualTerminationFutureCompletion = manualTerminationFutureCompletion;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+    private final Supplier<Boolean> isInitializedAndRunningSupplier;
+    private final Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier;
+    private final Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier;
+    private final Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier;
+
+    public TestingJobMasterServiceProcess(

Review Comment:
   ```suggestion
       private TestingJobMasterServiceProcess(
   ```
   IntelliJ rightly points out that `private` visibility is sufficient here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -93,48 +87,47 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
     private JobResultStore jobResultStore;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
 
         final JobVertex jobVertex = new JobVertex("Test vertex");
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
     }
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         leaderElectionService = new TestingLeaderElectionService();
         jobResultStore = new EmbeddedJobResultStore();
         fatalErrorHandler = new TestingFatalErrorHandler();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {
         fatalErrorHandler.rethrowError();
     }
 
     @Test
-    public void testShutDownSignalsJobAsNotFinished() throws Exception {
+    void testShutDownSignalsJobAsNotFinished() throws Exception {
         try (JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build()) {
             jobManagerRunner.start();
 
             final CompletableFuture<JobManagerRunnerResult> resultFuture =
                     jobManagerRunner.getResultFuture();
 
-            assertThat(resultFuture.isDone(), is(false));
+            assertThat(resultFuture).isNotDone();
 
             jobManagerRunner.closeAsync();
 
             assertJobNotFinished(resultFuture);
-            assertThat(
-                    jobManagerRunner.getJobMasterGateway(),
-                    FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+            assertThat(jobManagerRunner.getJobMasterGateway())
+                    .failsWithin(5L, TimeUnit.MILLISECONDS);

Review Comment:
   We agreed not to rely on timeouts in JUnit tests anymore. Without timeouts, a hanging test still produces a thread dump, and we avoid test instabilities (see [Flink's Common Code Guidelines](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests)). This also applies to the other tests in this class if you want to migrate it as part of this PR.
   It would also be reasonable to leave the JUnit5 migration out of this PR. I didn't check whether you did it in order to use a JUnit5 feature in that test. Leaving it out might help keep the focus on the actual topic of the PR, especially since the migration is not entirely straightforward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org