You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/12 22:01:57 UTC

[flink] branch release-1.14 updated: [FLINK-20195][coordination] Deduplicate jobs for overview

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d3977d2  [FLINK-20195][coordination] Deduplicate jobs for overview
d3977d2 is described below

commit d3977d2912ed284aa1f7cd7c216232f9bcff73e5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sun Dec 12 12:20:41 2021 +0100

    [FLINK-20195][coordination] Deduplicate jobs for overview
---
 .../flink/runtime/dispatcher/Dispatcher.java       |   9 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 129 +++++++++++++++++++++
 .../runtime/jobmaster/TestingJobManagerRunner.java |  24 +++-
 3 files changed, 154 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e08fe58..9560612 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -580,13 +580,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         return combinedJobDetails.thenApply(
                 (Collection<JobDetails> runningJobDetails) -> {
-                    final Collection<JobDetails> allJobDetails =
-                            new ArrayList<>(completedJobDetails.size() + runningJobDetails.size());
+                    final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();
 
-                    allJobDetails.addAll(runningJobDetails);
-                    allJobDetails.addAll(completedJobDetails);
+                    completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
+                    runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
 
-                    return new MultipleJobsDetails(allJobDetails);
+                    return new MultipleJobsDetails(deduplicatedJobs.values());
                 });
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3923565..fb36a9e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -97,10 +98,13 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -815,6 +819,119 @@ public class DispatcherTest extends AbstractDispatcherTest {
     }
 
     @Test
+    public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception {
+        final JobManagerRunnerFactory runnerFactory =
+                new QueuedJobManagerRunnerFactory(
+                        completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
+
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
+        DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        // run the only job, which completes with SUSPENDED
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+        assertOnlyContainsSingleJobWithState(
+                JobStatus.SUSPENDED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+    }
+
+    @Test
+    public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception {
+        final JobManagerRunnerFactory runnerFactory =
+                new QueuedJobManagerRunnerFactory(
+                        completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
+                        runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING));
+
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
+        DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        // run first job, which completes with SUSPENDED
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+        // run second job, which stays in RUNNING
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        assertOnlyContainsSingleJobWithState(
+                JobStatus.RUNNING, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+    }
+
+    @Test
+    public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception {
+        final JobManagerRunnerFactory runnerFactory =
+                new QueuedJobManagerRunnerFactory(
+                        completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
+                        completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED));
+
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
+        DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        // run first job, which completes with SUSPENDED
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+        // run second job, which completes with FINISHED; wait for its result as well
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+        assertOnlyContainsSingleJobWithState(
+                JobStatus.FINISHED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+    }
+
+    /**
+     * Creates a {@link TestingJobManagerRunner} for {@code jobId} that reports the given
+     * non-terminal status from {@code requestJobDetails}. This helper does not complete the
+     * runner's result future.
+     */
+    private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+            final JobStatus currentJobStatus) {
+        Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+
+        return TestingJobManagerRunner.newBuilder()
+                .setJobId(jobId)
+                .setJobDetailsFunction(
+                        () ->
+                                JobDetails.createDetailsForJob(
+                                        new ArchivedExecutionGraphBuilder()
+                                                .setJobID(jobId)
+                                                .setState(currentJobStatus)
+                                                .build()))
+                .build();
+    }
+
+    /**
+     * Creates a {@link TestingJobManagerRunner} for {@code jobId} whose result future is already
+     * completed successfully with an archived execution graph in the given terminal status.
+     */
+    private JobManagerRunner completedJobManagerRunnerWithJobStatus(
+            final JobStatus finalJobStatus) {
+        Preconditions.checkArgument(finalJobStatus.isTerminalState());
+
+        return TestingJobManagerRunner.newBuilder()
+                .setJobId(jobId)
+                .setResultFuture(
+                        CompletableFuture.completedFuture(
+                                JobManagerRunnerResult.forSuccess(
+                                        new ExecutionGraphInfo(
+                                                new ArchivedExecutionGraphBuilder()
+                                                        .setJobID(jobId)
+                                                        .setState(finalJobStatus)
+                                                        .build()))))
+                .build();
+    }
+
+    /** Asserts that the given details contain exactly one job, in the expected status. */
+    private static void assertOnlyContainsSingleJobWithState(
+            final JobStatus expectedJobStatus, final MultipleJobsDetails multipleJobsDetails) {
+        final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs();
+        assertEquals(1, jobDetails.size());
+        assertEquals(expectedJobStatus, jobDetails.iterator().next().getStatus());
+    }
+
+    @Test
     public void testJobDataAreCleanedUpInCorrectOrderOnFinishedJob() throws Exception {
         testJobDataAreCleanedUpInCorrectOrder(JobStatus.FINISHED);
     }
@@ -1179,6 +1284,39 @@ public class DispatcherTest extends AbstractDispatcherTest {
         }
     }
 
