You are viewing a plain-text version of this content. The canonical link for it was provided as a hyperlink ("here"), which is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/12/06 15:11:50 UTC

[3/3] flink git commit: [FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.

[FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3fd548e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3fd548e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3fd548e

Branch: refs/heads/master
Commit: a3fd548e9c76c67609bbf159d5fb743d756450b1
Parents: 74d052b
Author: kkloudas <kk...@gmail.com>
Authored: Wed Dec 6 14:32:46 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:28 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 1073 ++++++++----------
 .../HAAbstractQueryableStateTestBase.java       |   22 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |    2 -
 .../NonHAAbstractQueryableStateTestBase.java    |   11 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |    2 -
 5 files changed, 476 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 65e9bb5..5a28367 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -92,7 +93,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
@@ -159,52 +159,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testQueryableState() throws Exception {
-		// Config
+
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		JobID jobId = null;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-		try {
-			//
-			// Test program
-			//
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestKeyRangeSource(numKeys));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
-					"any-name",
-					new SumReduce(),
-					source.getType());
-
-			final String queryName = "hakuna-matata";
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7143749578983540352L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState(queryName, reducingState);
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			cluster.submitJobDetached(jobGraph);
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any-name", new SumReduce(), 	source.getType());
+
+		final String queryName = "hakuna-matata";
+
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7143749578983540352L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState(queryName, reducingState);
 
-			//
-			// Start querying
-			//
-			jobId = jobGraph.getJobID();
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
 
 			final AtomicLongArray counts = new AtomicLongArray(numKeys);
 
@@ -261,16 +249,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				long count = counts.get(i);
 				assertTrue("Count at position " + i + " is " + count, count > 0);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -282,91 +260,94 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		JobID jobId = null;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-		try {
-			//
-			// Test program
-			//
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestKeyRangeSource(numKeys));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
-					"any-name",
-					new SumReduce(),
-					source.getType());
-
-			final String queryName = "duplicate-me";
-
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -4126824763829132959L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName, reducingState);
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
 
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -6265024000462809436L;
+		// Reducing state
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any-name",
+				new SumReduce(),
+				source.getType());
 
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName);
+		final String queryName = "duplicate-me";
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = -4126824763829132959L;
 
-			CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState(queryName, reducingState);
 
-			cluster.submitJobDetached(jobGraph);
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = -6265024000462809436L;
 
-			TestingJobManagerMessages.JobStatusIs jobStatus =
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState(queryName);
+
+		// Submit the job graph
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		final JobID jobId = jobGraph.getJobID();
+
+		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
+				notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
+
+		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
+				notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+
+		cluster.submitJobDetached(jobGraph);
+
+		try {
+			final TestingJobManagerMessages.JobStatusIs jobStatus =
 					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
 			assertEquals(JobStatus.FAILED, jobStatus.state());
+		} catch (Exception e) {
+
+			// if the assertion fails, it means that the job was (falsely) not cancelled.
+			// in this case, and given that the mini-cluster is shared with other tests,
+			// we cancel the job and wait for the cancellation so that the resources are freed.
 
-			// Get the job and check the cause
-			JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
-					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
-
-			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
-			int causedByIndex = failureCause.indexOf("Caused by: ");
-			String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
-			assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
-			assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
-		} finally {
-			// Free cluster resources
 			if (jobId != null) {
-				scala.concurrent.Future<CancellationSuccess> cancellation = cluster
-						.getLeaderGateway(deadline.timeLeft())
+				cluster.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
+
+			// and we re-throw the exception.
+			throw e;
 		}
+
+		// Get the job and check the cause
+		JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+				cluster.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+		String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+		assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+		assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+		int causedByIndex = failureCause.indexOf("Caused by: ");
+		String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+		assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+		assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
 	}
 
 	/**
@@ -377,55 +358,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testValueState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
-					"any",
-					source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7662520075515707428L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		// Value state
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
 
-			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7662520075515707428L;
 
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
 			}
+		}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		}
 	}
 
@@ -434,48 +400,36 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 * contains a wrong jobId or wrong queryable state name.
 	 */
 	@Test
