You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/12 22:01:57 UTC
[flink] branch release-1.14 updated: [FLINK-20195][coordination] Deduplicate jobs for overview
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new d3977d2 [FLINK-20195][coordination] Deduplicate jobs for overview
d3977d2 is described below
commit d3977d2912ed284aa1f7cd7c216232f9bcff73e5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sun Dec 12 12:20:41 2021 +0100
[FLINK-20195][coordination] Deduplicate jobs for overview
---
.../flink/runtime/dispatcher/Dispatcher.java | 9 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 129 +++++++++++++++++++++
.../runtime/jobmaster/TestingJobManagerRunner.java | 24 +++-
3 files changed, 154 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e08fe58..9560612 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -580,13 +580,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
return combinedJobDetails.thenApply(
(Collection<JobDetails> runningJobDetails) -> {
- final Collection<JobDetails> allJobDetails =
- new ArrayList<>(completedJobDetails.size() + runningJobDetails.size());
+ final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();
- allJobDetails.addAll(runningJobDetails);
- allJobDetails.addAll(completedJobDetails);
+ completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
+ runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
- return new MultipleJobsDetails(allJobDetails);
+ return new MultipleJobsDetails(deduplicatedJobs.values());
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3923565..fb36a9e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -97,10 +98,13 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -815,6 +819,107 @@ public class DispatcherTest extends AbstractDispatcherTest {
}
@Test
+ public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception {
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+ assertOnlyContainsSingleJobWithState(
+ JobStatus.SUSPENDED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+ }
+
+ @Test
+ public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception {
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
+ runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING));
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ // run first job, which completes with SUSPENDED
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+ // run second job, which stays in RUNNING
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ assertOnlyContainsSingleJobWithState(
+ JobStatus.RUNNING, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+ }
+
+ @Test
+ public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception {
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
+ completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED));
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ // run first job, which completes with SUSPENDED
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+ // run second job, which completes with FINISHED
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ assertOnlyContainsSingleJobWithState(
+ JobStatus.FINISHED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
+ }
+
+ private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+ final JobStatus currentJobStatus) {
+ Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+
+ return TestingJobManagerRunner.newBuilder()
+ .setJobId(jobId)
+ .setJobDetailsFunction(
+ () ->
+ JobDetails.createDetailsForJob(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(currentJobStatus)
+ .build()))
+ .build();
+ }
+
+ private JobManagerRunner completedJobManagerRunnerWithJobStatus(
+ final JobStatus finalJobStatus) {
+ Preconditions.checkArgument(finalJobStatus.isTerminalState());
+
+ return TestingJobManagerRunner.newBuilder()
+ .setJobId(jobId)
+ .setResultFuture(
+ CompletableFuture.completedFuture(
+ JobManagerRunnerResult.forSuccess(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(finalJobStatus)
+ .build()))))
+ .build();
+ }
+
+ private static void assertOnlyContainsSingleJobWithState(
+ final JobStatus expectedJobStatus, final MultipleJobsDetails multipleJobsDetails) {
+ final Collection<JobDetails> finishedJobDetails = multipleJobsDetails.getJobs();
+ assertEquals(1, finishedJobDetails.size());
+ assertEquals(expectedJobStatus, finishedJobDetails.iterator().next().getStatus());
+ }
+
+ @Test
public void testJobDataAreCleanedUpInCorrectOrderOnFinishedJob() throws Exception {
testJobDataAreCleanedUpInCorrectOrder(JobStatus.FINISHED);
}
@@ -1179,6 +1284,30 @@ public class DispatcherTest extends AbstractDispatcherTest {
}
}
+ private static class QueuedJobManagerRunnerFactory implements JobManagerRunnerFactory {
+
+ private final Queue<JobManagerRunner> resultFutureQueue;
+
+ private QueuedJobManagerRunnerFactory(JobManagerRunner... resultFutureQueue) {
+ this.resultFutureQueue = new ArrayDeque<>(Arrays.asList(resultFutureQueue));
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler,
+ long initializationTimestamp)
+ throws Exception {
+ return resultFutureQueue.remove();
+ }
+ }
+
private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory {
private final CompletableFuture<JobManagerRunnerResult> resultFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index e6d613c..0c0994d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
/** Testing implementation of the {@link JobManagerRunner}. */
public class TestingJobManagerRunner implements JobManagerRunner {
@@ -47,6 +48,8 @@ public class TestingJobManagerRunner implements JobManagerRunner {
private final CompletableFuture<JobManagerRunnerResult> resultFuture;
+ private final Supplier<JobDetails> jobDetailsFunction;
+
private final OneShotLatch closeAsyncCalledLatch = new OneShotLatch();
private JobStatus jobStatus = JobStatus.INITIALIZING;
@@ -55,11 +58,13 @@ public class TestingJobManagerRunner implements JobManagerRunner {
JobID jobId,
boolean blockingTermination,
CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
- CompletableFuture<JobManagerRunnerResult> resultFuture) {
+ CompletableFuture<JobManagerRunnerResult> resultFuture,
+ Supplier<JobDetails> jobDetailsFunction) {
this.jobId = jobId;
this.blockingTermination = blockingTermination;
this.jobMasterGatewayFuture = jobMasterGatewayFuture;
this.resultFuture = resultFuture;
+ this.jobDetailsFunction = jobDetailsFunction;
this.terminationFuture = new CompletableFuture<>();
final ExecutionGraphInfo suspendedExecutionGraphInfo =
@@ -103,7 +108,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
- throw new UnsupportedOperationException();
+ return CompletableFuture.completedFuture(jobDetailsFunction.get());
}
@Override
@@ -166,6 +171,10 @@ public class TestingJobManagerRunner implements JobManagerRunner {
private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
new CompletableFuture<>();
private CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
+ private Supplier<JobDetails> jobDetailsFunction =
+ () -> {
+ throw new UnsupportedOperationException();
+ };
private Builder() {
// No-op.
@@ -194,10 +203,19 @@ public class TestingJobManagerRunner implements JobManagerRunner {
return this;
}
+ public Builder setJobDetailsFunction(Supplier<JobDetails> jobDetailsFunction) {
+ this.jobDetailsFunction = Preconditions.checkNotNull(jobDetailsFunction);
+ return this;
+ }
+
public TestingJobManagerRunner build() {
Preconditions.checkNotNull(jobId);
return new TestingJobManagerRunner(
- jobId, blockingTermination, jobMasterGatewayFuture, resultFuture);
+ jobId,
+ blockingTermination,
+ jobMasterGatewayFuture,
+ resultFuture,
+ jobDetailsFunction);
}
}
}