+    /**
+     * {@link JobManagerRunnerFactory} that hands out the given {@link JobManagerRunner}s in
+     * order, one per {@code createJobManagerRunner} call.
+     */
+    private static class QueuedJobManagerRunnerFactory implements JobManagerRunnerFactory {
+
+        private final Queue<JobManagerRunner> jobManagerRunners;
+
+        private QueuedJobManagerRunnerFactory(JobManagerRunner... jobManagerRunners) {
+            this.jobManagerRunners = new ArrayDeque<>(Arrays.asList(jobManagerRunners));
+        }
+
+        @Override
+        public JobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerServices,
+                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                long initializationTimestamp)
+                throws Exception {
+            // fail with a descriptive message instead of a bare NoSuchElementException when the
+            // test submits more jobs than it queued runners for
+            Preconditions.checkState(
+                    !jobManagerRunners.isEmpty(),
+                    "No more JobManagerRunners queued; more jobs were submitted than prepared.");
+            return jobManagerRunners.remove();
+        }
+    }
+
     private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
         private final CompletableFuture<JobManagerRunnerResult> resultFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index e6d613c..0c0994d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of the {@link JobManagerRunner}. */
 public class TestingJobManagerRunner implements JobManagerRunner {
@@ -47,6 +48,8 @@ public class TestingJobManagerRunner implements JobManagerRunner {
 
     private final CompletableFuture<JobManagerRunnerResult> resultFuture;
 
+    private final Supplier<JobDetails> jobDetailsFunction;
+
     private final OneShotLatch closeAsyncCalledLatch = new OneShotLatch();
 
     private JobStatus jobStatus = JobStatus.INITIALIZING;
@@ -55,11 +58,13 @@ public class TestingJobManagerRunner implements JobManagerRunner {
             JobID jobId,
             boolean blockingTermination,
             CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> resultFuture) {
+            CompletableFuture<JobManagerRunnerResult> resultFuture,
+            Supplier<JobDetails> jobDetailsFunction) {
         this.jobId = jobId;
         this.blockingTermination = blockingTermination;
         this.jobMasterGatewayFuture = jobMasterGatewayFuture;
         this.resultFuture = resultFuture;
+        this.jobDetailsFunction = jobDetailsFunction;
         this.terminationFuture = new CompletableFuture<>();
 
         final ExecutionGraphInfo suspendedExecutionGraphInfo =
@@ -103,7 +108,12 @@ public class TestingJobManagerRunner implements JobManagerRunner {
 
+    /**
+     * Returns an already completed future with the details from the configured supplier. The
+     * supplier is called synchronously, so an exception it throws (the builder default throws
+     * {@link UnsupportedOperationException}) propagates directly to the caller.
+     */
     @Override
     public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
-        throw new UnsupportedOperationException();
+        return CompletableFuture.completedFuture(jobDetailsFunction.get());
     }
 
     @Override
@@ -166,6 +171,10 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                 new CompletableFuture<>();
         private CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
+        private Supplier<JobDetails> jobDetailsFunction =
+                () -> {
+                    throw new UnsupportedOperationException();
+                };
 
         private Builder() {
             // No-op.
@@ -194,10 +203,23 @@ public class TestingJobManagerRunner implements JobManagerRunner {
             return this;
         }
 
+        /**
+         * Sets the supplier that backs {@code requestJobDetails}. Without it, requesting job
+         * details throws {@link UnsupportedOperationException}.
+         */
+        public Builder setJobDetailsFunction(Supplier<JobDetails> jobDetailsFunction) {
+            this.jobDetailsFunction = Preconditions.checkNotNull(jobDetailsFunction);
+            return this;
+        }
+
         public TestingJobManagerRunner build() {
             Preconditions.checkNotNull(jobId);
             return new TestingJobManagerRunner(
-                    jobId, blockingTermination, jobMasterGatewayFuture, resultFuture);
+                    jobId,
+                    blockingTermination,
+                    jobMasterGatewayFuture,
+                    resultFuture,
+                    jobDetailsFunction);
         }
     }
 }