+	@Ignore
 	public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
-					new ValueStateDescriptor<>("any", source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7662520075515707428L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
 
-			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7662520075515707428L;
 
-			cluster.submitJobDetached(jobGraph);
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+
+			// register to be notified when the job is running.
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
+					notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+
+			cluster.submitJobDetached(closableJobGraph.getJobGraph());
 
 			// expect for the job to be running
 			TestingJobManagerMessages.JobStatusIs jobStatus =
@@ -486,49 +440,38 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
 					wrongJobId, 						// this is the wrong job id
-					"hankuna",
+					"hakuna",
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
 			try {
-				unknownJobFuture.get();
-				fail(); // by now the job must have failed.
+				unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail(); // by now the request must have failed.
 			} catch (ExecutionException e) {
-				Assert.assertTrue(e.getCause() instanceof RuntimeException);
-				Assert.assertTrue(e.getCause().getMessage().contains(
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
 						"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
-			} catch (Exception ignored) {
-				fail("Unexpected type of exception.");
+			} catch (Exception f) {
+				fail("Unexpected type of exception: " + f.getMessage());
 			}
 
 			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
-					jobId,
-					"wrong-hankuna", // this is the wrong name.
+					closableJobGraph.getJobId(),
+					"wrong-hakuna", // this is the wrong name.
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
 			try {
-				unknownQSName.get();
-				fail(); // by now the job must have failed.
+				unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail(); // by now the request must have failed.
 			} catch (ExecutionException e) {
-				Assert.assertTrue(e.getCause() instanceof RuntimeException);
-				Assert.assertTrue(e.getCause().getMessage().contains(
-						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
-			} catch (Exception ignored) {
-				fail("Unexpected type of exception.");
-			}
-
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
+						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
+			} catch (Exception f) {
+				fail("Unexpected type of exception: " + f.getMessage());
 			}
 		}
 	}
@@ -539,50 +482,44 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testQueryNonStartedJobState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-				.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
-				"any",
-				source.getType(),
-				null);
-
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+			"any", source.getType(), 	null);
+
+		QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+
 					private static final long serialVersionUID = 7480503339992214681L;
 
 					@Override
