You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 09:34:41 UTC
[flink] branch master updated: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2104335 [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
2104335 is described below
commit 21043356b29314977c2b638796327094b73e8a4a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 17 18:24:34 2019 +0100
[FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
Removed testClientNonDetachedListeningBehaviour because the test was only relevant for
the JobClientActor. Moved testJobPersistencyWhenJobManagerShutdown to
DispatcherTest#testPersistedJobGraphWhenDispatcherIsShutDown.
This closes #7526.
---
.../flink/runtime/dispatcher/DispatcherHATest.java | 94 +++--
.../flink/runtime/dispatcher/DispatcherTest.java | 24 ++
.../runtime/dispatcher/TestingDispatcher.java | 6 +
.../JobManagerHAJobGraphRecoveryITCase.java | 447 ---------------------
.../testutils/InMemorySubmittedJobGraphStore.java | 1 -
5 files changed, 89 insertions(+), 483 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index f4e29db..2af7271 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.dispatcher;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
@@ -59,6 +58,7 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -125,7 +125,7 @@ public class DispatcherHATest extends TestLogger {
final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
- final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+ final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices, fencingTokens);
dispatcher.start();
@@ -149,25 +149,6 @@ public class DispatcherHATest extends TestLogger {
}
}
- @Nonnull
- private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
- final Configuration configuration = new Configuration();
- return new HATestingDispatcher(
- rpcService,
- UUID.randomUUID().toString(),
- configuration,
- highAvailabilityServices,
- new TestingResourceManagerGateway(),
- new BlobServer(configuration, new VoidBlobStore()),
- new HeartbeatServices(1000L, 1000L),
- UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
- null,
- new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
- testingFatalErrorHandler,
- fencingTokens);
- }
-
/**
* Tests that all JobManagerRunner are terminated if the leadership of the
* Dispatcher is revoked.
@@ -182,7 +163,7 @@ public class DispatcherHATest extends TestLogger {
highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
- final HATestingDispatcher dispatcher = createHADispatcher(
+ final HATestingDispatcher dispatcher = createDispatcher(
highAvailabilityServices,
fencingTokens);
@@ -225,32 +206,75 @@ public class DispatcherHATest extends TestLogger {
return jobGraph;
}
+ @Nonnull
+ private HATestingDispatcher createDispatcher(
+ TestingHighAvailabilityServices highAvailabilityServices,
+ @Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+ final Configuration configuration = new Configuration();
+
+ return new HATestingDispatcher(
+ rpcService,
+ UUID.randomUUID().toString(),
+ configuration,
+ highAvailabilityServices,
+ new TestingResourceManagerGateway(),
+ new BlobServer(configuration, new VoidBlobStore()),
+ new HeartbeatServices(1000L, 1000L),
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+ null,
+ new MemoryArchivedExecutionGraphStore(),
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
+ testingFatalErrorHandler,
+ fencingTokens);
+ }
+
private static class HATestingDispatcher extends TestingDispatcher {
@Nonnull
- private final BlockingQueue<DispatcherId> fencingTokens;
-
- HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull Blo [...]
- super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
+ private final Queue<DispatcherId> fencingTokens;
+
+ HATestingDispatcher(
+ RpcService rpcService,
+ String endpointId,
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices,
+ ResourceManagerGateway resourceManagerGateway,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ JobManagerMetricGroup jobManagerMetricGroup,
+ @Nullable String metricQueryServicePath,
+ ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ JobManagerRunnerFactory jobManagerRunnerFactory,
+ FatalErrorHandler fatalErrorHandler,
+ @Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+ super(
+ rpcService,
+ endpointId,
+ configuration,
+ highAvailabilityServices,
+ resourceManagerGateway,
+ blobServer,
+ heartbeatServices,
+ jobManagerMetricGroup,
+ metricQueryServicePath,
+ archivedExecutionGraphStore,
+ jobManagerRunnerFactory,
+ fatalErrorHandler);
this.fencingTokens = fencingTokens;
}
- @VisibleForTesting
- CompletableFuture<Integer> getNumberJobs(Time timeout) {
- return callAsyncWithoutFencing(
- () -> listJobs(timeout).get().size(),
- timeout);
- }
-
@Override
protected void setFencingToken(@Nullable DispatcherId newFencingToken) {
super.setFencingToken(newFencingToken);
+ final DispatcherId fencingToken;
if (newFencingToken == null) {
- fencingTokens.offer(NULL_FENCING_TOKEN);
+ fencingToken = NULL_FENCING_TOKEN;
} else {
- fencingTokens.offer(newFencingToken);
+ fencingToken = newFencingToken;
}
+
+ fencingTokens.offer(fencingToken);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b59be38..677ce5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -61,12 +61,14 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -661,6 +663,28 @@ public class DispatcherTest extends TestLogger {
submissionFuture.get();
}
+ @Test
+ public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
+ final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
+ haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+
+ // grant leadership and submit a single job
+ final DispatcherId expectedDispatcherId = DispatcherId.generate();
+ dispatcherLeaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
+
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ submissionFuture.get();
+ assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
+
+ dispatcher.shutDown();
+ dispatcher.getTerminationFuture().get();
+
+ assertThat(submittedJobGraphStore.contains(jobGraph.getJobID()), Matchers.is(true));
+ }
+
private final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {
@Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 09632bb..050a34b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -87,4 +87,10 @@ class TestingDispatcher extends Dispatcher {
this::getRecoveryOperation,
timeout).thenCompose(Function.identity());
}
+
+ CompletableFuture<Integer> getNumberJobs(Time timeout) {
+ return callAsyncWithoutFencing(
+ () -> listJobs(timeout).get().size(),
+ timeout);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
deleted file mode 100644
index ae8542b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.leaderelection.TestingListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests recovery of {@link SubmittedJobGraph} instances.
- */
-public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
-
- private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
- private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @AfterClass
- public static void tearDown() throws Exception {
- ZooKeeper.shutdown();
- }
-
- @Before
- public void cleanUp() throws Exception {
- ZooKeeper.deleteAll();
- }
-
- // ---------------------------------------------------------------------------------------------
-
- /**
- * Tests that the HA job is not cleaned up when the jobmanager is stopped.
- */
- @Test
- public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
- Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
- // Configure the cluster
- config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
- TestingCluster flink = new TestingCluster(config, false, false);
-
- try {
- final Deadline deadline = TestTimeOut.fromNow();
-
- // Start the JobManager and TaskManager
- flink.start(true);
-
- JobGraph jobGraph = createBlockingJobGraph();
-
- // Set restart strategy to guard against shut down races.
- // If the TM fails before the JM, it might happen that the
- // Job is failed, leading to state removal.
- ExecutionConfig ec = new ExecutionConfig();
- ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
- jobGraph.setExecutionConfig(ec);
-
- ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-
- // Submit the job
- jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
- // Wait for the job to start
- JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
- jobManager, deadline.timeLeft());
- }
- finally {
- flink.stop();
- }
-
- // verify that the persisted job data has not been removed from ZooKeeper when the JM has
- // been shutdown
- verifyRecoveryState(config);
- }
-
- /**
- * Tests that clients receive updates after recovery by a new leader.
- */
- @Test
- public void testClientNonDetachedListeningBehaviour() throws Exception {
- Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
- // Test actor system
- ActorSystem testSystem = null;
-
- // JobManager setup. Start the job managers as separate processes in order to not run the
- // actors postStop, which cleans up all running jobs.
- JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-
- LeaderRetrievalService leaderRetrievalService = null;
-
- ActorSystem taskManagerSystem = null;
-
- final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- config,
- TestingUtils.defaultExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
- try {
- final Deadline deadline = TestTimeOut.fromNow();
-
- // Test actor system
- testSystem = AkkaUtils.createActorSystem(new Configuration(),
- new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
- // The job managers
- jobManagerProcess[0] = new JobManagerProcess(0, config);
- jobManagerProcess[1] = new JobManagerProcess(1, config);
-
- jobManagerProcess[0].startProcess();
- jobManagerProcess[1].startProcess();
-
- // Leader listener
- TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
- leaderRetrievalService.start(leaderListener);
-
- // The task manager
- taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
- TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- taskManagerSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.<String>empty(),
- false,
- TaskManager.class);
-
- // Client test actor
- TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
- testSystem, Props.create(RecordingTestClient.class));
-
- JobGraph jobGraph = createBlockingJobGraph();
-
- {
- // Initial submission
- leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
- String leaderAddress = leaderListener.getAddress();
- UUID leaderId = leaderListener.getLeaderSessionID();
-
- // The client
- AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
-
- // Get the leader ref
- ActorRef leaderRef = AkkaUtils.getActorRef(
- leaderAddress, testSystem, deadline.timeLeft());
- ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
- int numSlots = 0;
- while (numSlots == 0) {
- Future<?> slotsFuture = leader.ask(JobManagerMessages
- .getRequestTotalNumberOfSlots(), deadline.timeLeft());
-
- numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
- }
-
- // Submit the job in non-detached mode
- leader.tell(new SubmitJob(jobGraph,
- ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
-
- JobManagerActorTestUtils.waitForJobStatus(
- jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
- }
-
- // Who's the boss?
- JobManagerProcess leadingJobManagerProcess;
- if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
- leadingJobManagerProcess = jobManagerProcess[0];
- }
- else {
- leadingJobManagerProcess = jobManagerProcess[1];
- }
-
- // Kill the leading job manager process
- leadingJobManagerProcess.destroy();
-
- {
- // Recovery by the standby JobManager
- leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
- String leaderAddress = leaderListener.getAddress();
- UUID leaderId = leaderListener.getLeaderSessionID();
-
- ActorRef leaderRef = AkkaUtils.getActorRef(
- leaderAddress, testSystem, deadline.timeLeft());
- ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
- JobManagerActorTestUtils.waitForJobStatus(
- jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-
- // Cancel the job
- leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
- }
-
- // Wait for the execution result
- clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
-
- int jobSubmitSuccessMessages = 0;
- for (Object msg : clientRef.underlyingActor().getMessages()) {
- if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
- jobSubmitSuccessMessages++;
- }
- }
-
- // At least two submissions should be ack-ed (initial and recovery). This is quite
- // conservative, but it is still possible that these messages are overtaken by the
- // final message.
- assertEquals(2, jobSubmitSuccessMessages);
- }
- catch (Throwable t) {
- // Print early (in some situations the process logs get too big
- // for Travis and the root problem is not shown)
- t.printStackTrace();
-
- // In case of an error, print the job manager process logs.
- if (jobManagerProcess[0] != null) {
- jobManagerProcess[0].printProcessLog();
- }
-
- if (jobManagerProcess[1] != null) {
- jobManagerProcess[1].printProcessLog();
- }
-
- throw t;
- }
- finally {
- if (jobManagerProcess[0] != null) {
- jobManagerProcess[0].destroy();
- }
-
- if (jobManagerProcess[1] != null) {
- jobManagerProcess[1].destroy();
- }
-
- if (leaderRetrievalService != null) {
- leaderRetrievalService.stop();
- }
-
- if (taskManagerSystem != null) {
- taskManagerSystem.terminate();
- }
-
- if (testSystem != null) {
- testSystem.terminate();
- }
-
- highAvailabilityServices.closeAndCleanupAllData();
- }
- }
-
- /**
- * Simple recording client.
- */
- private static class RecordingTestClient extends UntypedActor {
-
- private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
-
- private CountDownLatch jobResultLatch = new CountDownLatch(1);
-
- @Override
- public void onReceive(Object message) throws Exception {
- if (message instanceof LeaderSessionMessage) {
- message = ((LeaderSessionMessage) message).message();
- }
-
- messages.add(message);
-
- // Check for job result
- if (message instanceof JobManagerMessages.JobResultFailure ||
- message instanceof JobManagerMessages.JobResultSuccess) {
-
- jobResultLatch.countDown();
- }
- }
-
- public Queue<Object> getMessages() {
- return messages;
- }
-
- public void awaitJobResult(long timeout) throws InterruptedException {
- jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
- }
- }
-
- // ---------------------------------------------------------------------------------------------
-
- /**
- * Creates a simple blocking JobGraph.
- */
- private static JobGraph createBlockingJobGraph() {
- JobGraph jobGraph = new JobGraph("Blocking program");
-
- JobVertex jobVertex = new JobVertex("Blocking Vertex");
- jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
-
- jobGraph.addVertex(jobVertex);
-
- return jobGraph;
- }
-
- /**
- * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
- */
- private void verifyCleanRecoveryState(Configuration config) throws Exception {
- // File state backend empty
- Collection<File> stateHandles = FileUtils.listFiles(
- tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
- if (!stateHandles.isEmpty()) {
- fail("File state backend is not clean: " + stateHandles);
- }
-
- // ZooKeeper
- String currentJobsPath = config.getString(
- HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
- Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
- if (stat.getCversion() == 0) {
- // Sanity check: verify that some changes have been performed
- fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
- "this test. What are you testing?");
- }
-
- if (stat.getNumChildren() != 0) {
- // Is everything clean again?
- fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
- ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
- }
- }
-
- /**
- * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
- */
- private void verifyRecoveryState(Configuration config) throws Exception {
- // File state backend empty
- Collection<File> stateHandles = FileUtils.listFiles(
- tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
- if (stateHandles.isEmpty()) {
- fail("File state backend has been cleaned: " + stateHandles);
- }
-
- // ZooKeeper
- String currentJobsPath = config.getString(
- HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
- Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
- if (stat.getCversion() == 0) {
- // Sanity check: verify that some changes have been performed
- fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
- "this test. What are you testing?");
- }
-
- if (stat.getNumChildren() == 0) {
- // Children have been cleaned up?
- fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
- ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
- }
- }
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index bf87515..169f95b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -113,7 +113,6 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
}
public synchronized boolean contains(JobID jobId) {
- verifyIsStarted();
return storedJobs.containsKey(jobId);
}