You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/06 16:55:42 UTC
[flink] 03/03: [FLINK-25953][runtime] Reorganizes dispatcher cleanup related tests
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 919dca5fd093f8a1bf28383dbaf75677bab595b8
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Feb 3 18:55:55 2022 +0100
[FLINK-25953][runtime] Reorganizes dispatcher cleanup related tests
This includes introducing a TestingDispatcher.Builder and aligning
the usages of the TestingDispatcher instantiation between tests.
Additionally, tests were renamed, obsolete tests were removed and
cleanup-related tests moved into DispatcherResourceCleanupTest.
---
.../DefaultJobManagerRunnerRegistry.java | 12 +-
.../flink/runtime/dispatcher/Dispatcher.java | 69 ++-
.../OnMainThreadJobManagerRunnerRegistry.java | 112 ++++
.../cleanup/DispatcherResourceCleanerFactory.java | 2 +-
.../runtime/dispatcher/AbstractDispatcherTest.java | 137 +----
.../DefaultJobManagerRunnerRegistryTest.java | 5 +-
.../dispatcher/DispatcherFailoverITCase.java | 10 +-
.../dispatcher/DispatcherResourceCleanupTest.java | 575 ++++++++++-----------
.../flink/runtime/dispatcher/DispatcherTest.java | 180 +------
.../runtime/dispatcher/TestingDispatcher.java | 281 ++++++++++
.../cleanup/TestingResourceCleanerFactory.java | 132 +++++
11 files changed, 876 insertions(+), 639 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
index 2ba54e9..2eb0b30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -37,19 +36,15 @@ import java.util.concurrent.Executor;
/**
* {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link
- * JobManagerRunnerRegistry} interface. All methods of this class are expected to be called from
- * within the main thread.
+ * JobManagerRunnerRegistry} interface.
*/
public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
@VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners;
- private final ComponentMainThreadExecutor mainThreadExecutor;
- public DefaultJobManagerRunnerRegistry(
- int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) {
+ public DefaultJobManagerRunnerRegistry(int initialCapacity) {
Preconditions.checkArgument(initialCapacity > 0);
jobManagerRunners = new HashMap<>(initialCapacity);
- this.mainThreadExecutor = mainThreadExecutor;
}
@Override
@@ -59,7 +54,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
@Override
public void register(JobManagerRunner jobManagerRunner) {
- mainThreadExecutor.assertRunningInMainThread();
Preconditions.checkArgument(
!isRegistered(jobManagerRunner.getJobID()),
"A job with the ID %s is already registered.",
@@ -99,7 +93,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
}
private CompletableFuture<Void> cleanup(JobID jobId) {
- mainThreadExecutor.assertRunningInMainThread();
if (isRegistered(jobId)) {
try {
unregister(jobId).close();
@@ -113,7 +106,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
@Override
public JobManagerRunner unregister(JobID jobId) {
- mainThreadExecutor.assertRunningInMainThread();
assertJobRegistered(jobId);
return this.jobManagerRunners.remove(jobId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index f15f294..5db8412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -97,7 +98,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -113,6 +113,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
public static final String DISPATCHER_NAME = "dispatcher";
+ private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
+
private final Configuration configuration;
private final JobGraphWriter jobGraphWriter;
@@ -126,7 +128,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private final FatalErrorHandler fatalErrorHandler;
- private final JobManagerRunnerRegistry jobManagerRunnerRegistry;
+ private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry;
private final Collection<JobGraph> recoveredJobs;
@@ -140,8 +142,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private final HistoryServerArchivist historyServerArchivist;
- private final Executor ioExecutor;
-
@Nullable private final String metricServiceQueryAddress;
private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures;
@@ -169,8 +169,48 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
+ this(
+ rpcService,
+ fencingToken,
+ recoveredJobs,
+ recoveredDirtyJobs,
+ dispatcherBootstrapFactory,
+ dispatcherServices,
+ new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
+ }
+
+ private Dispatcher(
+ RpcService rpcService,
+ DispatcherId fencingToken,
+ Collection<JobGraph> recoveredJobs,
+ Collection<JobResult> globallyTerminatedJobs,
+ DispatcherBootstrapFactory dispatcherBootstrapFactory,
+ DispatcherServices dispatcherServices,
+ JobManagerRunnerRegistry jobManagerRunnerRegistry)
+ throws Exception {
+ this(
+ rpcService,
+ fencingToken,
+ recoveredJobs,
+ globallyTerminatedJobs,
+ dispatcherBootstrapFactory,
+ dispatcherServices,
+ jobManagerRunnerRegistry,
+ new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices));
+ }
+
+ @VisibleForTesting
+ protected Dispatcher(
+ RpcService rpcService,
+ DispatcherId fencingToken,
+ Collection<JobGraph> recoveredJobs,
+ Collection<JobResult> recoveredDirtyJobs,
+ DispatcherBootstrapFactory dispatcherBootstrapFactory,
+ DispatcherServices dispatcherServices,
+ JobManagerRunnerRegistry jobManagerRunnerRegistry,
+ ResourceCleanerFactory resourceCleanerFactory)
+ throws Exception {
super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
- checkNotNull(dispatcherServices);
assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs);
this.configuration = dispatcherServices.getConfiguration();
@@ -184,14 +224,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.jobResultStore = dispatcherServices.getJobResultStore();
this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
- this.ioExecutor = dispatcherServices.getIoExecutor();
this.jobManagerSharedServices =
JobManagerSharedServices.fromConfiguration(
configuration, blobServer, fatalErrorHandler);
- jobManagerRunnerRegistry =
- new DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor());
+ this.jobManagerRunnerRegistry =
+ new OnMainThreadJobManagerRunnerRegistry(
+ jobManagerRunnerRegistry, this.getMainThreadExecutor());
this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
@@ -199,7 +239,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
- this.jobManagerRunnerTerminationFutures = new HashMap<>(2);
+ this.jobManagerRunnerTerminationFutures =
+ new HashMap<>(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY);
this.shutDownFuture = new CompletableFuture<>();
@@ -209,7 +250,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.blobServer.retainJobs(
recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()),
- ioExecutor);
+ dispatcherServices.getIoExecutor());
this.dispatcherCachedOperationsHandler =
new DispatcherCachedOperationsHandler(
@@ -217,8 +258,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this::triggerSavepointAndGetLocation,
this::stopWithSavepointAndGetLocation);
- final ResourceCleanerFactory resourceCleanerFactory =
- new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices);
this.localResourceCleaner =
resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
this.globalResourceCleaner =
@@ -1119,7 +1158,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
jobManagerMetricGroup.gauge(
- MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size());
+ MetricNames.NUM_RUNNING_JOBS,
+ // metrics can be called from anywhere and, therefore, have to run without the main
+ // thread safeguard being triggered. For metrics, we can afford not to be 100%
+ // accurate
+ () -> (long) jobManagerRunnerRegistry.getWrappedDelegate().size());
}
public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..4c16b42
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.WrappingProxy;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code OnMainThreadJobManagerRunnerRegistry} implements {@link JobManagerRunnerRegistry} guarding
+ * the passed {@code JobManagerRunnerRegistry} instance in a way that it only allows registry
+ * methods (read-only ones included) to be executed on the component's main thread.
+ *
+ * @see ComponentMainThreadExecutor
+ */
+public class OnMainThreadJobManagerRunnerRegistry
+ implements JobManagerRunnerRegistry, WrappingProxy<JobManagerRunnerRegistry> {
+
+ private final JobManagerRunnerRegistry delegate;
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
+ public OnMainThreadJobManagerRunnerRegistry(
+ JobManagerRunnerRegistry delegate, ComponentMainThreadExecutor mainThreadExecutor) {
+ this.delegate = delegate;
+ this.mainThreadExecutor = mainThreadExecutor;
+ }
+
+ @Override
+ public boolean isRegistered(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.isRegistered(jobId);
+ }
+
+ @Override
+ public void register(JobManagerRunner jobManagerRunner) {
+ mainThreadExecutor.assertRunningInMainThread();
+ delegate.register(jobManagerRunner);
+ }
+
+ @Override
+ public JobManagerRunner get(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.get(jobId);
+ }
+
+ @Override
+ public int size() {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.size();
+ }
+
+ @Override
+ public Set<JobID> getRunningJobIds() {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.getRunningJobIds();
+ }
+
+ @Override
+ public Collection<JobManagerRunner> getJobManagerRunners() {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.getJobManagerRunners();
+ }
+
+ @Override
+ public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.globalCleanupAsync(jobId, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.localCleanupAsync(jobId, executor);
+ }
+
+ @Override
+ public JobManagerRunner unregister(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ return delegate.unregister(jobId);
+ }
+
+ /**
+ * Returns the delegated {@link JobManagerRunnerRegistry}. This method can be used to work around
+ * the main thread safeguard.
+ */
+ @Override
+ public JobManagerRunnerRegistry getWrappedDelegate() {
+ return this.delegate;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
index 3faa358..0957a42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -62,7 +62,7 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
}
@VisibleForTesting
- DispatcherResourceCleanerFactory(
+ public DispatcherResourceCleanerFactory(
Executor cleanupExecutor,
JobManagerRunnerRegistry jobManagerRunnerRegistry,
JobGraphWriter jobGraphWriter,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index f277e2b..75d638b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -26,18 +26,10 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
-import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -53,11 +45,6 @@ import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ForkJoinPool;
-
/** Abstract test for the {@link Dispatcher} component. */
public class AbstractDispatcherTest extends TestLogger {
@@ -116,6 +103,19 @@ public class AbstractDispatcherTest extends TestLogger {
new BlobServer(configuration, temporaryFolder.newFolder(), new VoidBlobStore());
}
+ protected TestingDispatcher.Builder createTestingDispatcherBuilder() {
+ return TestingDispatcher.builder()
+ .setRpcService(rpcService)
+ .setConfiguration(configuration)
+ .setHeartbeatServices(heartbeatServices)
+ .setHighAvailabilityServices(haServices)
+ .setJobGraphWriter(haServices.getJobGraphStore())
+ .setJobResultStore(haServices.getJobResultStore())
+ .setJobManagerRunnerFactory(JobMasterServiceLeadershipRunnerFactory.INSTANCE)
+ .setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
+ .setBlobServer(blobServer);
+ }
+
@After
public void tearDown() throws Exception {
if (haServices != null) {
@@ -129,115 +129,4 @@ public class AbstractDispatcherTest extends TestLogger {
protected BlobServer getBlobServer() {
return blobServer;
}
-
- /** A convenient builder for the {@link TestingDispatcher}. */
- public class TestingDispatcherBuilder {
-
- private Collection<JobGraph> initialJobGraphs = Collections.emptyList();
-
- private Collection<JobResult> dirtyJobResults = Collections.emptyList();
-
- private DispatcherBootstrapFactory dispatcherBootstrapFactory =
- (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
-
- private HeartbeatServices heartbeatServices = AbstractDispatcherTest.this.heartbeatServices;
-
- private HighAvailabilityServices haServices = AbstractDispatcherTest.this.haServices;
-
- private JobManagerRunnerFactory jobManagerRunnerFactory =
- JobMasterServiceLeadershipRunnerFactory.INSTANCE;
-
- private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
-
- private JobResultStore jobResultStore = new EmbeddedJobResultStore();
-
- private FatalErrorHandler fatalErrorHandler =
- testingFatalErrorHandlerResource.getFatalErrorHandler();
-
- private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
-
- TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
- this.heartbeatServices = heartbeatServices;
- return this;
- }
-
- TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) {
- this.haServices = haServices;
- return this;
- }
-
- TestingDispatcherBuilder setInitialJobGraphs(Collection<JobGraph> initialJobGraphs) {
- this.initialJobGraphs = initialJobGraphs;
- return this;
- }
-
- TestingDispatcherBuilder setDirtyJobResults(Collection<JobResult> dirtyJobResults) {
- this.dirtyJobResults = dirtyJobResults;
- return this;
- }
-
- TestingDispatcherBuilder setDispatcherBootstrapFactory(
- DispatcherBootstrapFactory dispatcherBootstrapFactory) {
- this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
- return this;
- }
-
- TestingDispatcherBuilder setJobManagerRunnerFactory(
- JobManagerRunnerFactory jobManagerRunnerFactory) {
- this.jobManagerRunnerFactory = jobManagerRunnerFactory;
- return this;
- }
-
- TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
- this.jobGraphWriter = jobGraphWriter;
- return this;
- }
-
- TestingDispatcherBuilder setJobResultStore(JobResultStore jobResultStore) {
- this.jobResultStore = jobResultStore;
- return this;
- }
-
- public TestingDispatcherBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
- this.fatalErrorHandler = fatalErrorHandler;
- return this;
- }
-
- public TestingDispatcherBuilder setHistoryServerArchivist(
- HistoryServerArchivist historyServerArchivist) {
- this.historyServerArchivist = historyServerArchivist;
- return this;
- }
-
- TestingDispatcher build() throws Exception {
- TestingResourceManagerGateway resourceManagerGateway =
- new TestingResourceManagerGateway();
-
- final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- new MemoryExecutionGraphInfoStore();
-
- return new TestingDispatcher(
- rpcService,
- DispatcherId.generate(),
- initialJobGraphs,
- dirtyJobResults,
- dispatcherBootstrapFactory,
- new DispatcherServices(
- configuration,
- haServices,
- () -> CompletableFuture.completedFuture(resourceManagerGateway),
- blobServer,
- heartbeatServices,
- executionGraphInfoStore,
- fatalErrorHandler,
- historyServerArchivist,
- null,
- new DispatcherOperationCaches(),
- UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
- jobGraphWriter,
- jobResultStore,
- jobManagerRunnerFactory,
- ForkJoinPool.commonPool()));
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index 6eca873..1d4cec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.util.FlinkException;
@@ -47,9 +46,7 @@ public class DefaultJobManagerRunnerRegistryTest {
@BeforeEach
public void setup() {
- testInstance =
- new DefaultJobManagerRunnerRegistry(
- 4, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ testInstance = new DefaultJobManagerRunnerRegistry(4);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
index fb9ac8a..6f3bcce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
@@ -241,13 +241,9 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest {
}
}
final TestingDispatcher dispatcher =
- new TestingDispatcherBuilder()
- .setJobManagerRunnerFactory(
- JobMasterServiceLeadershipRunnerFactory.INSTANCE)
- .setJobGraphWriter(haServices.getJobGraphStore())
- .setJobResultStore(haServices.getJobResultStore())
- .setInitialJobGraphs(jobGraphs)
- .setDirtyJobResults(haServices.getJobResultStore().getDirtyResults())
+ createTestingDispatcherBuilder()
+ .setRecoveredJobs(jobGraphs)
+ .setRecoveredDirtyJobs(haServices.getJobResultStore().getDirtyResults())
.setFatalErrorHandler(
fatalErrorHandler == null
? testingFatalErrorHandlerResource.getFatalErrorHandler()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index a4063b6..127f98c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,34 +22,27 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TestingBlobStore;
+import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
-import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -60,8 +53,10 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
@@ -74,28 +69,23 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
-import javax.annotation.Nullable;
-
-import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
-import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiFunction;
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests the resource cleanup by the {@link Dispatcher}. */
@@ -117,30 +107,14 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private JobGraph jobGraph;
- private Configuration configuration;
-
- private JobResultStore jobResultStore;
-
- private TestingHighAvailabilityServices highAvailabilityServices;
-
- private OneShotLatch clearedJobLatch;
-
private TestingDispatcher dispatcher;
private DispatcherGateway dispatcherGateway;
private BlobServer blobServer;
- private PermanentBlobKey permanentBlobKey;
-
- private File blobFile;
-
- private CompletableFuture<BlobKey> storedHABlobFuture;
- private CompletableFuture<JobID> deleteAllHABlobsFuture;
private CompletableFuture<JobID> localCleanupFuture;
private CompletableFuture<JobID> globalCleanupFuture;
- private CompletableFuture<JobID> cleanupJobHADataFuture;
- private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
@BeforeClass
public static void setupClass() {
@@ -152,41 +126,14 @@ public class DispatcherResourceCleanupTest extends TestLogger {
jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
jobId = jobGraph.getJobID();
- configuration = new Configuration();
-
- highAvailabilityServices = new TestingHighAvailabilityServices();
- clearedJobLatch = new OneShotLatch();
- jobResultStore = new SingleJobResultStore(jobId, clearedJobLatch);
- highAvailabilityServices.setJobResultStore(jobResultStore);
- cleanupJobHADataFuture = new CompletableFuture<>();
- highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture);
-
- storedHABlobFuture = new CompletableFuture<>();
- deleteAllHABlobsFuture = new CompletableFuture<>();
-
- final TestingBlobStore testingBlobStore =
- new TestingBlobStoreBuilder()
- .setPutFunction(
- (file, jobId, blobKey) -> storedHABlobFuture.complete(blobKey))
- .setDeleteAllFunction(deleteAllHABlobsFuture::complete)
- .createTestingBlobStore();
-
globalCleanupFuture = new CompletableFuture<>();
localCleanupFuture = new CompletableFuture<>();
blobServer =
- new TestingBlobServer(
- configuration,
- temporaryFolder.newFolder(),
- testingBlobStore,
- (jobId, ignoredExecutor) -> {
- globalCleanupFuture.complete(jobId);
- return FutureUtils.completedVoidFuture();
- },
- (jobId, ignoredExecutor) -> {
- localCleanupFuture.complete(jobId);
- return FutureUtils.completedVoidFuture();
- });
+ BlobUtils.createBlobServer(
+ new Configuration(),
+ Reference.owned(temporaryFolder.newFolder()),
+ new TestingBlobStoreBuilder().createTestingBlobStore());
}
private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
@@ -195,54 +142,72 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
int numBlockingJobManagerRunners) throws Exception {
+ return startDispatcherAndSubmitJob(
+ createTestingDispatcherBuilder(), numBlockingJobManagerRunners);
+ }
+
+ private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
+ TestingDispatcher.Builder dispatcherBuilder, int numBlockingJobManagerRunners)
+ throws Exception {
final TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG =
new TestingJobManagerRunnerFactory(numBlockingJobManagerRunners);
- startDispatcher(testingJobManagerRunnerFactoryNG);
+ startDispatcher(dispatcherBuilder, testingJobManagerRunnerFactoryNG);
submitJobAndWait();
return testingJobManagerRunnerFactoryNG;
}
private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
- TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
- final MemoryExecutionGraphInfoStore archivedExecutionGraphStore =
- new MemoryExecutionGraphInfoStore();
- dispatcher =
- new TestingDispatcher(
- rpcService,
- DispatcherId.generate(),
- Collections.emptyList(),
- Collections.emptyList(),
- (dispatcher, scheduledExecutor, errorHandler) ->
- new NoOpDispatcherBootstrap(),
- new DispatcherServices(
- configuration,
- highAvailabilityServices,
- () -> CompletableFuture.completedFuture(resourceManagerGateway),
- blobServer,
- heartbeatServices,
- archivedExecutionGraphStore,
- testingFatalErrorHandlerResource.getFatalErrorHandler(),
- VoidHistoryServerArchivist.INSTANCE,
- null,
- new DispatcherOperationCaches(),
- UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
- jobGraphWriter,
- jobResultStore,
- jobManagerRunnerFactory,
- ForkJoinPool.commonPool()));
+ startDispatcher(createTestingDispatcherBuilder(), jobManagerRunnerFactory);
+ }
+
+ private void startDispatcher(
+ TestingDispatcher.Builder dispatcherBuilder,
+ JobManagerRunnerFactory jobManagerRunnerFactory)
+ throws Exception {
+ dispatcher = dispatcherBuilder.setJobManagerRunnerFactory(jobManagerRunnerFactory).build();
dispatcher.start();
dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
}
+ private TestingDispatcher.Builder createTestingDispatcherBuilder() {
+ final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+ new DefaultJobManagerRunnerRegistry(2);
+ return TestingDispatcher.builder()
+ .setRpcService(rpcService)
+ .setBlobServer(blobServer)
+ .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
+ .setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
+ .setResourceCleanerFactory(
+ TestingResourceCleanerFactory.builder()
+ // JobManagerRunnerRegistry needs to be added explicitly
+ // because cleaning it will trigger the closeAsync latch
+ // provided by TestingJobManagerRunner
+ .withLocallyCleanableResource(jobManagerRunnerRegistry)
+ .withGloballyCleanableResource(
+ (jobId, ignoredExecutor) -> {
+ globalCleanupFuture.complete(jobId);
+ return FutureUtils.completedVoidFuture();
+ })
+ .withLocallyCleanableResource(
+ (jobId, ignoredExecutor) -> {
+ localCleanupFuture.complete(jobId);
+ return FutureUtils.completedVoidFuture();
+ })
+ .build());
+ }
+
@After
public void teardown() throws Exception {
if (dispatcher != null) {
dispatcher.close();
}
+
+ if (blobServer != null) {
+ blobServer.close();
+ }
}
@AfterClass
@@ -253,41 +218,29 @@ public class DispatcherResourceCleanupTest extends TestLogger {
}
@Test
- public void testBlobServerCleanupWhenJobFinished() throws Exception {
+ public void testGlobalCleanupWhenJobFinished() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
// complete the job
finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
- assertThatHABlobsHaveBeenRemoved();
+ assertGlobalCleanupTriggered(jobId);
}
- private void assertThatHABlobsHaveBeenRemoved()
- throws InterruptedException, ExecutionException, TimeoutException {
- assertGlobalCleanupTriggered(jobId);
+ @Test
+ public void testGlobalCleanupWhenJobCanceled() throws Exception {
+ final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+ startDispatcherAndSubmitJob();
- // verify that we also cleared the BlobStore
- assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
+ // cancel the job
+ cancelJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
- assertThat(blobFile.exists(), is(false));
+ assertGlobalCleanupTriggered(jobId);
}
private CompletableFuture<Acknowledge> submitJob() {
- try {
- // upload a blob to the blob server
- permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]);
- jobGraph.addUserJarBlobKey(permanentBlobKey);
- blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey);
-
- assertThat(blobFile.exists(), is(true));
-
- // verify that we stored the blob also in the BlobStore
- assertThat(storedHABlobFuture.join(), equalTo(permanentBlobKey));
- return dispatcherGateway.submitJob(jobGraph, timeout);
- } catch (IOException ioe) {
- return FutureUtils.completedExceptionally(ioe);
- }
+ return dispatcherGateway.submitJob(jobGraph, timeout);
}
private void submitJobAndWait() {
@@ -295,7 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
}
@Test
- public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
+ public void testLocalCleanupWhenJobNotFinished() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
@@ -305,22 +258,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
suspendJob(testingJobManagerRunner);
assertLocalCleanupTriggered(jobId);
- assertThat(blobFile.exists(), is(false));
-
- // verify that we did not clear the BlobStore
- try {
- deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
- fail("We should not delete the HA blobs.");
- } catch (TimeoutException ignored) {
- // expected
- }
-
- assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
- /** Tests that the uploaded blobs are being cleaned up in case of a job submission failure. */
@Test
- public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
+ public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
final CompletableFuture<Acknowledge> submissionFuture = submitJob();
@@ -328,34 +269,23 @@ public class DispatcherResourceCleanupTest extends TestLogger {
submissionFuture.get();
fail("Job submission was expected to fail.");
} catch (ExecutionException ee) {
- assertThat(ee, FlinkMatchers.containsCause(JobSubmissionException.class));
+ assertThat(ee, containsCause(JobSubmissionException.class));
}
- assertThatHABlobsHaveBeenRemoved();
+ assertGlobalCleanupTriggered(jobId);
}
@Test
- public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
+ public void testLocalCleanupWhenClosingDispatcher() throws Exception {
startDispatcherAndSubmitJob();
dispatcher.closeAsync().get();
assertLocalCleanupTriggered(jobId);
- assertThat(blobFile.exists(), is(false));
-
- // verify that we did not clear the BlobStore
- try {
- deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
- fail("We should not delete the HA blobs.");
- } catch (TimeoutException ignored) {
- // expected
- }
-
- assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
@Test
- public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
+ public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
final TestingJobManagerRunner testingJobManagerRunner =
TestingJobManagerRunner.newBuilder()
.setBlockingTermination(true)
@@ -384,38 +314,89 @@ public class DispatcherResourceCleanupTest extends TestLogger {
dispatcherTerminationFuture.get();
assertGlobalCleanupTriggered(jobId);
- assertThat(deleteAllHABlobsFuture.get(), is(jobId));
}
- /**
- * Tests that the {@link JobResultStore} entries are marked as clean after the job reached a
- * terminal state.
- */
@Test
- public void testJobResultStoreCleanup() throws Exception {
+ public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
+ final OneShotLatch markAsDirtyLatch = new OneShotLatch();
+
+ final TestingDispatcher.Builder dispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setJobResultStore(
+ TestingJobResultStore.builder()
+ .withCreateDirtyResultConsumer(
+ ignoredJobResultEntry -> {
+ try {
+ markAsDirtyLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .build());
+
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
- startDispatcherAndSubmitJob();
+ startDispatcherAndSubmitJob(dispatcherBuilder, 0);
- final JobResult jobResult =
- TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN);
+ finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
- jobResultStore.createDirtyResult(new JobResultEntry(jobResult));
- assertTrue(jobResultStore.hasJobResultEntry(jobId));
+ assertThatNoCleanupWasTriggered();
- final TestingJobManagerRunner testingJobManagerRunner =
- jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .setJobID(jobId)
- .build()));
+ markAsDirtyLatch.trigger();
- // wait for the clearing
- clearedJobLatch.await();
+ assertGlobalCleanupTriggered(jobId);
+ }
- assertTrue(jobResultStore.hasJobResultEntry(jobId));
- assertTrue(jobResultStore.getDirtyResults().isEmpty());
+ @Test
+ public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception {
+ final CompletableFuture<JobID> markAsCleanFuture = new CompletableFuture<>();
+
+ final JobResultStore jobResultStore =
+ TestingJobResultStore.builder()
+ .withMarkResultAsCleanConsumer(markAsCleanFuture::complete)
+ .build();
+ final OneShotLatch localCleanupLatch = new OneShotLatch();
+ final OneShotLatch globalCleanupLatch = new OneShotLatch();
+ final TestingResourceCleanerFactory resourceCleanerFactory =
+ TestingResourceCleanerFactory.builder()
+ .withLocallyCleanableResource(
+ (ignoredJobId, ignoredExecutor) -> {
+ try {
+ localCleanupLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return FutureUtils.completedVoidFuture();
+ })
+ .withGloballyCleanableResource(
+ (ignoredJobId, ignoredExecutor) -> {
+ try {
+ globalCleanupLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return FutureUtils.completedVoidFuture();
+ })
+ .build();
+
+ final TestingDispatcher.Builder dispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setJobResultStore(jobResultStore)
+ .setResourceCleanerFactory(resourceCleanerFactory);
+
+ final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+ startDispatcherAndSubmitJob(dispatcherBuilder, 0);
+
+ finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
+
+ assertThat(markAsCleanFuture.isDone(), is(false));
+
+ localCleanupLatch.trigger();
+ assertThat(markAsCleanFuture.isDone(), is(false));
+ globalCleanupLatch.trigger();
+
+ assertThat(markAsCleanFuture.get(), is(jobId));
}
/**
@@ -481,32 +462,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
}
- assertThatHABlobsHaveBeenRemoved();
- }
-
- @Test
- public void testHaDataCleanupWhenJobFinished() throws Exception {
- TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
- TestingJobManagerRunner jobManagerRunner =
- jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- finishJob(jobManagerRunner);
- JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
- assertThat(jobID, is(this.jobId));
- }
-
- @Test
- public void testHaDataCleanupWhenJobNotFinished() throws Exception {
- TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
- TestingJobManagerRunner jobManagerRunner =
- jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- suspendJob(jobManagerRunner);
- try {
- cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
- fail("We should not delete the HA data for job.");
- } catch (TimeoutException ignored) {
- // expected
- }
- assertThat(cleanupJobHADataFuture.isDone(), is(false));
+ assertGlobalCleanupTriggered(jobId);
}
private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
@@ -517,6 +473,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED);
}
+ private void cancelJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
+ terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.CANCELED);
+ }
+
private void terminateJobWithState(
TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) {
takeCreatedJobManagerRunner.completeResultFuture(
@@ -530,8 +490,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private void assertThatNoCleanupWasTriggered() {
assertThat(globalCleanupFuture.isDone(), is(false));
assertThat(localCleanupFuture.isDone(), is(false));
- assertThat(deleteAllHABlobsFuture.isDone(), is(false));
- assertThat(blobFile.exists(), is(true));
}
@Test
@@ -566,84 +524,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
dispatcherTerminationFuture.get();
}
- private static final class SingleJobResultStore implements JobResultStore {
-
- private final JobID expectedJobId;
- @Nullable private JobResultEntry actualJobResultEntry;
- private boolean isDirty = true;
-
- private final OneShotLatch clearedJobLatch;
-
- private SingleJobResultStore(JobID expectedJobId, OneShotLatch clearedJobLatch) {
- this.expectedJobId = expectedJobId;
- this.clearedJobLatch = clearedJobLatch;
- }
-
- @Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- checkJobId(jobResultEntry.getJobId());
- this.actualJobResultEntry = jobResultEntry;
- }
-
- private void checkJobId(JobID jobID) {
- Preconditions.checkArgument(expectedJobId.equals(jobID));
- }
-
- @Override
- public void markResultAsClean(JobID jobId) throws IOException {
- checkJobId(jobId);
- Preconditions.checkNotNull(actualJobResultEntry);
- isDirty = false;
- clearedJobLatch.trigger();
- }
-
- @Override
- public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
- if (actualJobResultEntry == null) {
- return false;
- }
-
- checkJobId(jobId);
- return isDirty;
- }
-
- @Override
- public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
- if (actualJobResultEntry == null) {
- return false;
- }
-
- checkJobId(jobId);
- return !isDirty;
- }
-
- @Override
- public Set<JobResult> getDirtyResults() throws IOException {
- return actualJobResultEntry != null && isDirty
- ? Collections.singleton(actualJobResultEntry.getJobResult())
- : Collections.emptySet();
- }
- }
-
- @Test
- public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
- final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
- startDispatcherAndSubmitJob();
-
- ArchivedExecutionGraph executionGraph =
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobId)
- .setState(JobStatus.CANCELED)
- .build();
-
- final TestingJobManagerRunner testingJobManagerRunner =
- jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
-
- assertGlobalCleanupTriggered(jobId);
- assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
- }
-
private void assertLocalCleanupTriggered(JobID jobId)
throws ExecutionException, InterruptedException, TimeoutException {
assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
@@ -658,17 +538,17 @@ public class DispatcherResourceCleanupTest extends TestLogger {
@Test
public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
- jobResultStore =
+ final JobResultStore jobResultStore =
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(
jobResult -> {
throw new IOException("Expected IOException.");
})
.build();
- highAvailabilityServices.setJobResultStore(jobResultStore);
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
- startDispatcherAndSubmitJob();
+ startDispatcherAndSubmitJob(
+ createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
ArchivedExecutionGraph executionGraph =
new ArchivedExecutionGraphBuilder()
@@ -691,7 +571,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
@Test
public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception {
final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>();
- jobResultStore =
+ final JobResultStore jobResultStore =
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(dirtyJobFuture::complete)
.withMarkResultAsCleanConsumer(
@@ -699,10 +579,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
throw new IOException("Expected IOException.");
})
.build();
- highAvailabilityServices.setJobResultStore(jobResultStore);
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
- startDispatcherAndSubmitJob();
+ startDispatcherAndSubmitJob(
+ createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
ArchivedExecutionGraph executionGraph =
new ArchivedExecutionGraphBuilder()
@@ -729,45 +609,108 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(dirtyJobFuture.get().getJobId(), is(jobId));
}
- private static final class TestingBlobServer extends BlobServer {
-
- private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction;
- private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction;
-
- /**
- * Instantiates a new BLOB server and binds it to a free network port.
- *
- * @param config Configuration to be used to instantiate the BlobServer
- * @param blobStore BlobStore to store blobs persistently
- * @param globalCleanupFunction The function called along the actual {@link
- * #globalCleanupAsync(JobID, Executor)} call.
- * @param localCleanupFunction The function called along the actual {@link
- * #localCleanupAsync(JobID, Executor)} call.
- * @throws IOException thrown if the BLOB server cannot bind to a free network port or if
- * the (local or distributed) file storage cannot be created or is not usable
- */
- public TestingBlobServer(
- Configuration config,
- File storageDirectory,
- BlobStore blobStore,
- BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction,
- BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction)
- throws IOException {
- super(config, storageDirectory, blobStore);
- this.globalCleanupFunction = globalCleanupFunction;
- this.localCleanupFunction = localCleanupFunction;
+ /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
+ @Test
+ public void testFailingJobManagerRunnerCleanup() throws Exception {
+ final FlinkException testException = new FlinkException("Test exception.");
+ final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
+
+ final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
+ new BlockingJobManagerRunnerFactory(
+ () -> {
+ final Optional<Exception> maybeException = queue.take();
+ if (maybeException.isPresent()) {
+ throw maybeException.get();
+ }
+ });
+
+ startDispatcher(blockingJobManagerRunnerFactory);
+
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // submit and fail during job master runner construction
+ queue.offer(Optional.of(testException));
+ try {
+ dispatcherGateway.submitJob(jobGraph, Time.minutes(1)).get();
+ fail("A FlinkException is expected");
+ } catch (Throwable expectedException) {
+ assertThat(expectedException, containsCause(FlinkException.class));
+ assertThat(expectedException, containsMessage(testException.getMessage()));
+ // make sure the global cleanup was triggered for the failed submission
+ assertGlobalCleanupTriggered(jobId);
}
- @Override
- public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
- return super.globalCleanupAsync(jobId, executor)
- .thenCompose(ignored -> globalCleanupFunction.apply(jobId, executor));
+ // don't fail this time
+ queue.offer(Optional.empty());
+ // submit job again
+ dispatcherGateway.submitJob(jobGraph, Time.minutes(1L)).get();
+ blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
+
+ // Ensure job is running
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+ }
+
+ private static final class BlockingJobManagerRunnerFactory
+ extends TestingJobManagerRunnerFactory {
+
+ private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
+ private TestingJobManagerRunner testingRunner;
+
+ BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+ this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
}
@Override
- public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
- return super.localCleanupAsync(jobId, executor)
- .thenCompose(ignored -> localCleanupFunction.apply(jobId, executor));
+ public TestingJobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerSharedServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler,
+ long initializationTimestamp)
+ throws Exception {
+ jobManagerRunnerCreationLatch.run();
+
+ this.testingRunner =
+ super.createJobManagerRunner(
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ jobManagerSharedServices,
+ jobManagerJobMetricGroupFactory,
+ fatalErrorHandler,
+ initializationTimestamp);
+
+ TestingJobMasterGateway testingJobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(
+ () ->
+ CompletableFuture.completedFuture(
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph
+ .createFromInitializingJob(
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ JobStatus.RUNNING,
+ null,
+ null,
+ 1337))))
+ .build();
+ testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
+ return testingRunner;
+ }
+
+ public void setJobStatus(JobStatus newStatus) {
+ Preconditions.checkState(
+ testingRunner != null,
+ "JobManagerRunner must be created before this method is available");
+ this.testingRunner.setJobStatus(newStatus);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 6f4898a..bad61cd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProce
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
-import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -89,7 +88,6 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matchers;
@@ -110,11 +108,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -127,12 +123,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -178,8 +172,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
JobManagerRunnerFactory jobManagerRunnerFactory)
throws Exception {
final TestingDispatcher dispatcher =
- new TestingDispatcherBuilder()
- .setHaServices(haServices)
+ createTestingDispatcherBuilder()
+ .setHighAvailabilityServices(haServices)
.setHeartbeatServices(heartbeatServices)
.setJobManagerRunnerFactory(jobManagerRunnerFactory)
.setJobGraphWriter(haServices.getJobGraphStore())
@@ -246,11 +240,11 @@ public class DispatcherTest extends AbstractDispatcherTest {
@Test
public void testDuplicateJobSubmissionWithRunningJobId() throws Exception {
dispatcher =
- new TestingDispatcherBuilder()
+ createTestingDispatcherBuilder()
.setJobManagerRunnerFactory(
new ExpectedJobIdJobManagerRunnerFactory(
jobId, createdJobManagerRunnerLatch))
- .setInitialJobGraphs(Collections.singleton(jobGraph))
+ .setRecoveredJobs(Collections.singleton(jobGraph))
.build();
dispatcher.start();
final DispatcherGateway dispatcherGateway =
@@ -474,7 +468,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
new CompletableFuture<>();
dispatcher =
- new TestingDispatcherBuilder()
+ createTestingDispatcherBuilder()
.setJobManagerRunnerFactory(
new FinishingJobManagerRunnerFactory(
jobTerminationFuture, () -> {}))
@@ -675,10 +669,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
new TestingJobManagerRunnerFactory();
dispatcher =
- new TestingDispatcherBuilder()
+ createTestingDispatcherBuilder()
.setJobManagerRunnerFactory(jobManagerRunnerFactory)
- .setInitialJobGraphs(
- Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
+ .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
.build();
dispatcher.start();
@@ -721,89 +714,12 @@ public class DispatcherTest extends AbstractDispatcherTest {
final JobResult jobResult =
TestingJobResultStore.createSuccessfulJobResult(jobGraph.getJobID());
dispatcher =
- new TestingDispatcherBuilder()
- .setInitialJobGraphs(Collections.singleton(jobGraph))
- .setDirtyJobResults(Collections.singleton(jobResult))
+ createTestingDispatcherBuilder()
+ .setRecoveredJobs(Collections.singleton(jobGraph))
+ .setRecoveredDirtyJobs(Collections.singleton(jobResult))
.build();
}
- /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
- @Test
- public void testFailingJobManagerRunnerCleanup() throws Exception {
- final FlinkException testException = new FlinkException("Test exception.");
- final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
-
- final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
- new BlockingJobManagerRunnerFactory(
- () -> {
- final Optional<Exception> maybeException = queue.take();
- if (maybeException.isPresent()) {
- throw maybeException.get();
- }
- });
-
- final BlockingQueue<String> cleanUpEvents = new LinkedBlockingQueue<>();
-
- // Track cleanup - ha-services
- final CompletableFuture<JobID> cleanupJobData = new CompletableFuture<>();
- haServices.setGlobalCleanupFuture(cleanupJobData);
- cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES));
-
- // Track cleanup - job-graph
- final TestingJobGraphStore jobGraphStore =
- TestingJobGraphStore.newBuilder()
- .setLocalCleanupFunction(
- (jobId, executor) -> {
- cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE);
- return FutureUtils.completedVoidFuture();
- })
- .setGlobalCleanupFunction(
- (jobId, executor) -> {
- cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE);
- return FutureUtils.completedVoidFuture();
- })
- .build();
- jobGraphStore.start(null);
- haServices.setJobGraphStore(jobGraphStore);
-
- // Track cleanup - job result store
- haServices.setJobResultStore(
- TestingJobResultStore.builder()
- .withMarkResultAsCleanConsumer(
- jobID -> cleanUpEvents.add(CLEANUP_JOB_RESULT_STORE))
- .build());
-
- dispatcher =
- createAndStartDispatcher(
- heartbeatServices, haServices, blockingJobManagerRunnerFactory);
-
- final DispatcherGateway dispatcherGateway =
- dispatcher.getSelfGateway(DispatcherGateway.class);
-
- // submit and fail during job master runner construction
- queue.offer(Optional.of(testException));
- try {
- dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- fail("A FlinkException is expected");
- } catch (Throwable expectedException) {
- assertThat(expectedException, containsCause(FlinkException.class));
- assertThat(expectedException, containsMessage(testException.getMessage()));
- // make sure we've cleaned up in correct order (including HA)
- assertThat(
- new ArrayList<>(cleanUpEvents),
- containsInAnyOrder(CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_HA_SERVICES));
- }
-
- // don't fail this time
- queue.offer(Optional.empty());
- // submit job again
- dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
-
- // Ensure job is running
- awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
- }
-
@Test
public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
final TestingJobGraphStore submittedJobGraphStore =
@@ -812,7 +728,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
haServices.setJobGraphStore(submittedJobGraphStore);
dispatcher =
- new TestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build();
+ createTestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build();
dispatcher.start();
@@ -930,8 +846,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
testingJobGraphStore.start(null);
dispatcher =
- new TestingDispatcherBuilder()
- .setInitialJobGraphs(Collections.singleton(jobGraph))
+ createTestingDispatcherBuilder()
+ .setRecoveredJobs(Collections.singleton(jobGraph))
.setJobGraphWriter(testingJobGraphStore)
.build();
dispatcher.start();
@@ -1110,8 +1026,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
final PermanentBlobKey blobKey2 = blobServer.putPermanent(jobId2, fileContent);
dispatcher =
- new TestingDispatcherBuilder()
- .setInitialJobGraphs(Collections.singleton(new JobGraph(jobId1, "foobar")))
+ createTestingDispatcherBuilder()
+ .setRecoveredJobs(Collections.singleton(new JobGraph(jobId1, "foobar")))
.build();
Assertions.assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
@@ -1365,70 +1281,6 @@ public class DispatcherTest extends AbstractDispatcherTest {
}
}
- private static final class BlockingJobManagerRunnerFactory
- extends TestingJobManagerRunnerFactory {
-
- @Nonnull private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
- private TestingJobManagerRunner testingRunner;
-
- BlockingJobManagerRunnerFactory(
- @Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
- this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
- }
-
- @Override
- public TestingJobManagerRunner createJobManagerRunner(
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- JobManagerSharedServices jobManagerSharedServices,
- JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler,
- long initializationTimestamp)
- throws Exception {
- jobManagerRunnerCreationLatch.run();
-
- this.testingRunner =
- super.createJobManagerRunner(
- jobGraph,
- configuration,
- rpcService,
- highAvailabilityServices,
- heartbeatServices,
- jobManagerSharedServices,
- jobManagerJobMetricGroupFactory,
- fatalErrorHandler,
- initializationTimestamp);
-
- TestingJobMasterGateway testingJobMasterGateway =
- new TestingJobMasterGatewayBuilder()
- .setRequestJobSupplier(
- () ->
- CompletableFuture.completedFuture(
- new ExecutionGraphInfo(
- ArchivedExecutionGraph
- .createFromInitializingJob(
- jobGraph.getJobID(),
- jobGraph.getName(),
- JobStatus.RUNNING,
- null,
- null,
- 1337))))
- .build();
- testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
- return testingRunner;
- }
-
- public void setJobStatus(JobStatus newStatus) {
- Preconditions.checkState(
- testingRunner != null,
- "JobManagerRunner must be created before this method is available");
- this.testingRunner.setJobStatus(newStatus);
- }
- }
-
private static final class InitializationTimestampCapturingJobManagerRunnerFactory
implements JobManagerRunnerFactory {
private final BlockingQueue<Long> initializationTimestampQueue;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index ba5ecca..5c3efc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -20,16 +20,39 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
/** {@link Dispatcher} implementation used for testing purposes. */
@@ -56,6 +79,58 @@ class TestingDispatcher extends Dispatcher {
this.startFuture = new CompletableFuture<>();
}
+ private TestingDispatcher(
+ RpcService rpcService,
+ DispatcherId fencingToken,
+ Collection<JobGraph> recoveredJobs,
+ Collection<JobResult> recoveredDirtyJobs,
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ HeartbeatServices heartbeatServices,
+ BlobServer blobServer,
+ FatalErrorHandler fatalErrorHandler,
+ JobGraphWriter jobGraphWriter,
+ JobResultStore jobResultStore,
+ JobManagerMetricGroup jobManagerMetricGroup,
+ @Nullable String metricServiceQueryAddress,
+ Executor ioExecutor,
+ HistoryServerArchivist historyServerArchivist,
+ ExecutionGraphInfoStore executionGraphInfoStore,
+ JobManagerRunnerFactory jobManagerRunnerFactory,
+ DispatcherBootstrapFactory dispatcherBootstrapFactory,
+ DispatcherOperationCaches dispatcherOperationCaches,
+ JobManagerRunnerRegistry jobManagerRunnerRegistry,
+ ResourceCleanerFactory resourceCleanerFactory)
+ throws Exception {
+ super(
+ rpcService,
+ fencingToken,
+ recoveredJobs,
+ recoveredDirtyJobs,
+ dispatcherBootstrapFactory,
+ new DispatcherServices(
+ configuration,
+ highAvailabilityServices,
+ resourceManagerGatewayRetriever,
+ blobServer,
+ heartbeatServices,
+ executionGraphInfoStore,
+ fatalErrorHandler,
+ historyServerArchivist,
+ metricServiceQueryAddress,
+ dispatcherOperationCaches,
+ jobManagerMetricGroup,
+ jobGraphWriter,
+ jobResultStore,
+ jobManagerRunnerFactory,
+ ioExecutor),
+ jobManagerRunnerRegistry,
+ resourceCleanerFactory);
+
+ this.startFuture = new CompletableFuture<>();
+ }
+
@Override
public void onStart() throws Exception {
try {
@@ -91,4 +166,210 @@ class TestingDispatcher extends Dispatcher {
void waitUntilStarted() {
startFuture.join();
}
+
+ public static TestingDispatcher.Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private RpcService rpcService = new TestingRpcService();
+ private DispatcherId fencingToken = DispatcherId.generate();
+ private Collection<JobGraph> recoveredJobs = Collections.emptyList();
+ private Collection<JobResult> recoveredDirtyJobs = Collections.emptyList();
+ private HighAvailabilityServices highAvailabilityServices =
+ new TestingHighAvailabilityServices();
+
+ private TestingResourceManagerGateway resourceManagerGateway =
+ new TestingResourceManagerGateway();
+ private GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
+ () -> CompletableFuture.completedFuture(resourceManagerGateway);
+ private HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+
+ private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+ private JobResultStore jobResultStore = new EmbeddedJobResultStore();
+
+ private Configuration configuration = new Configuration();
+
+ // even though it's labeled as @Nullable, it's a mandatory field that needs to be set before
+ // building the Dispatcher instance
+ @Nullable private BlobServer blobServer = null;
+ private FatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+ private JobManagerMetricGroup jobManagerMetricGroup =
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+ @Nullable private String metricServiceQueryAddress = null;
+ private Executor ioExecutor = ForkJoinPool.commonPool();
+ private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+ private ExecutionGraphInfoStore executionGraphInfoStore =
+ new MemoryExecutionGraphInfoStore();
+ private JobManagerRunnerFactory jobManagerRunnerFactory =
+ new TestingJobManagerRunnerFactory(0);
+ private DispatcherBootstrapFactory dispatcherBootstrapFactory =
+ (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
+ private DispatcherOperationCaches dispatcherOperationCaches =
+ new DispatcherOperationCaches();
+ private JobManagerRunnerRegistry jobManagerRunnerRegistry =
+ new DefaultJobManagerRunnerRegistry(1);
+ @Nullable private ResourceCleanerFactory resourceCleanerFactory;
+
+ public Builder setRpcService(RpcService rpcService) {
+ this.rpcService = rpcService;
+ return this;
+ }
+
+ public Builder setFencingToken(DispatcherId fencingToken) {
+ this.fencingToken = fencingToken;
+ return this;
+ }
+
+ public Builder setRecoveredJobs(Collection<JobGraph> recoveredJobs) {
+ this.recoveredJobs = recoveredJobs;
+ return this;
+ }
+
+ public Builder setRecoveredDirtyJobs(Collection<JobResult> recoveredDirtyJobs) {
+ this.recoveredDirtyJobs = recoveredDirtyJobs;
+ return this;
+ }
+
+ public Builder setHighAvailabilityServices(
+ HighAvailabilityServices highAvailabilityServices) {
+ this.highAvailabilityServices = highAvailabilityServices;
+ return this;
+ }
+
+ public Builder setResourceManagerGateway(
+ TestingResourceManagerGateway resourceManagerGateway) {
+ this.resourceManagerGateway = resourceManagerGateway;
+ return this;
+ }
+
+ public Builder setResourceManagerGatewayRetriever(
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+ this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+ return this;
+ }
+
+ public Builder setHeartbeatServices(HeartbeatServices heartbeatServices) {
+ this.heartbeatServices = heartbeatServices;
+ return this;
+ }
+
+ public Builder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
+ this.jobGraphWriter = jobGraphWriter;
+ return this;
+ }
+
+ public Builder setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ return this;
+ }
+
+ public Builder setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder setBlobServer(BlobServer blobServer) {
+ this.blobServer = blobServer;
+ return this;
+ }
+
+ public Builder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+ this.fatalErrorHandler = fatalErrorHandler;
+ return this;
+ }
+
+ public Builder setJobManagerMetricGroup(JobManagerMetricGroup jobManagerMetricGroup) {
+ this.jobManagerMetricGroup = jobManagerMetricGroup;
+ return this;
+ }
+
+ public Builder setMetricServiceQueryAddress(@Nullable String metricServiceQueryAddress) {
+ this.metricServiceQueryAddress = metricServiceQueryAddress;
+ return this;
+ }
+
+ public Builder setIoExecutor(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
+ public Builder setHistoryServerArchivist(HistoryServerArchivist historyServerArchivist) {
+ this.historyServerArchivist = historyServerArchivist;
+ return this;
+ }
+
+ public Builder setExecutionGraphInfoStore(ExecutionGraphInfoStore executionGraphInfoStore) {
+ this.executionGraphInfoStore = executionGraphInfoStore;
+ return this;
+ }
+
+ public Builder setJobManagerRunnerFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
+ this.jobManagerRunnerFactory = jobManagerRunnerFactory;
+ return this;
+ }
+
+ public Builder setDispatcherBootstrapFactory(
+ DispatcherBootstrapFactory dispatcherBootstrapFactory) {
+ this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
+ return this;
+ }
+
+ public Builder setDispatcherOperationCaches(
+ DispatcherOperationCaches dispatcherOperationCaches) {
+ this.dispatcherOperationCaches = dispatcherOperationCaches;
+ return this;
+ }
+
+ public Builder setJobManagerRunnerRegistry(
+ JobManagerRunnerRegistry jobManagerRunnerRegistry) {
+ this.jobManagerRunnerRegistry = jobManagerRunnerRegistry;
+ return this;
+ }
+
+ public Builder setResourceCleanerFactory(ResourceCleanerFactory resourceCleanerFactory) {
+ this.resourceCleanerFactory = resourceCleanerFactory;
+ return this;
+ }
+
+ private ResourceCleanerFactory createDefaultResourceCleanerFactory() {
+ return new DispatcherResourceCleanerFactory(
+ ioExecutor,
+ jobManagerRunnerRegistry,
+ jobGraphWriter,
+ blobServer,
+ highAvailabilityServices,
+ jobManagerMetricGroup);
+ }
+
+ public TestingDispatcher build() throws Exception {
+ return new TestingDispatcher(
+ rpcService,
+ fencingToken,
+ recoveredJobs,
+ recoveredDirtyJobs,
+ configuration,
+ highAvailabilityServices,
+ resourceManagerGatewayRetriever,
+ heartbeatServices,
+ Preconditions.checkNotNull(
+ blobServer,
+ "No BlobServer is specified for building the TestingDispatcher"),
+ fatalErrorHandler,
+ jobGraphWriter,
+ jobResultStore,
+ jobManagerMetricGroup,
+ metricServiceQueryAddress,
+ ioExecutor,
+ historyServerArchivist,
+ executionGraphInfoStore,
+ jobManagerRunnerFactory,
+ dispatcherBootstrapFactory,
+ dispatcherOperationCaches,
+ jobManagerRunnerRegistry,
+ resourceCleanerFactory != null
+ ? resourceCleanerFactory
+ : createDefaultResourceCleanerFactory());
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
new file mode 100644
index 0000000..2d54b8b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+
+/** {@code TestingResourceCleanerFactory} for adding custom {@link ResourceCleaner} creation. */
+public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
+
+ private final Collection<LocallyCleanableResource> locallyCleanableResources;
+ private final Collection<GloballyCleanableResource> globallyCleanableResources;
+
+ private final Executor cleanupExecutor;
+
+ private TestingResourceCleanerFactory(
+ Collection<LocallyCleanableResource> locallyCleanableResources,
+ Collection<GloballyCleanableResource> globallyCleanableResources,
+ Executor cleanupExecutor) {
+ this.locallyCleanableResources = locallyCleanableResources;
+ this.globallyCleanableResources = globallyCleanableResources;
+ this.cleanupExecutor = cleanupExecutor;
+ }
+
+ @Override
+ public ResourceCleaner createLocalResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor) {
+ return createResourceCleaner(
+ mainThreadExecutor,
+ locallyCleanableResources,
+ LocallyCleanableResource::localCleanupAsync);
+ }
+
+ @Override
+ public ResourceCleaner createGlobalResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor) {
+ return createResourceCleaner(
+ mainThreadExecutor,
+ globallyCleanableResources,
+ GloballyCleanableResource::globalCleanupAsync);
+ }
+
+ private <T> ResourceCleaner createResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Collection<T> resources,
+ DefaultResourceCleaner.CleanupFn<T> cleanupFn) {
+ return jobId -> {
+ mainThreadExecutor.assertRunningInMainThread();
+ Throwable t = null;
+ for (T resource : resources) {
+ try {
+ cleanupFn.cleanupAsync(resource, jobId, cleanupExecutor).get();
+ } catch (Throwable throwable) {
+ t = ExceptionUtils.firstOrSuppressed(throwable, t);
+ }
+ }
+ return t != null
+ ? FutureUtils.completedExceptionally(t)
+ : FutureUtils.completedVoidFuture();
+ };
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** {@code Builder} for creating {@code TestingResourceCleanerFactory} instances. */
+ public static class Builder {
+
+ private Collection<LocallyCleanableResource> locallyCleanableResources = new ArrayList<>();
+ private Collection<GloballyCleanableResource> globallyCleanableResources =
+ new ArrayList<>();
+
+ private Executor cleanupExecutor = Executors.directExecutor();
+
+ public Builder setLocallyCleanableResources(
+ Collection<LocallyCleanableResource> locallyCleanableResources) {
+ this.locallyCleanableResources = locallyCleanableResources;
+ return this;
+ }
+
+ public Builder withLocallyCleanableResource(
+ LocallyCleanableResource locallyCleanableResource) {
+ this.locallyCleanableResources.add(locallyCleanableResource);
+ return this;
+ }
+
+ public Builder setGloballyCleanableResources(
+ Collection<GloballyCleanableResource> globallyCleanableResources) {
+ this.globallyCleanableResources = globallyCleanableResources;
+ return this;
+ }
+
+ public Builder withGloballyCleanableResource(
+ GloballyCleanableResource globallyCleanableResource) {
+ this.globallyCleanableResources.add(globallyCleanableResource);
+ return this;
+ }
+
+ public Builder setCleanupExecutor(Executor cleanupExecutor) {
+ this.cleanupExecutor = cleanupExecutor;
+ return this;
+ }
+
+ public TestingResourceCleanerFactory build() {
+ return new TestingResourceCleanerFactory(
+ locallyCleanableResources, globallyCleanableResources, cleanupExecutor);
+ }
+ }
+}