-					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					public Integer getKey(Tuple2<Integer, Long> value) {
 						return value.f0;
 					}
 				}).asQueryableState("hakuna", valueState);
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			// Now query
 			long expected = numElements;
 
 			// query once
 			client.getKvState(
-					jobId,
+					autoCancellableJob.getJobId(),
 					queryableState.getQueryableStateName(),
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
@@ -591,16 +528,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			cluster.submitJobDetached(jobGraph);
 
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -615,51 +542,37 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	@Test(expected = UnknownKeyOrNamespaceException.class)
 	public void testValueStateDefault() throws Throwable {
 
-		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
-
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies
-				.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-				.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
-				new ValueStateDescriptor<>(
-					"any",
-					source.getType(),
-					Tuple2.of(0, 1337L));
-
-			// only expose key "1"
-			QueryableStateStream<Integer, Tuple2<Integer, Long>>
-				queryableState =
-				source.keyBy(
-					new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 4509274556892655887L;
-
-						@Override
-						public Integer getKey(
-							Tuple2<Integer, Long> value) throws
-							Exception {
-							return 1;
-						}
-					}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any", source.getType(), 	Tuple2.of(0, 1337L));
+
+		// only expose key "1"
+		QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source.keyBy(
+				new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 4509274556892655887L;
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return 1;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
@@ -683,17 +596,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				// exception in an ExecutionException.
 				throw e.getCause();
 			}
-		} finally {
-
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -707,55 +609,41 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testValueStateShortcut() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state shortcut
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 9168901838808830068L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("matata");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		// Value state shortcut: queryable as "matata" without an explicit descriptor.
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 9168901838808830068L;
 
-			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
-					(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
-			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
-		} finally {
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState("matata");
 
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
-						cluster.getLeaderGateway(deadline.timeLeft())
-								.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-								.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+		final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+				(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
 
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
 		}
 	}
 
@@ -768,50 +656,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testFoldingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numElements = 1024;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Folding state
-			FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState =
-					new FoldingStateDescriptor<>(
-							"any",
-							"0",
-							new SumFold(),
-							StringSerializer.INSTANCE);
-
-			QueryableStateStream<Integer, String> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -842809958106747539L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("pumba", foldingState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = new FoldingStateDescriptor<>(
+				"any", "0", new SumFold(), StringSerializer.INSTANCE);
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = -842809958106747539L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("pumba", foldingState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			String expected = Integer.toString(numElements * (numElements + 1) / 2);
+			final String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -840,16 +718,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -861,48 +729,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testReducingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
-					new ReducingStateDescriptor<>(
-							"any",
-							new SumReduce(),
-							source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("jungle", reducingState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any", new SumReduce(), source.getType());
+
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("jungle", reducingState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements * (numElements + 1L) / 2L;
+			final long expected = numElements * (numElements + 1L) / 2L;
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -931,16 +791,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -952,66 +802,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testMapState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
-					"timon",
-					BasicTypeInfo.INT_TYPE_INFO,
-					source.getType());
-			mapStateDescriptor.setQueryable("timon-queryable");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
-				private static final long serialVersionUID = -805125545438296619L;
+		final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+				"timon", BasicTypeInfo.INT_TYPE_INFO, source.getType());
+		mapStateDescriptor.setQueryable("timon-queryable");
 
-				private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
 
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					mapState = getRuntimeContext().getMapState(mapStateDescriptor);
-				}
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+			private static final long serialVersionUID = -805125545438296619L;
 
-				@Override
-				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
-					Tuple2<Integer, Long> v = mapState.get(value.f0);
-					if (v == null) {
-						v = new Tuple2<>(value.f0, 0L);
-					}
-					mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+			private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+			}
+
+			@Override
+			public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+				Tuple2<Integer, Long> v = mapState.get(value.f0);
+				if (v == null) {
+					v = new Tuple2<>(value.f0, 0L);
 				}
-			});
+				mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+			}
+		});
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements * (numElements + 1L) / 2L;
+			final long expected = numElements * (numElements + 1L) / 2L;
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1039,16 +883,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -1061,62 +895,56 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testListState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
-					"list",
-					BasicTypeInfo.LONG_TYPE_INFO);
-			listStateDescriptor.setQueryable("list-queryable");
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
-				private static final long serialVersionUID = -805125545438296619L;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-				private transient ListState<Long> listState;
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					listState = getRuntimeContext().getListState(listStateDescriptor);
-				}
+		final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+				"list", BasicTypeInfo.LONG_TYPE_INFO);
+		listStateDescriptor.setQueryable("list-queryable");
 
-				@Override
-				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
-					listState.add(value.f1);
-				}
-			});
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+			private static final long serialVersionUID = -805125545438296619L;
 
-			cluster.submitJobDetached(jobGraph);
+			private transient ListState<Long> listState;
 
-			// Now query
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				listState = getRuntimeContext().getListState(listStateDescriptor);
+			}
 
-			Map<Integer, Set<Long>> results = new HashMap<>();
+			@Override
+			public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+				listState.add(value.f1);
+			}
+		});
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+
+			final Map<Integer, Set<Long>> results = new HashMap<>();
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1159,66 +987,48 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				}
 			}
 
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
 	@Test
 	public void testAggregatingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
-					new AggregatingStateDescriptor<>(
-							"aggregates",
-							new SumAggr(),
-							String.class);
-			aggrStateDescriptor.setQueryable("aggr-queryable");
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).transform(
-					"TestAggregatingOperator",
-					BasicTypeInfo.STRING_TYPE_INFO,
-					new AggregatingTestOperator(aggrStateDescriptor)
-			);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because the cluster is shared between tests and we
+		// don't explicitly check that all slots are available before submitting:
+		// the (effectively) unlimited restarts let the job retry until slots free up.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
+				new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class);
+		aggrStateDescriptor.setQueryable("aggr-queryable");
 
-			// Now query
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).transform(
+				"TestAggregatingOperator",
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new AggregatingTestOperator(aggrStateDescriptor)
+		);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1246,16 +1056,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -1316,7 +1116,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				notifyAll();
 			}
 		}
