You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:27 UTC
[flink] 09/10: [FLINK-25432][runtime] Adds JobManagerRunnerRegistry and integrates it into the Dispatcher
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f798996dad0e41ced3e9e293ee81d0caf225874
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Dec 15 14:54:27 2021 +0100
[FLINK-25432][runtime] Adds JobManagerRunnerRegistry and integrates it into the Dispatcher
---
.../DefaultJobManagerRunnerRegistry.java | 127 ++++++++++++
.../flink/runtime/dispatcher/Dispatcher.java | 65 +++---
.../dispatcher/JobManagerRunnerRegistry.java | 68 ++++++
.../DefaultJobManagerRunnerRegistryTest.java | 230 +++++++++++++++++++++
.../TestingJobManagerRunnerRegistry.java | 199 ++++++++++++++++++
.../runtime/jobmaster/TestingJobManagerRunner.java | 6 +-
6 files changed, 664 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..2ba54e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link
+ * JobManagerRunnerRegistry} interface. The registered {@link JobManagerRunner} instances are kept
+ * in a {@link HashMap} keyed by their {@link JobID}.
+ *
+ * <p>All methods of this class are expected to be called from within the main thread. Only the
+ * mutating methods ({@link #register(JobManagerRunner)}, {@link #unregister(JobID)} and the
+ * cleanup methods) assert this explicitly. NOTE(review): the read-only accessors do not assert it,
+ * and the Dispatcher exposes {@link #size()} through a metric gauge which is presumably evaluated
+ * outside of the main thread - confirm that this unsynchronized read is acceptable.
+ */
+public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
+
+ @VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners;
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
+ /**
+ * Creates an empty registry.
+ *
+ * @param initialCapacity initial capacity of the backing map; must be positive.
+ * @param mainThreadExecutor executor used to assert that mutating calls happen in the main
+ * thread; must not be {@code null}.
+ * @throws IllegalArgumentException if {@code initialCapacity} is not positive.
+ * @throws NullPointerException if {@code mainThreadExecutor} is {@code null}.
+ */
+ public DefaultJobManagerRunnerRegistry(
+ int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) {
+ Preconditions.checkArgument(
+ initialCapacity > 0,
+ "The initial capacity must be positive but was %s.",
+ initialCapacity);
+ this.jobManagerRunners = new HashMap<>(initialCapacity);
+ // fail fast instead of failing on the first register/unregister call
+ this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
+ }
+
+ @Override
+ public boolean isRegistered(JobID jobId) {
+ return jobManagerRunners.containsKey(jobId);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalArgumentException if a runner with the same {@link JobID} is already
+ * registered.
+ */
+ @Override
+ public void register(JobManagerRunner jobManagerRunner) {
+ mainThreadExecutor.assertRunningInMainThread();
+ Preconditions.checkArgument(
+ !isRegistered(jobManagerRunner.getJobID()),
+ "A job with the ID %s is already registered.",
+ jobManagerRunner.getJobID());
+ this.jobManagerRunners.put(jobManagerRunner.getJobID(), jobManagerRunner);
+ }
+
+ @Override
+ public JobManagerRunner get(JobID jobId) {
+ assertJobRegistered(jobId);
+ return this.jobManagerRunners.get(jobId);
+ }
+
+ @Override
+ public int size() {
+ return this.jobManagerRunners.size();
+ }
+
+ /** Returns a copy of the registered job IDs; later registry changes are not reflected. */
+ @Override
+ public Set<JobID> getRunningJobIds() {
+ return new HashSet<>(this.jobManagerRunners.keySet());
+ }
+
+ /** Returns a copy of the registered runners; later registry changes are not reflected. */
+ @Override
+ public Collection<JobManagerRunner> getJobManagerRunners() {
+ return new ArrayList<>(this.jobManagerRunners.values());
+ }
+
+ @Override
+ public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor unusedExecutor) {
+ return cleanup(jobId);
+ }
+
+ @Override
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
+ return cleanup(jobId);
+ }
+
+ /**
+ * Unregisters and closes the runner of {@code jobId}. Unknown job IDs are ignored. The runner
+ * is removed from the registry even if closing it fails.
+ *
+ * <p>NOTE(review): {@code close()} is invoked synchronously, so the returned future is already
+ * completed when this method returns. {@code close()} presumably waits for the runner's
+ * termination, which could block the main thread - consider whether {@code closeAsync()} should
+ * be used instead.
+ *
+ * @return a completed future; completed exceptionally with the exception thrown by {@code
+ * close()} if closing the runner failed.
+ */
+ private CompletableFuture<Void> cleanup(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ if (isRegistered(jobId)) {
+ try {
+ unregister(jobId).close();
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
+ return FutureUtils.completedVoidFuture();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NoSuchElementException if no runner is registered for {@code jobId}.
+ */
+ @Override
+ public JobManagerRunner unregister(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ assertJobRegistered(jobId);
+ return this.jobManagerRunners.remove(jobId);
+ }
+
+ /** Throws a {@link NoSuchElementException} if no runner is registered for {@code jobId}. */
+ private void assertJobRegistered(JobID jobId) {
+ if (!isRegistered(jobId)) {
+ throw new NoSuchElementException(
+ "There is no running job registered for the job ID " + jobId);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6599115..88f9237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -123,7 +123,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private final FatalErrorHandler fatalErrorHandler;
- private final Map<JobID, JobManagerRunner> runningJobs;
+ private final JobManagerRunnerRegistry jobManagerRunnerRegistry;
private final Collection<JobGraph> recoveredJobs;
@@ -184,7 +184,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
JobManagerSharedServices.fromConfiguration(
configuration, blobServer, fatalErrorHandler);
- runningJobs = new HashMap<>(16);
+ jobManagerRunnerRegistry =
+ new DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor());
this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
@@ -385,7 +386,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
* @throws FlinkException if the job scheduling status cannot be retrieved
*/
private boolean isDuplicateJob(JobID jobId) throws FlinkException {
- return isInGloballyTerminalState(jobId) || runningJobs.containsKey(jobId);
+ return isInGloballyTerminalState(jobId) || jobManagerRunnerRegistry.isRegistered(jobId);
}
/**
@@ -458,12 +459,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
- Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
+ Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
long initializationTimestamp = System.currentTimeMillis();
JobManagerRunner jobManagerRunner =
createJobManagerRunner(jobGraph, initializationTimestamp);
- runningJobs.put(jobGraph.getJobID(), jobManagerRunner);
+ jobManagerRunnerRegistry.register(jobManagerRunner);
final JobID jobId = jobGraph.getJobID();
@@ -473,7 +474,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
.handleAsync(
(jobManagerRunnerResult, throwable) -> {
Preconditions.checkState(
- runningJobs.get(jobId) == jobManagerRunner,
+ jobManagerRunnerRegistry.isRegistered(jobId)
+ && jobManagerRunnerRegistry.get(jobId)
+ == jobManagerRunner,
"The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
if (jobManagerRunnerResult != null) {
@@ -545,7 +548,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
- Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet())));
+ Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds()));
}
@Override
@@ -690,9 +693,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
- JobManagerRunner job = runningJobs.get(jobId);
-
- if (job == null) {
+ if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
if (executionGraphInfo == null) {
@@ -701,15 +702,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
return CompletableFuture.completedFuture(
JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
}
- } else {
- return job.getResultFuture()
- .thenApply(
- jobManagerRunnerResult ->
- JobResult.createFrom(
- jobManagerRunnerResult
- .getExecutionGraphInfo()
- .getArchivedExecutionGraph()));
}
+
+ final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
+ return jobManagerRunner
+ .getResultFuture()
+ .thenApply(
+ jobManagerRunnerResult ->
+ JobResult.createFrom(
+ jobManagerRunnerResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()));
}
@Override
@@ -856,7 +859,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
- final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
+ final JobManagerRunner job = checkNotNull(jobManagerRunnerRegistry.unregister(jobId));
return CompletableFuture.supplyAsync(
() -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor)
.thenCompose(
@@ -958,7 +961,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private void terminateRunningJobs() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
- final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet());
+ final Set<JobID> jobsToRemove = jobManagerRunnerRegistry.getRunningJobIds();
for (JobID jobId : jobsToRemove) {
terminateJob(jobId);
@@ -966,9 +969,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private void terminateJob(JobID jobId) {
- final JobManagerRunner jobManagerRunner = runningJobs.get(jobId);
-
- if (jobManagerRunner != null) {
+ if (jobManagerRunnerRegistry.isRegistered(jobId)) {
+ final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
jobManagerRunner.closeAsync();
}
}
@@ -1083,11 +1085,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
/** Ensures that the JobMasterGateway is available. */
private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
- JobManagerRunner job = runningJobs.get(jobId);
- if (job == null) {
+ if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
}
+ final JobManagerRunner job = jobManagerRunnerRegistry.get(jobId);
if (!job.isInitialized()) {
return FutureUtils.completedExceptionally(
new UnavailableDispatcherOperationException(
@@ -1107,7 +1109,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private Optional<JobManagerRunner> getJobManagerRunner(JobID jobId) {
- return Optional.ofNullable(runningJobs.get(jobId));
+ return jobManagerRunnerRegistry.isRegistered(jobId)
+ ? Optional.of(jobManagerRunnerRegistry.get(jobId))
+ : Optional.empty();
}
private <T> CompletableFuture<T> runResourceManagerCommand(
@@ -1129,9 +1133,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
Function<JobManagerRunner, CompletableFuture<T>> queryFunction) {
List<CompletableFuture<Optional<T>>> optionalJobInformation =
- new ArrayList<>(runningJobs.size());
+ new ArrayList<>(jobManagerRunnerRegistry.size());
- for (JobManagerRunner job : runningJobs.values()) {
+ for (JobManagerRunner job : jobManagerRunnerRegistry.getJobManagerRunners()) {
final CompletableFuture<Optional<T>> queryResult =
queryFunction
.apply(job)
@@ -1165,7 +1169,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
- if (runningJobs.containsKey(jobId)) {
+ if (jobManagerRunnerRegistry.isRegistered(jobId)) {
return FutureUtils.completedExceptionally(
new DispatcherException(
String.format("Job with job id %s is still running.", jobId)));
@@ -1176,7 +1180,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
- jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long) runningJobs.size());
+ jobManagerMetricGroup.gauge(
+ MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size());
}
public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java
new file mode 100644
index 0000000..dfd342c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * {@code JobManagerRunnerRegistry} collects running jobs represented by {@link JobManagerRunner}
+ * instances, keyed by their {@link JobID}. It also extends {@link LocallyCleanableResource} and
+ * {@link GloballyCleanableResource}.
+ */
+public interface JobManagerRunnerRegistry
+ extends LocallyCleanableResource, GloballyCleanableResource {
+
+ /**
+ * Checks whether a {@link JobManagerRunner} is registered under the given {@link JobID}.
+ *
+ * @param jobId The {@code JobID} to check.
+ * @return {@code true}, if a {@code JobManagerRunner} is registered; {@code false} otherwise.
+ */
+ boolean isRegistered(JobID jobId);
+
+ /**
+ * Registers the given {@link JobManagerRunner} instance under its {@link JobID}.
+ *
+ * @param jobManagerRunner The runner to register.
+ */
+ void register(JobManagerRunner jobManagerRunner);
+
+ /**
+ * Returns the {@link JobManagerRunner} for the given {@code JobID}.
+ *
+ * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered
+ * {@code JobManagerRunner}.
+ * @see #isRegistered(JobID)
+ */
+ JobManagerRunner get(JobID jobId);
+
+ /** Returns the number of currently registered {@link JobManagerRunner} instances. */
+ int size();
+
+ /** Returns the {@link JobID}s of the registered {@link JobManagerRunner} instances. */
+ Set<JobID> getRunningJobIds();
+
+ /** Returns the registered {@link JobManagerRunner} instances. */
+ Collection<JobManagerRunner> getJobManagerRunners();
+
+ /**
+ * Unregisters the {@link JobManagerRunner} with the given {@code JobID} and returns it.
+ *
+ * <p>NOTE(review): an earlier version of this comment stated that {@code null} is returned for
+ * an unknown {@link JobID}; the default implementation instead throws a {@link
+ * NoSuchElementException}, mirroring {@link #get(JobID)}. Callers should check {@link
+ * #isRegistered(JobID)} first.
+ *
+ * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered
+ * {@code JobManagerRunner} (behavior of the default implementation).
+ */
+ JobManagerRunner unregister(JobID jobId);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
new file mode 100644
index 0000000..6eca873
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * {@code DefaultJobManagerRunnerRegistryTest} tests the functionality of {@link
+ * DefaultJobManagerRunnerRegistry}, including both the local and the global cleanup paths.
+ */
+public class DefaultJobManagerRunnerRegistryTest {
+
+ private JobManagerRunnerRegistry testInstance;
+
+ @BeforeEach
+ public void setup() {
+ testInstance =
+ new DefaultJobManagerRunnerRegistry(
+ 4, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ @Test
+ public void testIsRegistered() {
+ final JobID jobId = new JobID();
+ testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build());
+ assertThat(testInstance.isRegistered(jobId)).isTrue();
+ }
+
+ @Test
+ public void testIsNotRegistered() {
+ assertThat(testInstance.isRegistered(new JobID())).isFalse();
+ }
+
+ @Test
+ public void testRegister() {
+ final JobID jobId = new JobID();
+ testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build());
+ assertThat(testInstance.isRegistered(jobId)).isTrue();
+ }
+
+ @Test
+ public void testRegisteringTwiceCausesFailure() {
+ final JobID jobId = new JobID();
+ testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build());
+ assertThat(testInstance.isRegistered(jobId)).isTrue();
+
+ assertThatThrownBy(
+ () ->
+ testInstance.register(
+ TestingJobManagerRunner.newBuilder()
+ .setJobId(jobId)
+ .build()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testGet() {
+ final JobID jobId = new JobID();
+ final JobManagerRunner jobManagerRunner =
+ TestingJobManagerRunner.newBuilder().setJobId(jobId).build();
+ testInstance.register(jobManagerRunner);
+
+ assertThat(testInstance.get(jobId)).isEqualTo(jobManagerRunner);
+ }
+
+ @Test
+ public void testGetOnNonExistingJobManagerRunner() {
+ assertThatThrownBy(() -> testInstance.get(new JobID()))
+ .isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void testSize() {
+ assertThat(testInstance.size()).isEqualTo(0);
+ // runners built without an explicit job ID get a fresh random one, so they don't collide
+ testInstance.register(TestingJobManagerRunner.newBuilder().build());
+ assertThat(testInstance.size()).isEqualTo(1);
+ testInstance.register(TestingJobManagerRunner.newBuilder().build());
+ assertThat(testInstance.size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testGetRunningJobIds() {
+ assertThat(testInstance.getRunningJobIds()).isEmpty();
+
+ final JobID jobId0 = new JobID();
+ final JobID jobId1 = new JobID();
+ testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId0).build());
+ testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId1).build());
+
+ assertThat(testInstance.getRunningJobIds()).containsExactlyInAnyOrder(jobId0, jobId1);
+ }
+
+ @Test
+ public void testGetJobManagerRunners() {
+ assertThat(testInstance.getJobManagerRunners()).isEmpty();
+
+ final JobManagerRunner jobManagerRunner0 = TestingJobManagerRunner.newBuilder().build();
+ final JobManagerRunner jobManagerRunner1 = TestingJobManagerRunner.newBuilder().build();
+ testInstance.register(jobManagerRunner0);
+ testInstance.register(jobManagerRunner1);
+
+ assertThat(testInstance.getJobManagerRunners())
+ .containsExactlyInAnyOrder(jobManagerRunner0, jobManagerRunner1);
+ }
+
+ @Test
+ public void testSuccessfulLocalCleanup() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ assertThat(
+ testInstance.localCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor()))
+ .isCompleted();
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(jobManagerRunner.getTerminationFuture()).isCompleted();
+ }
+
+ @Test
+ public void testFailingLocalCleanup() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+ assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+ final RuntimeException expectedException = new RuntimeException("Expected exception");
+ jobManagerRunner.completeTerminationFutureExceptionally(expectedException);
+
+ // closing the runner wraps the termination failure in a FlinkException
+ assertThat(
+ testInstance.localCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor()))
+ .failsWithin(Duration.ZERO)
+ .withThrowableOfType(ExecutionException.class)
+ .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+ .hasExactlyElementsOfTypes(
+ ExecutionException.class,
+ FlinkException.class,
+ expectedException.getClass())
+ .last()
+ .isEqualTo(expectedException);
+ // the runner is unregistered even though closing it failed
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ }
+
+ @Test
+ public void testSuccessfulLocalCleanupAsync() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ final CompletableFuture<Void> cleanupResult =
+ testInstance.localCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor());
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(cleanupResult).isCompleted();
+ }
+
+ @Test
+ public void testFailingLocalCleanupAsync() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+ assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+ final RuntimeException expectedException = new RuntimeException("Expected exception");
+ jobManagerRunner.completeTerminationFutureExceptionally(expectedException);
+
+ final CompletableFuture<Void> cleanupResult =
+ testInstance.localCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor());
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(cleanupResult)
+ .isCompletedExceptionally()
+ .failsWithin(Duration.ZERO)
+ .withThrowableOfType(ExecutionException.class)
+ .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+ .hasExactlyElementsOfTypes(
+ ExecutionException.class,
+ FlinkException.class,
+ expectedException.getClass())
+ .last()
+ .isEqualTo(expectedException);
+ }
+
+ @Test
+ public void testLocalCleanupAsyncOnUnknownJobId() {
+ assertThat(testInstance.localCleanupAsync(new JobID(), Executors.directExecutor()))
+ .isCompleted();
+ }
+
+ @Test
+ public void testSuccessfulGlobalCleanup() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ assertThat(
+ testInstance.globalCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor()))
+ .isCompleted();
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(jobManagerRunner.getTerminationFuture()).isCompleted();
+ }
+
+ @Test
+ public void testFailingGlobalCleanup() {
+ final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+ final RuntimeException expectedException = new RuntimeException("Expected exception");
+ jobManagerRunner.completeTerminationFutureExceptionally(expectedException);
+
+ assertThat(
+ testInstance.globalCleanupAsync(
+ jobManagerRunner.getJobID(), Executors.directExecutor()))
+ .failsWithin(Duration.ZERO)
+ .withThrowableOfType(ExecutionException.class)
+ .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+ .hasExactlyElementsOfTypes(
+ ExecutionException.class,
+ FlinkException.class,
+ expectedException.getClass())
+ .last()
+ .isEqualTo(expectedException);
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ }
+
+ @Test
+ public void testGlobalCleanupAsyncOnUnknownJobId() {
+ assertThat(testInstance.globalCleanupAsync(new JobID(), Executors.directExecutor()))
+ .isCompleted();
+ }
+
+ /** Registers a fresh, not yet terminated {@link TestingJobManagerRunner} and returns it. */
+ private TestingJobManagerRunner registerTestingJobManagerRunner() {
+ final TestingJobManagerRunner jobManagerRunner =
+ TestingJobManagerRunner.newBuilder().build();
+ testInstance.register(jobManagerRunner);
+
+ assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+ assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+ return jobManagerRunner;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..c9e60df
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * {@code TestingJobManagerRunnerRegistry} is a test implementation of {@link
+ * JobManagerRunnerRegistry}.
+ */
+public class TestingJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
+
+ private final Function<JobID, Boolean> isRegisteredFunction;
+ private final Consumer<JobManagerRunner> registerConsumer;
+ private final Function<JobID, JobManagerRunner> getFunction;
+ private final Supplier<Integer> sizeSupplier;
+ private final Supplier<Set<JobID>> getRunningJobIdsSupplier;
+ private final Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier;
+ private final Function<JobID, JobManagerRunner> unregisterFunction;
+ private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction;
+ private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction;
+
+ private TestingJobManagerRunnerRegistry(
+ Function<JobID, Boolean> isRegisteredFunction,
+ Consumer<JobManagerRunner> registerConsumer,
+ Function<JobID, JobManagerRunner> getFunction,
+ Supplier<Integer> sizeSupplier,
+ Supplier<Set<JobID>> getRunningJobIdsSupplier,
+ Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier,
+ Function<JobID, JobManagerRunner> unregisterFunction,
+ BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction,
+ BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) {
+ this.isRegisteredFunction = isRegisteredFunction;
+ this.registerConsumer = registerConsumer;
+ this.getFunction = getFunction;
+ this.sizeSupplier = sizeSupplier;
+ this.getRunningJobIdsSupplier = getRunningJobIdsSupplier;
+ this.getJobManagerRunnersSupplier = getJobManagerRunnersSupplier;
+ this.unregisterFunction = unregisterFunction;
+ this.localCleanupAsyncFunction = localCleanupAsyncFunction;
+ this.globalCleanupAsyncFunction = globalCleanupAsyncFunction;
+ }
+
+ @Override
+ public boolean isRegistered(JobID jobId) {
+ return isRegisteredFunction.apply(jobId);
+ }
+
+ @Override
+ public void register(JobManagerRunner jobManagerRunner) {
+ registerConsumer.accept(jobManagerRunner);
+ }
+
+ @Override
+ public JobManagerRunner get(JobID jobId) {
+ return getFunction.apply(jobId);
+ }
+
+ @Override
+ public int size() {
+ return sizeSupplier.get();
+ }
+
+ @Override
+ public Set<JobID> getRunningJobIds() {
+ return getRunningJobIdsSupplier.get();
+ }
+
+ @Override
+ public Collection<JobManagerRunner> getJobManagerRunners() {
+ return getJobManagerRunnersSupplier.get();
+ }
+
+ @Override
+ public JobManagerRunner unregister(JobID jobId) {
+ return unregisterFunction.apply(jobId);
+ }
+
+ @Override
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+ return localCleanupAsyncFunction.apply(jobId, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+ return globalCleanupAsyncFunction.apply(jobId, executor);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** {@code Builder} for creating {@code TestingJobManagerRunnerRegistry} instances. */
+ public static class Builder {
+
+ private Function<JobID, Boolean> isRegisteredFunction = ignoredJobId -> true;
+ private Consumer<JobManagerRunner> registerConsumer = ignoredRunner -> {};
+ private Function<JobID, JobManagerRunner> getFunction = ignoredJobId -> null;
+ private Supplier<Integer> sizeSupplier = () -> 0;
+ private Supplier<Set<JobID>> getRunningJobIdsSupplier = Collections::emptySet;
+ private Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier =
+ Collections::emptyList;
+ private Function<JobID, JobManagerRunner> unregisterFunction = ignoredJobId -> null;
+ private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction =
+ (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture();
+ private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction =
+ (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture();
+
+ public Builder withIsRegisteredFunction(Function<JobID, Boolean> isRegisteredFunction) {
+ this.isRegisteredFunction = isRegisteredFunction;
+ return this;
+ }
+
+ public Builder withRegisterConsumer(Consumer<JobManagerRunner> registerConsumer) {
+ this.registerConsumer = registerConsumer;
+ return this;
+ }
+
+ public Builder withGetFunction(Function<JobID, JobManagerRunner> getFunction) {
+ this.getFunction = getFunction;
+ return this;
+ }
+
+ public Builder withSizeSupplier(Supplier<Integer> sizeSupplier) {
+ this.sizeSupplier = sizeSupplier;
+ return this;
+ }
+
+ public Builder withGetRunningJobIdsSupplier(Supplier<Set<JobID>> getRunningJobIdsSupplier) {
+ this.getRunningJobIdsSupplier = getRunningJobIdsSupplier;
+ return this;
+ }
+
+ public Builder withGetJobManagerRunnersSupplier(
+ Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier) {
+ this.getJobManagerRunnersSupplier = getJobManagerRunnersSupplier;
+ return this;
+ }
+
+ public Builder withUnregisterFunction(
+ Function<JobID, JobManagerRunner> unregisterFunction) {
+ this.unregisterFunction = unregisterFunction;
+ return this;
+ }
+
+ public Builder withLocalCleanupAsyncFunction(
+ BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction) {
+ this.localCleanupAsyncFunction = localCleanupAsyncFunction;
+ return this;
+ }
+
+ public Builder withGlobalCleanupAsyncFunction(
+ BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) {
+ this.globalCleanupAsyncFunction = globalCleanupAsyncFunction;
+ return this;
+ }
+
+ public TestingJobManagerRunnerRegistry build() {
+ return new TestingJobManagerRunnerRegistry(
+ isRegisteredFunction,
+ registerConsumer,
+ getFunction,
+ sizeSupplier,
+ getRunningJobIdsSupplier,
+ getJobManagerRunnersSupplier,
+ unregisterFunction,
+ localCleanupAsyncFunction,
+ globalCleanupAsyncFunction);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 0c0994d..7b6adf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -155,6 +155,10 @@ public class TestingJobManagerRunner implements JobManagerRunner {
terminationFuture.complete(null);
}
+ public void completeTerminationFutureExceptionally(Throwable expectedException) {
+ terminationFuture.completeExceptionally(expectedException);
+ }
+
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}
@@ -166,7 +170,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
/** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */
public static class Builder {
- private JobID jobId = null;
+ private JobID jobId = new JobID();
private boolean blockingTermination = false;
private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
new CompletableFuture<>();