You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 09:34:41 UTC

[flink] branch master updated: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2104335  [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
2104335 is described below

commit 21043356b29314977c2b638796327094b73e8a4a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 17 18:24:34 2019 +0100

    [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
    
    Removed testClientNonDetachedListeningBehaviour because the test was only relevant for
    the JobClientActor. Moved testJobPersistencyWhenJobManagerShutdown to
    DispatcherTest#testPersistedJobGraphWhenDispatcherIsShutDown.
    
    This closes #7526.
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  94 +++--
 .../flink/runtime/dispatcher/DispatcherTest.java   |  24 ++
 .../runtime/dispatcher/TestingDispatcher.java      |   6 +
 .../JobManagerHAJobGraphRecoveryITCase.java        | 447 ---------------------
 .../testutils/InMemorySubmittedJobGraphStore.java  |   1 -
 5 files changed, 89 insertions(+), 483 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index f4e29db..2af7271 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -59,6 +58,7 @@ import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -125,7 +125,7 @@ public class DispatcherHATest extends TestLogger {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+		final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices, fencingTokens);
 
 		dispatcher.start();
 
@@ -149,25 +149,6 @@ public class DispatcherHATest extends TestLogger {
 		}
 	}
 
-	@Nonnull
-	private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
-		final Configuration configuration = new Configuration();
-		return new HATestingDispatcher(
-			rpcService,
-			UUID.randomUUID().toString(),
-			configuration,
-			highAvailabilityServices,
-			new TestingResourceManagerGateway(),
-			new BlobServer(configuration, new VoidBlobStore()),
-			new HeartbeatServices(1000L, 1000L),
-			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-			null,
-			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
-			testingFatalErrorHandler,
-			fencingTokens);
-	}
-
 	/**
 	 * Tests that all JobManagerRunner are terminated if the leadership of the
 	 * Dispatcher is revoked.
@@ -182,7 +163,7 @@ public class DispatcherHATest extends TestLogger {
 		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
 
 		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
-		final HATestingDispatcher dispatcher = createHADispatcher(
+		final HATestingDispatcher dispatcher = createDispatcher(
 			highAvailabilityServices,
 			fencingTokens);
 
@@ -225,32 +206,75 @@ public class DispatcherHATest extends TestLogger {
 		return jobGraph;
 	}
 
+	@Nonnull
+	private HATestingDispatcher createDispatcher(
+			TestingHighAvailabilityServices highAvailabilityServices,
+			@Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+		final Configuration configuration = new Configuration();
+
+		return new HATestingDispatcher(
+			rpcService,
+			UUID.randomUUID().toString(),
+			configuration,
+			highAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			new BlobServer(configuration, new VoidBlobStore()),
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
+			testingFatalErrorHandler,
+			fencingTokens);
+	}
+
 	private static class HATestingDispatcher extends TestingDispatcher {
 
 		@Nonnull
-		private final BlockingQueue<DispatcherId> fencingTokens;
-
-		HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws Exception {
-			super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
+		private final Queue<DispatcherId> fencingTokens;
+
+		HATestingDispatcher(
+				RpcService rpcService,
+				String endpointId,
+				Configuration configuration,
+				HighAvailabilityServices highAvailabilityServices,
+				ResourceManagerGateway resourceManagerGateway,
+				BlobServer blobServer,
+				HeartbeatServices heartbeatServices,
+				JobManagerMetricGroup jobManagerMetricGroup,
+				@Nullable String metricQueryServicePath,
+				ArchivedExecutionGraphStore archivedExecutionGraphStore,
+				JobManagerRunnerFactory jobManagerRunnerFactory,
+				FatalErrorHandler fatalErrorHandler,
+				@Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+			super(
+				rpcService,
+				endpointId,
+				configuration,
+				highAvailabilityServices,
+				resourceManagerGateway,
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricQueryServicePath,
+				archivedExecutionGraphStore,
+				jobManagerRunnerFactory,
+				fatalErrorHandler);
 			this.fencingTokens = fencingTokens;
 		}
 
-		@VisibleForTesting
-		CompletableFuture<Integer> getNumberJobs(Time timeout) {
-			return callAsyncWithoutFencing(
-				() -> listJobs(timeout).get().size(),
-				timeout);
-		}
-
 		@Override
 		protected void setFencingToken(@Nullable DispatcherId newFencingToken) {
 			super.setFencingToken(newFencingToken);
 
+			final DispatcherId fencingToken;
 			if (newFencingToken == null) {
-				fencingTokens.offer(NULL_FENCING_TOKEN);
+				fencingToken = NULL_FENCING_TOKEN;
 			} else {
-				fencingTokens.offer(newFencingToken);
+				fencingToken = newFencingToken;
 			}
+
+			fencingTokens.offer(fencingToken);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b59be38..677ce5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -61,12 +61,14 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -661,6 +663,28 @@ public class DispatcherTest extends TestLogger {
 		submissionFuture.get();
 	}
 
+	@Test
+	public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
+		final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
+		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+
+		// grant leadership and submit a single job
+		final DispatcherId expectedDispatcherId = DispatcherId.generate();
+		dispatcherLeaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+		submissionFuture.get();
+		assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
+
+		dispatcher.shutDown();
+		dispatcher.getTerminationFuture().get();
+
+		assertThat(submittedJobGraphStore.contains(jobGraph.getJobID()), Matchers.is(true));
+	}
+
 	private final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {
 
 		@Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 09632bb..050a34b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -87,4 +87,10 @@ class TestingDispatcher extends Dispatcher {
 			this::getRecoveryOperation,
 			timeout).thenCompose(Function.identity());
 	}
+
+	CompletableFuture<Integer> getNumberJobs(Time timeout) {
+		return callAsyncWithoutFencing(
+			() -> listJobs(timeout).get().size(),
+			timeout);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
deleted file mode 100644
index ae8542b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.leaderelection.TestingListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
-import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests recovery of {@link SubmittedJobGraph} instances.
- */
-public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
-
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
-	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		ZooKeeper.shutdown();
-	}
-
-	@Before
-	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Tests that the HA job is not cleaned up when the jobmanager is stopped.
-	 */
-	@Test
-	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Configure the cluster
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-		TestingCluster flink = new TestingCluster(config, false, false);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Start the JobManager and TaskManager
-			flink.start(true);
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			// Set restart strategy to guard against shut down races.
-			// If the TM fails before the JM, it might happen that the
-			// Job is failed, leading to state removal.
-			ExecutionConfig ec = new ExecutionConfig();
-			ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
-			jobGraph.setExecutionConfig(ec);
-
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-
-			// Submit the job
-			jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-			// Wait for the job to start
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					jobManager, deadline.timeLeft());
-		}
-		finally {
-			flink.stop();
-		}
-
-		// verify that the persisted job data has not been removed from ZooKeeper when the JM has
-		// been shutdown
-		verifyRecoveryState(config);
-	}
-
-	/**
-	 * Tests that clients receive updates after recovery by a new leader.
-	 */
-	@Test
-	public void testClientNonDetachedListeningBehaviour() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
-
-		// Test actor system
-		ActorSystem testSystem = null;
-
-		// JobManager setup. Start the job managers as separate processes in order to not run the
-		// actors postStop, which cleans up all running jobs.
-		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-
-		LeaderRetrievalService leaderRetrievalService = null;
-
-		ActorSystem taskManagerSystem = null;
-
-		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Test actor system
-			testSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
-			// The job managers
-			jobManagerProcess[0] = new JobManagerProcess(0, config);
-			jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-			jobManagerProcess[0].startProcess();
-			jobManagerProcess[1].startProcess();
-
-			// Leader listener
-			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-			leaderRetrievalService.start(leaderListener);
-
-			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-			TaskManager.startTaskManagerComponentsAndActor(
-				config,
-				ResourceID.generate(),
-				taskManagerSystem,
-				highAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				"localhost",
-				Option.<String>empty(),
-				false,
-				TaskManager.class);
-
-			// Client test actor
-			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
-					testSystem, Props.create(RecordingTestClient.class));
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			{
-				// Initial submission
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				// The client
-				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
-
-				// Get the leader ref
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				int numSlots = 0;
-				while (numSlots == 0) {
-					Future<?> slotsFuture = leader.ask(JobManagerMessages
-							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
-
-					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
-				}
-
-				// Submit the job in non-detached mode
-				leader.tell(new SubmitJob(jobGraph,
-						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-			}
-
-			// Who's the boss?
-			JobManagerProcess leadingJobManagerProcess;
-			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
-				leadingJobManagerProcess = jobManagerProcess[0];
-			}
-			else {
-				leadingJobManagerProcess = jobManagerProcess[1];
-			}
-
-			// Kill the leading job manager process
-			leadingJobManagerProcess.destroy();
-
-			{
-				// Recovery by the standby JobManager
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-
-				// Cancel the job
-				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
-			}
-
-			// Wait for the execution result
-			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
-
-			int jobSubmitSuccessMessages = 0;
-			for (Object msg : clientRef.underlyingActor().getMessages()) {
-				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
-					jobSubmitSuccessMessages++;
-				}
-			}
-
-			// At least two submissions should be ack-ed (initial and recovery). This is quite
-			// conservative, but it is still possible that these messages are overtaken by the
-			// final message.
-			assertEquals(2, jobSubmitSuccessMessages);
-		}
-		catch (Throwable t) {
-			// Print early (in some situations the process logs get too big
-			// for Travis and the root problem is not shown)
-			t.printStackTrace();
-
-			// In case of an error, print the job manager process logs.
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].printProcessLog();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].printProcessLog();
-			}
-
-			throw t;
-		}
-		finally {
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].destroy();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].destroy();
-			}
-
-			if (leaderRetrievalService != null) {
-				leaderRetrievalService.stop();
-			}
-
-			if (taskManagerSystem != null) {
-				taskManagerSystem.terminate();
-			}
-
-			if (testSystem != null) {
-				testSystem.terminate();
-			}
-
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
-
-	/**
-	 * Simple recording client.
-	 */
-	private static class RecordingTestClient extends UntypedActor {
-
-		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
-
-		private CountDownLatch jobResultLatch = new CountDownLatch(1);
-
-		@Override
-		public void onReceive(Object message) throws Exception {
-			if (message instanceof LeaderSessionMessage) {
-				message = ((LeaderSessionMessage) message).message();
-			}
-
-			messages.add(message);
-
-			// Check for job result
-			if (message instanceof JobManagerMessages.JobResultFailure ||
-					message instanceof JobManagerMessages.JobResultSuccess) {
-
-				jobResultLatch.countDown();
-			}
-		}
-
-		public Queue<Object> getMessages() {
-			return messages;
-		}
-
-		public void awaitJobResult(long timeout) throws InterruptedException {
-			jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
-		}
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a simple blocking JobGraph.
-	 */
-	private static JobGraph createBlockingJobGraph() {
-		JobGraph jobGraph = new JobGraph("Blocking program");
-
-		JobVertex jobVertex = new JobVertex("Blocking Vertex");
-		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-		jobGraph.addVertex(jobVertex);
-
-		return jobGraph;
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
-	 */
-	private void verifyCleanRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (!stateHandles.isEmpty()) {
-			fail("File state backend is not clean: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-					"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() != 0) {
-			// Is everything clean again?
-			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
-					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-	/**
-	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
-	 */
-	private void verifyRecoveryState(Configuration config) throws Exception {
-		// File state backend empty
-		Collection<File> stateHandles = FileUtils.listFiles(
-				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
-
-		if (stateHandles.isEmpty()) {
-			fail("File state backend has been cleaned: " + stateHandles);
-		}
-
-		// ZooKeeper
-		String currentJobsPath = config.getString(
-			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
-
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
-
-		if (stat.getCversion() == 0) {
-			// Sanity check: verify that some changes have been performed
-			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
-				"this test. What are you testing?");
-		}
-
-		if (stat.getNumChildren() == 0) {
-			// Children have been cleaned up?
-			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
-				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index bf87515..169f95b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -113,7 +113,6 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	public synchronized boolean contains(JobID jobId) {
-		verifyIsStarted();
 		return storedJobs.containsKey(jobId);
 	}