-
 	}
 
 	/**
@@ -1465,6 +1264,85 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/////				General Utility Methods				//////
 
+	/**
+	 * A wrapper of the job graph that makes sure to cancel the job and wait for
+	 * termination after the execution of every test.
+	 */
+	private static class AutoCancellableJob implements AutoCloseable {
+
+		private final FlinkMiniCluster cluster;
+		private final Deadline deadline;
+		private final JobGraph jobGraph;
+
+		private final JobID jobId;
+		private final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture;
+
+		AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) {
+			Preconditions.checkNotNull(env);
+
+			this.cluster = Preconditions.checkNotNull(cluster);
+			this.jobGraph = env.getStreamGraph().getJobGraph();
+			this.deadline = Preconditions.checkNotNull(deadline);
+
+			this.jobId = jobGraph.getJobID();
+
+			// Register for the CANCELED notification on the cluster this job runs on,
+			// before the job is submitted, so the status change cannot be missed.
+			this.cancellationFuture = notifyWhenJobStatusIs(this.cluster, jobId, JobStatus.CANCELED, deadline);
+		}
+
+		JobGraph getJobGraph() {
+			return jobGraph;
+		}
+
+		JobID getJobId() {
+			return jobId;
+		}
+
+		/**
+		 * Cancels the job and blocks until it has reached {@link JobStatus#CANCELED},
+		 * failing if either step does not complete before the deadline.
+		 */
+		@Override
+		public void close() throws Exception {
+			// Free cluster resources. Waiting for the acknowledgement surfaces a
+			// rejected cancellation right away instead of only when the deadline expires.
+			final CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+			cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+			// The acknowledgement does not mean the job has terminated; wait until it is
+			// actually CANCELED before the next test reuses the shared cluster.
+			cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * Asks the leading JobManager of the shared test {@code cluster} to notify
+	 * once the given job reaches the given status.
+	 */
+	private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
+			final JobID jobId, final JobStatus status, final Deadline deadline) {
+
+		return notifyWhenJobStatusIs(cluster, jobId, status, deadline);
+	}
+
+	/**
+	 * Asks the leading JobManager of the given cluster to notify once the given
+	 * job reaches the given status.
+	 */
+	private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
+			final FlinkMiniCluster cluster, final JobID jobId, final JobStatus status, final Deadline deadline) {
+
+		return FutureUtils.toJava(
+				cluster.getLeaderGateway(deadline.timeLeft())
+						.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+	}
+
 	private static <K, S extends State, V> CompletableFuture<S> getKvState(
 			final Deadline deadline,
 			final QueryableStateClient client,

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index b9ce7c2..8767b52 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+
 import static org.junit.Assert.fail;
 
 /**
@@ -79,19 +81,25 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 	}
 
 	@AfterClass
-	public static void tearDown() {
-		if (cluster != null) {
-			cluster.stop();
-			cluster.awaitTermination();
-		}
-
-		try {
-			zkServer.stop();
-			zkServer.close();
-			client.shutdownAndWait();
-		} catch (Throwable e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void tearDown() throws IOException {
+		// Release the client, the cluster and ZooKeeper in turn; the finally
+		// blocks make sure a failure in one step does not leak the others.
+		try {
+			if (client != null) {
+				client.shutdownAndWait();
+			}
+		} finally {
+			try {
+				if (cluster != null) {
+					cluster.stop();
+					cluster.awaitTermination();
+				}
+			} finally {
+				if (zkServer != null) {
+					zkServer.stop();
+					zkServer.close();
+				}
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
 
 	@Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index a5e24b2..2686a29 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -67,12 +67,17 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
 
 	@AfterClass
 	public static void tearDown() {
-		try {
-			cluster.stop();
-			client.shutdownAndWait();
-		} catch (Throwable e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		// Shut down the client first; the finally block makes sure the cluster is
+		// stopped even if that fails, so its resources do not leak.
+		try {
+			if (client != null) {
+				client.shutdownAndWait();
+			}
+		} finally {
+			if (cluster != null) {
+				cluster.stop();
+				cluster.awaitTermination();
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
 
 	@Rule