You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:00:17 UTC

[1/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive

Repository: flink
Updated Branches:
  refs/heads/master 1804aa33d -> ab1fbfdfe


http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index bcc62cb..fde16fc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
@@ -37,8 +38,8 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
 
 	private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
 
-	public TaskManagerMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 865385f..69ee762 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class ClusterOverviewHandlerTest {
 	@Test
 	public void testGetPaths() {
-		ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L));
+		ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/overview", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index ea26f5d..6061e4b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class CurrentJobIdsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L));
+		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 64360d3..ccfafd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -66,17 +67,17 @@ public class CurrentJobsOverviewHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
+		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
 		String[] pathsAll = handlerAll.getPaths();
 		Assert.assertEquals(1, pathsAll.length);
 		Assert.assertEquals("/joboverview", pathsAll[0]);
 
-		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
+		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
 		String[] pathsRunning = handlerRunning.getPaths();
 		Assert.assertEquals(1, pathsRunning.length);
 		Assert.assertEquals("/joboverview/running", pathsRunning[0]);
 
-		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
+		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
 		String[] pathsCompleted = handlerCompleted.getPaths();
 		Assert.assertEquals(1, pathsCompleted.length);
 		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
index d17b55f..22b3e5e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 
@@ -33,7 +34,7 @@ import java.util.TimeZone;
 public class DashboardConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
-		DashboardConfigHandler handler = new DashboardConfigHandler(10000L);
+		DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
index a498cf2..97091cf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.junit.Assert;
@@ -31,7 +33,7 @@ import java.util.List;
 public class JarAccessDeniedHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarAccessDeniedHandler handler = new JarAccessDeniedHandler();
+		JarAccessDeniedHandler handler = new JarAccessDeniedHandler(Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(5, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
index bcbb1ea..a067d8f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarDeleteHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarDeleteHandler handler = new JarDeleteHandler(null);
+		JarDeleteHandler handler = new JarDeleteHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
index 863c248..5da4913 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarListHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarListHandler handler = new JarListHandler(null);
+		JarListHandler handler = new JarListHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
index a3ded83..f5ed339 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarPlanHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarPlanHandler handler = new JarPlanHandler(null);
+		JarPlanHandler handler = new JarPlanHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 82aa87a..67dad13 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,7 +31,7 @@ import org.junit.Test;
 public class JarRunHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration());
+		JarRunHandler handler = new JarRunHandler(Executors.directExecutor(), null, Time.seconds(0L), new Configuration());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid/run", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index e57ca34..ea8b524 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarUploadHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarUploadHandler handler = new JarUploadHandler(null);
+		JarUploadHandler handler = new JarUploadHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/upload", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index fe55f51..5510fed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -54,7 +55,7 @@ public class JobAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index a436b2d..86c5295 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -33,7 +34,7 @@ import java.util.List;
 public class JobCancellationHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT());
+		JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index b48ee66..529d130 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -92,7 +92,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
@@ -121,7 +121,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
@@ -152,7 +152,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		handler = handlers.getTriggerHandler();
 
 		try {
-			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 			fail("Did not throw expected test Exception");
 		} catch (Exception e) {
 			IllegalStateException cause = (IllegalStateException) e.getCause();
@@ -169,7 +169,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -187,7 +187,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
 
 		// Trigger
-		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
@@ -206,7 +206,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		assertEquals(location, root.get("location").asText());
 
 		// Trigger again
-		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -225,7 +225,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query progress
 		params.put("requestId", "1");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -239,7 +239,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Complete
 		successfulCancelWithSavepoint.complete("_path-savepoint_");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -255,7 +255,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 
 		// Query again, keep recent history
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -272,7 +272,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query for unknown request
 		params.put("requestId", "9929");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -295,7 +295,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -319,7 +319,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query progress
 		params.put("requestId", "1");
 
-		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index 104b0a3..1c08ae8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -55,7 +56,7 @@ public class JobConfigHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class));
+		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index bfeb40a..ee0498e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -68,7 +69,7 @@ public class JobDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index f54ab06..6e0f918 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class JobExceptionsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class));
+		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
index 8e16e8a..94fd5a8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JobManagerConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobManagerConfigHandler handler = new JobManagerConfigHandler(null);
+		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobmanager/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 17b4c44..4a934ec 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -51,7 +52,7 @@ public class JobPlanHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class));
+		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index 89bf426..8c05c83 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -34,7 +35,7 @@ import java.util.List;
 public class JobStoppingHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT());
+		JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index b7af323..5af9aa6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -57,7 +58,7 @@ public class JobVertexAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index d2ac0d6..0d15e08 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -46,7 +47,7 @@ import static org.mockito.Mockito.when;
 public class JobVertexBackPressureHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0);
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
@@ -63,10 +64,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				9999);
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);
@@ -96,10 +98,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				9999);
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);
@@ -157,10 +160,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				0); // <----- refresh interval should fire immediately
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bc4fe9c..1b8d9aa 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -59,7 +60,7 @@ public class JobVertexDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index d5d877a..badb952 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -61,7 +62,7 @@ public class JobVertexTaskManagersHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
+		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index d992b85..a80bac9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import org.junit.Assert;
@@ -31,7 +32,7 @@ import static org.mockito.Mockito.mock;
 public class SubtaskCurrentAttemptDetailsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index ce8e72f..6773fd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -64,7 +65,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index e1fbf92..7777d2d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -73,7 +74,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(),  null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index f33da80..7b400da 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 548efaf..31c2212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -59,7 +60,7 @@ public class SubtasksTimesHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
+		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index afc2764..e3a71a1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -33,7 +34,7 @@ import java.util.List;
 public class TaskManagersHandlerTest {
 	@Test
 	public void testGetPaths() {
-		TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null);
+		TaskManagersHandler handler = new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index ce943b1..47298be 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -79,7 +80,7 @@ public class CheckpointConfigHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]);
@@ -95,8 +96,8 @@ public class CheckpointConfigHandlerTest {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);
@@ -121,8 +122,8 @@ public class CheckpointConfigHandlerTest {
 
 		AccessExecutionGraph graph = graphAndSettings.graph;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);
@@ -140,8 +141,8 @@ public class CheckpointConfigHandlerTest {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode externalizedNode = mapper.readTree(json).get("externalization");

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 0259aa5..f16d623 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -101,7 +102,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
@@ -113,10 +114,10 @@ public class CheckpointStatsDetailsHandlerTest {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -127,8 +128,8 @@ public class CheckpointStatsDetailsHandlerTest {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		assertEquals("{}", json);
 	}
@@ -147,10 +148,10 @@ public class CheckpointStatsDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(history, times(1)).getCheckpointById(anyLong());
@@ -318,10 +319,10 @@ public class CheckpointStatsDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		return mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 9425a4c..ed73a62 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -86,7 +87,7 @@ public class CheckpointStatsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
@@ -99,8 +100,8 @@ public class CheckpointStatsHandlerTest {
 	public void testCheckpointStatsRequest() throws Exception {
 		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
 
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index b8eb715..9c5e168 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -100,7 +101,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
@@ -149,10 +150,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -163,8 +164,8 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		assertEquals("{}", json);
 	}
@@ -183,11 +184,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(history, times(1)).getCheckpointById(anyLong());
@@ -199,11 +200,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testIllegalJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
 		params.put("vertexid", "illegal vertex id");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -214,10 +215,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testNoJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -238,11 +239,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class));
@@ -259,11 +260,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		return mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 90e032d..5296d33 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -49,7 +49,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -58,7 +58,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("vertexid", "taskid");
 
 		// get list of available metrics
-		String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
+		String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -69,7 +69,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get value for a single metric
 		queryParams.put("get", "8.opname.abc.metric5");
 
-		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
+		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -80,7 +80,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get values for multiple metrics
 		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
 
-		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
+		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
@@ -102,7 +102,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -114,7 +114,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("jobid", "nonexistent");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail();
 		}
@@ -132,7 +132,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -144,7 +144,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -154,7 +154,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.metric5");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -164,7 +164,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.nonexistant");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 994fc5e..b02949a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -40,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(mock(MetricFetcher.class));
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobmanager/metrics", paths[0]);
@@ -55,7 +55,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 
@@ -73,7 +73,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index a35af22..569f772 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobMetricsHandler handler = new JobMetricsHandler(mock(MetricFetcher.class));
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -75,7 +75,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index e84b11d..e6bbd2e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -42,7 +42,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobVertexMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(mock(MetricFetcher.class));
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
@@ -57,7 +57,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -79,7 +79,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index c20ea98..c4c1c7a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class TaskManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(mock(MetricFetcher.class));
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
@@ -75,7 +75,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c551a7..f0073db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2232,7 +2232,7 @@ object JobManager {
           new AkkaJobManagerRetriever(jobManagerSystem, timeout),
           new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
           timeout,
-          jobManagerSystem.dispatcher)
+          futureExecutor)
 
         Option(webServer)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e622130..41e5629 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -1094,7 +1094,7 @@ public class TaskExecutorTest extends TestLogger {
 
 				fail("The slot request should have failed.");
 			} catch (Exception e) {
-				assertTrue(ExceptionUtils.containsThrowable(e, SlotAllocationException.class));
+				assertTrue(ExceptionUtils.findThrowable(e, SlotAllocationException.class).isPresent());
 			}
 
 			// re-register


[3/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive

Posted by tr...@apache.org.
[FLINK-7409] [web] Make WebRuntimeMonitor reactive

This commit changes the behaviour of the WebRuntimeMonitor to not longer block serving
threads by waiting on the result of futures. Instead the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.

This closes #4527.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab1fbfdf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab1fbfdf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab1fbfdf

Branch: refs/heads/master
Commit: ab1fbfdfe6c1f2b6885710f98b9480cb90396ac0
Parents: 1804aa3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 10 10:56:12 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Sep 3 23:43:58 2017 +0200

----------------------------------------------------------------------
 .../RetryRejectedExecutionFailureHandler.java   |   2 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  13 +-
 .../webmonitor/ExecutionGraphHolder.java        |  16 +-
 .../webmonitor/RuntimeMonitorHandler.java       |  60 +++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  90 +++----
 .../AbstractExecutionGraphRequestHandler.java   |  35 ++-
 .../AbstractJobVertexRequestHandler.java        |  10 +-
 .../handlers/AbstractJsonRequestHandler.java    |  33 ++-
 .../AbstractSubtaskAttemptRequestHandler.java   |  22 +-
 .../handlers/AbstractSubtaskRequestHandler.java |  18 +-
 .../handlers/ClusterOverviewHandler.java        |  61 +++--
 .../handlers/CurrentJobIdsHandler.java          | 101 ++++----
 .../handlers/CurrentJobsOverviewHandler.java    |  89 ++++---
 .../handlers/DashboardConfigHandler.java        |   9 +-
 .../handlers/JarAccessDeniedHandler.java        |  10 +-
 .../webmonitor/handlers/JarActionHandler.java   |   4 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |  62 +++--
 .../webmonitor/handlers/JarListHandler.java     | 157 ++++++------
 .../webmonitor/handlers/JarPlanHandler.java     |  43 ++--
 .../webmonitor/handlers/JarRunHandler.java      |  59 +++--
 .../webmonitor/handlers/JarUploadHandler.java   |  51 ++--
 .../handlers/JobAccumulatorsHandler.java        |  19 +-
 .../handlers/JobCancellationHandler.java        |  38 +--
 .../JobCancellationWithSavepointHandlers.java   | 158 ++++++------
 .../webmonitor/handlers/JobConfigHandler.java   |  20 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |  19 +-
 .../handlers/JobExceptionsHandler.java          |  20 +-
 .../handlers/JobManagerConfigHandler.java       |  60 +++--
 .../webmonitor/handlers/JobPlanHandler.java     |  10 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  38 +--
 .../handlers/JobVertexAccumulatorsHandler.java  |  20 +-
 .../handlers/JobVertexBackPressureHandler.java  |  17 +-
 .../handlers/JobVertexDetailsHandler.java       |  19 +-
 .../handlers/JobVertexTaskManagersHandler.java  |  19 +-
 .../webmonitor/handlers/RequestHandler.java     |   9 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |   8 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |  19 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |  19 +-
 .../SubtasksAllAccumulatorsHandler.java         |  19 +-
 .../handlers/SubtasksTimesHandler.java          |  19 +-
 .../handlers/TaskManagersHandler.java           | 256 ++++++++++---------
 .../checkpoints/CheckpointConfigHandler.java    |  19 +-
 .../CheckpointStatsDetailsHandler.java          |  63 +++--
 .../CheckpointStatsDetailsSubtasksHandler.java  |  33 ++-
 .../checkpoints/CheckpointStatsHandler.java     |  19 +-
 .../metrics/AbstractMetricsHandler.java         |  27 +-
 .../metrics/JobManagerMetricsHandler.java       |   5 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |   5 +-
 .../metrics/JobVertexMetricsHandler.java        |   5 +-
 .../metrics/TaskManagerMetricsHandler.java      |   5 +-
 .../handlers/ClusterOverviewHandlerTest.java    |   3 +-
 .../handlers/CurrentJobIdsHandlerTest.java      |   3 +-
 .../CurrentJobsOverviewHandlerTest.java         |   7 +-
 .../handlers/DashboardConfigHandlerTest.java    |   3 +-
 .../handlers/JarAccessDeniedHandlerTest.java    |   4 +-
 .../handlers/JarDeleteHandlerTest.java          |   4 +-
 .../webmonitor/handlers/JarListHandlerTest.java |   4 +-
 .../webmonitor/handlers/JarPlanHandlerTest.java |   4 +-
 .../webmonitor/handlers/JarRunHandlerTest.java  |   3 +-
 .../handlers/JarUploadHandlerTest.java          |   4 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |   3 +-
 .../handlers/JobCancellationHandlerTest.java    |   3 +-
 ...obCancellationWithSavepointHandlersTest.java |  24 +-
 .../handlers/JobConfigHandlerTest.java          |   3 +-
 .../handlers/JobDetailsHandlerTest.java         |   3 +-
 .../handlers/JobExceptionsHandlerTest.java      |   3 +-
 .../handlers/JobManagerConfigHandlerTest.java   |   4 +-
 .../webmonitor/handlers/JobPlanHandlerTest.java |   3 +-
 .../handlers/JobStoppingHandlerTest.java        |   3 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   3 +-
 .../JobVertexBackPressureHandlerTest.java       |  12 +-
 .../handlers/JobVertexDetailsHandlerTest.java   |   3 +-
 .../JobVertexTaskManagersHandlerTest.java       |   3 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   3 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   3 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   3 +-
 .../handlers/SubtasksTimesHandlerTest.java      |   3 +-
 .../handlers/TaskManagersHandlerTest.java       |   3 +-
 .../CheckpointConfigHandlerTest.java            |  15 +-
 .../CheckpointStatsDetailsHandlerTest.java      |  19 +-
 .../checkpoints/CheckpointStatsHandlerTest.java |   7 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java |  31 +--
 .../metrics/AbstractMetricsHandlerTest.java     |  20 +-
 .../metrics/JobManagerMetricsHandlerTest.java   |   6 +-
 .../metrics/JobMetricsHandlerTest.java          |   6 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |   6 +-
 .../metrics/TaskManagerMetricsHandlerTest.java  |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   2 +-
 90 files changed, 1278 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index 9380959..3706257 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -36,7 +36,7 @@ public class RetryRejectedExecutionFailureHandler implements ActionRequestFailur
 
 	@Override
 	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
-		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+		if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
 			indexer.add(action);
 		} else {
 			// rethrow all other failures

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 9c8907b..d141ecb 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -276,27 +277,27 @@ public final class ExceptionUtils {
 	}
 
 	/**
-	 * Checks whether a throwable chain contains a specific type of exception.
+	 * Checks whether a throwable chain contains a specific type of exception and returns it.
 	 *
 	 * @param throwable the throwable chain to check.
 	 * @param searchType the type of exception to search for in the chain.
-	 * @return True, if the searched type is nested in the throwable, false otherwise.
+	 * @return Optional throwable of the requested type if available, otherwise empty
 	 */
-	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
+	public static Optional<Throwable> findThrowable(Throwable throwable, Class<?> searchType) {
 		if (throwable == null || searchType == null) {
-			return false;
+			return Optional.empty();
 		}
 
 		Throwable t = throwable;
 		while (t != null) {
 			if (searchType.isAssignableFrom(t.getClass())) {
-				return true;
+				return Optional.of(t);
 			} else {
 				t = t.getCause();
 			}
 		}
 
-		return false;
+		return Optional.empty();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 739b375..8a96969 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 import java.util.WeakHashMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,26 +64,23 @@ public class ExecutionGraphHolder {
 	 *
 	 * @param jid jobID of the execution graph to be retrieved
 	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
-	 * @throws Exception if the ExecutionGraph retrieval failed.
 	 */
-	public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			if (cached.getState() == JobStatus.SUSPENDED) {
 				cache.remove(jid);
 			} else {
-				return Optional.of(cached);
+				return CompletableFuture.completedFuture(Optional.of(cached));
 			}
 		}
 
 		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
 
-		Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		executionGraphFuture.thenAcceptAsync(
+			optExecutionGraph ->
+				optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));
 
-		return result.map((executionGraph) -> {
-			cache.put(jid, executionGraph);
-
-			return executionGraph;
-		});
+		return executionGraphFuture;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 35d13dd..6305537 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
@@ -45,6 +46,7 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +90,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	@Override
 	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
-		FullHttpResponse response;
+		CompletableFuture<FullHttpResponse> responseFuture;
 
 		try {
 			// we only pass the first element in the list to the handlers.
@@ -106,29 +108,41 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+			responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+		} catch (Exception e) {
+			responseFuture = FutureUtils.completedExceptionally(e);
 		}
-		catch (NotFoundException e) {
-			// this should result in a 404 error code (not found)
-			ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
-					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-			LOG.debug("Error while handling request", e);
-		}
-		catch (Exception e) {
-			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			LOG.debug("Error while handling request", e);
-		}
-
-		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
 
-		KeepAliveWrite.flush(ctx, routed.request(), response);
+		responseFuture.whenComplete(
+			(FullHttpResponse httpResponse, Throwable throwable) -> {
+				final FullHttpResponse finalResponse;
+
+				if (throwable != null) {
+					LOG.debug("Error while handling request.", throwable);
+
+					Optional<Throwable> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);
+
+					if (optNotFound.isPresent()) {
+						// this should result in a 404 error code (not found)
+						Throwable e = optNotFound.get();
+						ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
+							: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
+						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+					} else {
+						byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+							HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+					}
+				} else {
+					finalResponse = httpResponse;
+				}
+
+				finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
+				KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
+			});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 17f02f0..e74541e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -232,41 +232,41 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		Router router = new Router();
 		// config how to interact with this web server
-		get(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
+		get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));
 
 		// the overview - how many task managers, slots, free slots, ...
-		get(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+		get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));
 
 		// job manager configuration
-		get(router, new JobManagerConfigHandler(config));
+		get(router, new JobManagerConfigHandler(executor, config));
 
 		// overview over jobs
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
-
-		get(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
-
-		get(router, new JobDetailsHandler(currentGraphs, metricFetcher));
-
-		get(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtasksTimesHandler(currentGraphs));
-		get(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
-		get(router, new JobVertexAccumulatorsHandler(currentGraphs));
-		get(router, new JobVertexBackPressureHandler(currentGraphs,	backPressureStatsTracker, refreshInterval));
-		get(router, new JobVertexMetricsHandler(metricFetcher));
-		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
-		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
-
-		get(router, new JobPlanHandler(currentGraphs));
-		get(router, new JobConfigHandler(currentGraphs));
-		get(router, new JobExceptionsHandler(currentGraphs));
-		get(router, new JobAccumulatorsHandler(currentGraphs));
-		get(router, new JobMetricsHandler(metricFetcher));
-
-		get(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));
+
+		get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));
+
+		get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));
+
+		get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtasksTimesHandler(currentGraphs, executor));
+		get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
+		get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
+		get(router, new JobVertexBackPressureHandler(currentGraphs, executor,	backPressureStatsTracker, refreshInterval));
+		get(router, new JobVertexMetricsHandler(executor, metricFetcher));
+		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
+		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));
+
+		get(router, new JobPlanHandler(currentGraphs, executor));
+		get(router, new JobConfigHandler(currentGraphs, executor));
+		get(router, new JobExceptionsHandler(currentGraphs, executor));
+		get(router, new JobAccumulatorsHandler(currentGraphs, executor));
+		get(router, new JobMetricsHandler(executor, metricFetcher));
+
+		get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
@@ -287,7 +287,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 				config,
 				enableSSL,
 				blobView));
-		get(router, new TaskManagerMetricsHandler(metricFetcher));
+		get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
 
 		router
 			// log and stdout
@@ -299,48 +299,48 @@ public class WebRuntimeMonitor implements WebMonitor {
 				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
 					enableSSL));
 
-		get(router, new JobManagerMetricsHandler(metricFetcher));
+		get(router, new JobManagerMetricsHandler(executor, metricFetcher));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobCancellationHandler(timeout));
+		get(router, new JobCancellationHandler(executor, timeout));
 		// DELETE is the preferred way of canceling a job (Rest-conform)
-		delete(router, new JobCancellationHandler(timeout));
+		delete(router, new JobCancellationHandler(executor, timeout));
 
 		get(router, triggerHandler);
 		get(router, inProgressHandler);
 
 		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobStoppingHandler(timeout));
+		get(router, new JobStoppingHandler(executor, timeout));
 		// DELETE is the preferred way of stopping a job (Rest-conform)
-		delete(router, new JobStoppingHandler(timeout));
+		delete(router, new JobStoppingHandler(executor, timeout));
 
 		int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
 
 		// Register the checkpoint stats handlers
-		get(router, new CheckpointStatsHandler(currentGraphs));
-		get(router, new CheckpointConfigHandler(currentGraphs));
-		get(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
-		get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
+		get(router, new CheckpointStatsHandler(currentGraphs, executor));
+		get(router, new CheckpointConfigHandler(currentGraphs, executor));
+		get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
+		get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));
 
 		if (webSubmitAllow) {
 			// fetch the list of uploaded jars.
-			get(router, new JarListHandler(uploadDir));
+			get(router, new JarListHandler(executor, uploadDir));
 
 			// get plan for an uploaded jar
-			get(router, new JarPlanHandler(uploadDir));
+			get(router, new JarPlanHandler(executor, uploadDir));
 
 			// run a jar
-			post(router, new JarRunHandler(uploadDir, timeout, config));
+			post(router, new JarRunHandler(executor, uploadDir, timeout, config));
 
 			// upload a jar
-			post(router, new JarUploadHandler(uploadDir));
+			post(router, new JarUploadHandler(executor, uploadDir));
 
 			// delete an uploaded jar from submission interface
-			delete(router, new JarDeleteHandler(uploadDir));
+			delete(router, new JarDeleteHandler(executor, uploadDir));
 		} else {
 			// send an Access Denied message
-			JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
+			JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
 			get(router, jad);
 			post(router, jad);
 			delete(router, jad);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 89108db..053d3f7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -28,6 +30,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on an ExecutionGraph
@@ -37,12 +41,16 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 
 	private final ExecutionGraphHolder executionGraphHolder;
 
-	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executor);
 		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -53,21 +61,20 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 			jid = JobID.fromHexString(jidString);
 		}
 		catch (Exception e) {
-			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
 		}
 
-		final Optional<AccessExecutionGraph> optGraph;
+		final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
 
-		try {
-			optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
-		} catch (Exception e) {
-			throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
-		}
-
-		final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
-
-		return handleRequest(graph, pathParams);
+		return graphFuture.thenComposeAsync(
+			(Optional<AccessExecutionGraph> optGraph) -> {
+				if (optGraph.isPresent()) {
+					return handleRequest(optGraph.get(), pathParams);
+				} else {
+					throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+				}
+			}, executor);
 	}
 
-	public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index af9fc6c..df09225 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job vertex (defined
@@ -31,12 +33,12 @@ import java.util.Map;
  */
 public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
 
-	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+	public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
 		final JobVertexID vid = parseJobVertexId(params);
 
 		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
@@ -66,5 +68,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 		}
 	}
 
-	public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 266ffb0..1ec3f9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -29,6 +30,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
 import java.nio.charset.Charset;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for most request handlers. The handlers must produce a JSON response.
@@ -37,18 +40,28 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
+	protected final Executor executor;
+
+	protected AbstractJsonRequestHandler(Executor executor) {
+		this.executor = Preconditions.checkNotNull(executor);
+	}
+
 	@Override
-	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-		byte[] bytes = result.getBytes(ENCODING);
+	public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+
+		return resultFuture.thenApplyAsync(
+			(String result) -> {
+				byte[] bytes = result.getBytes(ENCODING);
 
-		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-				HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+				DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
 
-		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+				response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+				response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
 
-		return response;
+				return response;
+			});
 	}
 
 	/**
@@ -66,9 +79,9 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	 *         response with the exception message, other exceptions will cause a HTTP 500 response
 	 *         with the exception stack trace.
 	 */
-	public abstract String handleJsonRequest(
+	public abstract CompletableFuture<String> handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) throws Exception;
+			JobManagerGateway jobManagerGateway);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 2792008..1b20673 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific subtask execution attempt
@@ -32,15 +36,15 @@ import java.util.Map;
  */
 public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
 
-	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
 		final String attemptNumberString = params.get("attempt");
 		if (attemptNumberString == null) {
-			throw new RuntimeException("Attempt number parameter missing");
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
 		}
 
 		final int attempt;
@@ -48,7 +52,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 			attempt = Integer.parseInt(attemptNumberString);
 		}
 		catch (NumberFormatException e) {
-			throw new RuntimeException("Invalid attempt number parameter");
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
 		}
 
 		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
@@ -61,14 +65,14 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 			if (exec != null) {
 				return handleRequest(exec, params);
 			} else {
-				throw new RequestHandlerException("Execution for attempt " + attempt +
-					" has already been deleted.");
+				return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
+					" has already been deleted."));
 			}
 		}
 		else {
-			throw new RuntimeException("Attempt does not exist: " + attempt);
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
 		}
 	}
 
-	public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index b977228..ab85034 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific subtask (defined via the
@@ -31,15 +35,15 @@ import java.util.Map;
  */
 public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
 
-	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
 		final String subtaskNumberString = params.get("subtasknum");
 		if (subtaskNumberString == null) {
-			throw new RuntimeException("Subtask number parameter missing");
+			return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
 		}
 
 		final int subtask;
@@ -47,16 +51,16 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
 			subtask = Integer.parseInt(subtaskNumberString);
 		}
 		catch (NumberFormatException e) {
-			throw new RuntimeException("Invalid subtask number parameter");
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
 		}
 
 		if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
-			throw new RuntimeException("subtask does not exist: " + subtask);
+			return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
 		}
 
 		final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
 		return handleRequest(vertex, params);
 	}
 
-	public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 4ebc4e7..17db2e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,7 +50,8 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public ClusterOverviewHandler(Time timeout) {
+	public ClusterOverviewHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = checkNotNull(timeout);
 	}
 
@@ -56,39 +61,45 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		// we need no parameters, get all requests
 		try {
 			if (jobManagerGateway != null) {
 				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
 
-				StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-				gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-				gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-				gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-				gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-				gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-				gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-				gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-				gen.writeStringField("flink-version", version);
-				if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-					gen.writeStringField("flink-commit", commitID);
-				}
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
+				return overviewFuture.thenApplyAsync(
+					(StatusOverview overview) -> {
+						StringWriter writer = new StringWriter();
+						try {
+							JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+							gen.writeStartObject();
+							gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+							gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+							gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+							gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+							gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+							gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+							gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+							gen.writeStringField("flink-version", version);
+							if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
+								gen.writeStringField("flink-commit", commitID);
+							}
+							gen.writeEndObject();
+
+							gen.close();
+							return writer.toString();
+						} catch (IOException exception) {
+							throw new FlinkFutureException("Could not write cluster overview.", exception);
+						}
+					},
+					executor);
 			} else {
 				throw new Exception("No connection to the leading JobManager.");
 			}
 		}
 		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
+			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 778a300..acf1cd0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 
@@ -28,6 +29,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
@@ -43,7 +45,8 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public CurrentJobIdsHandler(Time timeout) {
+	public CurrentJobIdsHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = requireNonNull(timeout);
 	}
 
@@ -53,53 +56,57 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		// we need no parameters, get all requests
-		try {
-			if (jobManagerGateway != null) {
-				CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-				JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-
-				gen.writeArrayFieldStart("jobs-running");
-				for (JobID jid : overview.getJobsRunningOrPending()) {
-					gen.writeString(jid.toString());
-				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-finished");
-				for (JobID jid : overview.getJobsFinished()) {
-					gen.writeString(jid.toString());
-				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-cancelled");
-				for (JobID jid : overview.getJobsCancelled()) {
-					gen.writeString(jid.toString());
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				// we need no parameters, get all requests
+				try {
+					if (jobManagerGateway != null) {
+						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+						StringWriter writer = new StringWriter();
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+						gen.writeStartObject();
+
+						gen.writeArrayFieldStart("jobs-running");
+						for (JobID jid : overview.getJobsRunningOrPending()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-finished");
+						for (JobID jid : overview.getJobsFinished()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-cancelled");
+						for (JobID jid : overview.getJobsCancelled()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-failed");
+						for (JobID jid : overview.getJobsFailed()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeEndObject();
+
+						gen.close();
+						return writer.toString();
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
 				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-failed");
-				for (JobID jid : overview.getJobsFailed()) {
-					gen.writeString(jid.toString());
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
 				}
-				gen.writeEndArray();
-
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
-		}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index b324426..a5b116c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -36,7 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,10 +57,12 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	private final boolean includeFinishedJobs;
 
 	public CurrentJobsOverviewHandler(
+			Executor executor,
 			Time timeout,
 			boolean includeRunningJobs,
 			boolean includeFinishedJobs) {
 
+		super(executor);
 		this.timeout = checkNotNull(timeout);
 		this.includeRunningJobs = includeRunningJobs;
 		this.includeFinishedJobs = includeFinishedJobs;
@@ -77,49 +81,50 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			if (jobManagerGateway != null) {
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-				MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				final long now = System.currentTimeMillis();
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-				gen.writeStartObject();
-
-				if (includeRunningJobs && includeFinishedJobs) {
-					gen.writeArrayFieldStart("running");
-					for (JobDetails detail : result.getRunningJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
-					}
-					gen.writeEndArray();
-
-					gen.writeArrayFieldStart("finished");
-					for (JobDetails detail : result.getFinishedJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+
+			return jobDetailsFuture.thenApplyAsync(
+				(MultipleJobsDetails result) -> {
+					final long now = System.currentTimeMillis();
+
+					StringWriter writer = new StringWriter();
+					try {
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+						gen.writeStartObject();
+
+						if (includeRunningJobs && includeFinishedJobs) {
+							gen.writeArrayFieldStart("running");
+							for (JobDetails detail : result.getRunningJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+
+							gen.writeArrayFieldStart("finished");
+							for (JobDetails detail : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						} else {
+							gen.writeArrayFieldStart("jobs");
+							for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						}
+
+						gen.writeEndObject();
+						gen.close();
+						return writer.toString();
+					} catch (IOException e) {
+						throw new FlinkFutureException("Could not write current jobs overview json.", e);
 					}
-					gen.writeEndArray();
-				}
-				else {
-					gen.writeArrayFieldStart("jobs");
-					for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
-					}
-					gen.writeEndArray();
-				}
-
-				gen.writeEndObject();
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
+				},
+				executor);
 		}
-		catch (Exception e) {
-			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index fe1d06b..39984b1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Responder that returns the parameters that define how the asynchronous requests
@@ -39,7 +41,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 
 	private final String configString;
 
-	public DashboardConfigHandler(long refreshInterval) {
+	public DashboardConfigHandler(Executor executor, long refreshInterval) {
+		super(executor);
 		try {
 			this.configString = createConfigJson(refreshInterval);
 		}
@@ -55,8 +58,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		return this.configString;
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.completedFuture(configString);
 	}
 
 	public static String createConfigJson(long refreshInterval) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index db55169..978432b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler to deny access to jar-related REST calls.
@@ -30,6 +32,10 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
 			"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
 
+	public JarAccessDeniedHandler(Executor executor) {
+		super(executor);
+	}
+
 	@Override
 	public String[] getPaths() {
 		return new String[]{
@@ -42,7 +48,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		return ERROR_MESSAGE;
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.completedFuture(ERROR_MESSAGE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index d86a21b..0b0d32e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -46,6 +46,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Abstract handler for fetching plan for a jar or running a jar.
@@ -54,7 +55,8 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarActionHandler(File jarDirectory) {
+	public JarActionHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 73771bd..d9df1d4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -26,6 +27,8 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handles requests for deletion of jars.
@@ -36,7 +39,8 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarDeleteHandler(File jarDirectory) {
+	public JarDeleteHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 
@@ -46,33 +50,37 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		final String file = pathParams.get("jarid");
-		try {
-			File[] list = jarDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.equals(file);
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					File[] list = jarDir.listFiles(new FilenameFilter() {
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.equals(file);
+						}
+					});
+					boolean success = false;
+					for (File f: list) {
+						// although next to impossible for multiple files, we still delete them.
+						success = success || f.delete();
+					}
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					if (!success) {
+						// this seems to always fail on Windows.
+						gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
+					}
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
 				}
-			});
-			boolean success = false;
-			for (File f: list) {
-				// although next to impossible for multiple files, we still delete them.
-				success = success || f.delete();
-			}
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			if (!success) {
-				// this seems to always fail on Windows.
-				gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
-			}
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to delete jar id " + pathParams.get("jarid") + ": " + e.getMessage(), e);
-		}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4f9b188..95281a4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
@@ -29,6 +30,8 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
@@ -41,7 +44,8 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public  JarListHandler(File jarDirectory) {
+	public  JarListHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 
@@ -51,88 +55,93 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-			gen.writeStartObject();
-			gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
-			gen.writeArrayFieldStart("files");
-
-			File[] list = jarDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.endsWith(".jar");
-				}
-			});
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-			for (File f : list) {
-				// separate the uuid and the name parts.
-				String id = f.getName();
+					gen.writeStartObject();
+					gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
+					gen.writeArrayFieldStart("files");
 
-				int startIndex = id.indexOf("_");
-				if (startIndex < 0) {
-					continue;
-				}
-				String name = id.substring(startIndex + 1);
-				if (name.length() < 5 || !name.endsWith(".jar")) {
-					continue;
-				}
+					File[] list = jarDir.listFiles(new FilenameFilter() {
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.endsWith(".jar");
+						}
+					});
 
-				gen.writeStartObject();
-				gen.writeStringField("id", id);
-				gen.writeStringField("name", name);
-				gen.writeNumberField("uploaded", f.lastModified());
-				gen.writeArrayFieldStart("entry");
+					for (File f : list) {
+						// separate the uuid and the name parts.
+						String id = f.getName();
 
-				String[] classes = new String[0];
-				try {
-					JarFile jar = new JarFile(f);
-					Manifest manifest = jar.getManifest();
-					String assemblerClass = null;
-
-					if (manifest != null) {
-						assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
-						if (assemblerClass == null) {
-							assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+						int startIndex = id.indexOf("_");
+						if (startIndex < 0) {
+							continue;
+						}
+						String name = id.substring(startIndex + 1);
+						if (name.length() < 5 || !name.endsWith(".jar")) {
+							continue;
 						}
-					}
-					if (assemblerClass != null) {
-						classes = assemblerClass.split(",");
-					}
-				} catch (IOException ignored) {
-					// we simply show no entries here
-				}
-
-				// show every entry class that can be loaded later on.
-				for (String clazz : classes) {
-					clazz = clazz.trim();
 
-					PackagedProgram program = null;
-					try {
-						program = new PackagedProgram(f, clazz, new String[0]);
-					} catch (Exception ignored) {
-						// ignore jar files which throw an error upon creating a PackagedProgram
-					}
-					if (program != null) {
 						gen.writeStartObject();
-						gen.writeStringField("name", clazz);
-						String desc = program.getDescription();
-						gen.writeStringField("description", desc == null ? "No description provided" : desc);
+						gen.writeStringField("id", id);
+						gen.writeStringField("name", name);
+						gen.writeNumberField("uploaded", f.lastModified());
+						gen.writeArrayFieldStart("entry");
+
+						String[] classes = new String[0];
+						try {
+							JarFile jar = new JarFile(f);
+							Manifest manifest = jar.getManifest();
+							String assemblerClass = null;
+
+							if (manifest != null) {
+								assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+								if (assemblerClass == null) {
+									assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+								}
+							}
+							if (assemblerClass != null) {
+								classes = assemblerClass.split(",");
+							}
+						} catch (IOException ignored) {
+							// we simply show no entries here
+						}
+
+						// show every entry class that can be loaded later on.
+						for (String clazz : classes) {
+							clazz = clazz.trim();
+
+							PackagedProgram program = null;
+							try {
+								program = new PackagedProgram(f, clazz, new String[0]);
+							} catch (Exception ignored) {
+								// ignore jar files which throw an error upon creating a PackagedProgram
+							}
+							if (program != null) {
+								gen.writeStartObject();
+								gen.writeStringField("name", clazz);
+								String desc = program.getDescription();
+								gen.writeStringField("description", desc == null ? "No description provided" : desc);
+								gen.writeEndObject();
+							}
+						}
+						gen.writeEndArray();
 						gen.writeEndObject();
 					}
+					gen.writeEndArray();
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
 				}
-				gen.writeEndArray();
-				gen.writeEndObject();
-			}
-			gen.writeEndArray();
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch jar list: " + e.getMessage(), e);
-		}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to fetch jar list.", e);
+				}
+			},
+			executor);
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b239160..b117b3d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -27,6 +28,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * This handler handles requests to fetch plan for a jar.
@@ -35,8 +38,8 @@ public class JarPlanHandler extends JarActionHandler {
 
 	static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan";
 
-	public JarPlanHandler(File jarDirectory) {
-		super(jarDirectory);
+	public JarPlanHandler(Executor executor, File jarDirectory) {
+		super(executor, jarDirectory);
 	}
 
 	@Override
@@ -45,21 +48,25 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
-			JobGraph graph = getJobGraphAndClassLoader(config).f0;
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			gen.writeFieldName("plan");
-			gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			return sendError(e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+					JobGraph graph = getJobGraphAndClassLoader(config).f0;
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					gen.writeFieldName("plan");
+					gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 12ffa4f..7ada0b4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
@@ -33,6 +34,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * This handler handles requests to fetch plan for a jar.
@@ -44,8 +47,8 @@ public class JarRunHandler extends JarActionHandler {
 	private final Time timeout;
 	private final Configuration clientConfig;
 
-	public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
-		super(jarDirectory);
+	public JarRunHandler(Executor executor, File jarDirectory, Time timeout, Configuration clientConfig) {
+		super(executor, jarDirectory);
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.clientConfig = Preconditions.checkNotNull(clientConfig);
 	}
@@ -56,31 +59,35 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
-			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+					Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 
-			try {
-				JobClient.submitJobDetached(
-					jobManagerGateway,
-					clientConfig,
-					graph.f0,
-					timeout,
-					graph.f1);
-			} catch (JobExecutionException e) {
-				throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
-			}
+					try {
+						JobClient.submitJobDetached(
+							jobManagerGateway,
+							clientConfig,
+							graph.f0,
+							timeout,
+							graph.f1);
+					} catch (JobExecutionException e) {
+						throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
+					}
 
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			gen.writeStringField("jobid", graph.f0.getJobID().toString());
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		} catch (Exception e) {
-			return sendError(e);
-		}
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					gen.writeStringField("jobid", graph.f0.getJobID().toString());
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
+				} catch (Exception e) {
+					throw new FlinkFutureException("Could not run the jar.", e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 705c321..61b3f58 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import java.io.File;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handles requests for uploading of jars.
@@ -33,7 +35,8 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarUploadHandler(File jarDir) {
+	public JarUploadHandler(Executor executor, File jarDir) {
+		super(executor);
 		this.jarDir = jarDir;
 	}
 
@@ -43,34 +46,38 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(
+	public CompletableFuture<String> handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) throws Exception {
+			JobManagerGateway jobManagerGateway) {
 
 		String tempFilePath = queryParams.get("filepath");
 		String filename = queryParams.get("filename");
 
-		File tempFile;
-		if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
-			if (!tempFile.getName().endsWith(".jar")) {
-				//noinspection ResultOfMethodCallIgnored
-				tempFile.delete();
-				return "{\"error\": \"Only Jar files are allowed.\"}";
-			}
+		return CompletableFuture.supplyAsync(
+			() -> {
+				File tempFile;
+				if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
+					if (!tempFile.getName().endsWith(".jar")) {
+						//noinspection ResultOfMethodCallIgnored
+						tempFile.delete();
+						return "{\"error\": \"Only Jar files are allowed.\"}";
+					}
 
-			String filenameWithUUID = UUID.randomUUID() + "_" + filename;
-			File newFile = new File(jarDir, filenameWithUUID);
-			if (tempFile.renameTo(newFile)) {
-				// all went well
-				return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
-			}
-			else {
-				//noinspection ResultOfMethodCallIgnored
-				tempFile.delete();
-			}
-		}
+					String filenameWithUUID = UUID.randomUUID() + "_" + filename;
+					File newFile = new File(jarDir, filenameWithUUID);
+					if (tempFile.renameTo(newFile)) {
+						// all went well
+						return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
+					}
+					else {
+						//noinspection ResultOfMethodCallIgnored
+						tempFile.delete();
+					}
+				}
 
-		return "{\"error\": \"Failed to upload the file.\"}";
+				return "{\"error\": \"Failed to upload the file.\"}";
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 163e583..4dede3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the aggregated user accumulators of a job.
@@ -39,8 +42,8 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
 
-	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -49,8 +52,16 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobAccumulatorsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobAccumulatorsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job accumulators json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**


[2/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 513dc08..1a7d868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the CANCEL request.
@@ -36,7 +39,8 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public JobCancellationHandler(Time timeout) {
+	public JobCancellationHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
@@ -46,19 +50,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManagerGateway != null) {
-				jobManagerGateway.cancelJob(jobId, timeout);
-				return "{}";
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to cancel the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+					if (jobManagerGateway != null) {
+						jobManagerGateway.cancelJob(jobId, timeout);
+						return "{}";
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 9b474aa..4e41447 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(
+		public CompletableFuture<FullHttpResponse> handleRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
-				JobManagerGateway jobManagerGateway) throws Exception {
+				JobManagerGateway jobManagerGateway) {
 
-			try {
-				if (jobManagerGateway != null) {
-					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-					final Optional<AccessExecutionGraph> optGraph;
+			if (jobManagerGateway != null) {
+				JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+				final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
 
-					try {
-						optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
-					}
+				graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
 
-					final AccessExecutionGraph graph = optGraph.orElseThrow(
-						() -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+				return graphFuture.thenApplyAsync(
+					(Optional<AccessExecutionGraph> optGraph) -> {
+						final AccessExecutionGraph graph = optGraph.orElseThrow(
+							() -> new FlinkFutureException(
+								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
 
-					CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-					if (coord == null) {
-						throw new Exception("Cannot find CheckpointCoordinator for job.");
-					}
+						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+						if (coord == null) {
+							throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+						}
 
-					String targetDirectory = pathParams.get("targetDirectory");
-					if (targetDirectory == null) {
-						if (defaultSavepointDirectory == null) {
-							throw new IllegalStateException("No savepoint directory configured. " +
+						String targetDirectory = pathParams.get("targetDirectory");
+						if (targetDirectory == null) {
+							if (defaultSavepointDirectory == null) {
+								throw new IllegalStateException("No savepoint directory configured. " +
 									"You can either specify a directory when triggering this savepoint or " +
 									"configure a cluster-wide default via key '" +
 									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-						} else {
-							targetDirectory = defaultSavepointDirectory;
+							} else {
+								targetDirectory = defaultSavepointDirectory;
+							}
 						}
-					}
 
-					return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
-				} else {
-					throw new Exception("No connection to the leading JobManager.");
-				}
-			} catch (Exception e) {
-				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+						try {
+							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+						}
+					}, executor);
+			} else {
+				return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
 			}
 		}
 
@@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-			try {
-				if (jobManagerGateway != null) {
-					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-					long requestId = Long.parseLong(pathParams.get("requestId"));
+		public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+			JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+			long requestId = Long.parseLong(pathParams.get("requestId"));
 
-					synchronized (lock) {
-						Object result = completed.remove(requestId);
-
-						if (result != null) {
-							// Add to recent history
-							recentlyCompleted.add(new Tuple2<>(requestId, result));
-							if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
-								recentlyCompleted.remove();
-							}
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						synchronized (lock) {
+							Object result = completed.remove(requestId);
+
+							if (result != null) {
+								// Add to recent history
+								recentlyCompleted.add(new Tuple2<>(requestId, result));
+								if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+									recentlyCompleted.remove();
+								}
 
-							if (result.getClass() == String.class) {
-								String savepointPath = (String) result;
-								return createSuccessResponse(requestId, savepointPath);
-							} else {
-								Throwable cause = (Throwable) result;
-								return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-							}
-						} else {
-							// Check in-progress
-							Long inProgressRequestId = inProgress.get(jobId);
-							if (inProgressRequestId != null) {
-								// Sanity check
-								if (inProgressRequestId == requestId) {
-									return createInProgressResponse(requestId);
+								if (result.getClass() == String.class) {
+									String savepointPath = (String) result;
+									return createSuccessResponse(requestId, savepointPath);
 								} else {
-									String msg = "Request ID does not belong to JobID";
-									return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+									Throwable cause = (Throwable) result;
+									return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
 								}
-							}
-
-							// Check recent history
-							for (Tuple2<Long, Object> recent : recentlyCompleted) {
-								if (recent.f0 == requestId) {
-									if (recent.f1.getClass() == String.class) {
-										String savepointPath = (String) recent.f1;
-										return createSuccessResponse(requestId, savepointPath);
+							} else {
+								// Check in-progress
+								Long inProgressRequestId = inProgress.get(jobId);
+								if (inProgressRequestId != null) {
+									// Sanity check
+									if (inProgressRequestId == requestId) {
+										return createInProgressResponse(requestId);
 									} else {
-										Throwable cause = (Throwable) recent.f1;
-										return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+										String msg = "Request ID does not belong to JobID";
+										return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
 									}
 								}
-							}
 
-							return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+								// Check recent history
+								for (Tuple2<Long, Object> recent : recentlyCompleted) {
+									if (recent.f0 == requestId) {
+										if (recent.f1.getClass() == String.class) {
+											String savepointPath = (String) recent.f1;
+											return createSuccessResponse(requestId, savepointPath);
+										} else {
+											Throwable cause = (Throwable) recent.f1;
+											return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+										}
+									}
+								}
+
+								return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+							}
 						}
+					} catch (Exception e) {
+						throw new FlinkFutureException("Could not handle in progress request.", e);
 					}
-				} else {
-					throw new Exception("No connection to the leading JobManager.");
-				}
-			} catch (Exception e) {
-				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
-			}
+				});
 		}
 
 		private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 72cf8b7..0b15b37 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -39,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
 
-	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -49,8 +52,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobConfigJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write job config json.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 87ac7c3..8a50f87 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns details about a job. This includes:
@@ -57,8 +60,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -68,8 +71,16 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobDetailsJson(graph, fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobDetailsJson(graph, fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index e31299b..6ffd443 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the configuration of a job.
@@ -45,8 +48,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
-	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -55,8 +58,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobExceptionsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobExceptionsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job exceptions json.", e);
+				}
+			},
+			executor
+		);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index e2437e6..cb6d8c0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Returns the Job Manager's configuration.
@@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
 	private final Configuration config;
 
-	public JobManagerConfigHandler(Configuration config) {
+	public JobManagerConfigHandler(Executor executor, Configuration config) {
+		super(executor);
 		this.config = config;
 	}
 
@@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-		gen.writeStartArray();
-		for (String key : config.keySet()) {
-			gen.writeStartObject();
-			gen.writeStringField("key", key);
+					gen.writeStartArray();
+					for (String key : config.keySet()) {
+						gen.writeStartObject();
+						gen.writeStringField("key", key);
 
-			// Mask key values which contain sensitive information
-			if (key.toLowerCase().contains("password")) {
-				String value = config.getString(key, null);
-				if (value != null) {
-					value = "******";
-				}
-				gen.writeStringField("value", value);
-			}
-			else {
-				gen.writeStringField("value", config.getString(key, null));
-			}
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
+						// Mask key values which contain sensitive information
+						if (key.toLowerCase().contains("password")) {
+							String value = config.getString(key, null);
+							if (value != null) {
+								value = "******";
+							}
+							gen.writeStringField("value", value);
+						} else {
+							gen.writeStringField("value", config.getString(key, null));
+						}
+						gen.writeEndObject();
+					}
+					gen.writeEndArray();
 
-		gen.close();
-		return writer.toString();
+					gen.close();
+					return writer.toString();
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write configuration.", e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index d17b6bb..b3a9dd5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the JSON program plan of a job graph.
@@ -35,8 +37,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
 
-	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -45,8 +47,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return graph.getJsonPlan();
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.completedFuture(graph.getJsonPlan());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 3526734..f63403f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the STOP request.
@@ -36,7 +39,8 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public JobStoppingHandler(Time timeout) {
+	public JobStoppingHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
@@ -46,19 +50,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManagerGateway != null) {
-				jobManagerGateway.stopJob(jobId, timeout);
-				return "{}";
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to stop the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+					if (jobManagerGateway != null) {
+						jobManagerGateway.stopJob(jobId, timeout);
+						return "{}";
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to stop the job with id: "  + pathParams.get("jobid") + '.', e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8e90dfc..9c613ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accummulators for a given vertex.
@@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 
 	private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
 
-	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexAccumulatorsJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexAccumulatorsJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index cde8ca9..963153f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +28,11 @@ import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import scala.Option;
 
@@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 
 	public JobVertexBackPressureHandler(
 			ExecutionGraphHolder executionGraphHolder,
+			Executor executor,
 			BackPressureStatsTracker backPressureStatsTracker,
 			int refreshInterval) {
 
-		super(executionGraphHolder);
+		super(executionGraphHolder, executor);
 		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
 		checkArgument(refreshInterval >= 0, "Negative timeout");
 		this.refreshInterval = refreshInterval;
@@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(
+	public CompletableFuture<String> handleRequest(
 			AccessExecutionJobVertex accessJobVertex,
-			Map<String, String> params) throws Exception {
+			Map<String, String> params) {
 		if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
-			return "";
+			return CompletableFuture.completedFuture("");
 		}
 		ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
 		try (StringWriter writer = new StringWriter();
@@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 			gen.writeEndObject();
 			gen.close();
 
-			return writer.toString();
+			return CompletableFuture.completedFuture(writer.toString());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7757fdd..bd1745c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, parallelism,
@@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write the vertex details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index a612782..0827720 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -41,6 +42,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, and the
@@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create TaskManager json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 079be8f..8ca785f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Base interface for all request handlers.
@@ -44,13 +44,8 @@ public interface RequestHandler {
 	 * @param jobManagerGateway to talk to the JobManager.
 	 *
 	 * @return The full http response.
-	 *
-	 * @throws Exception Handlers may forward exceptions. Exceptions of type
-	 *         {@link NotFoundException} will cause a HTTP 404
-	 *         response with the exception message, other exceptions will cause a HTTP 500 response
-	 *         with the exception stack trace.
 	 */
-	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
+	CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway);
 
 	/**
 	 * Returns an array of REST URL's under which this handler can be registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 28e9ddf..301b217 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler providing details about a single task execution attempt.
@@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 
 	public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
 
-	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder, fetcher);
+	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor, fetcher);
 	}
 
 	@Override
@@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
 		return handleRequest(vertex.getCurrentExecutionAttempt(), params);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 171277f..3c0d1d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job vertex (defined
@@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
 
-	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 	}
 
 	@Override
-	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-		return createAttemptAccumulatorsJson(execAttempt);
+	public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createAttemptAccumulatorsJson(execAttempt);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create accumulator json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 37c0e50..ad836df 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -40,6 +41,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
 
@@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 
 	private final MetricFetcher fetcher;
 
-	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 	}
 
 	@Override
-	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-		return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create attempt details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 64bdfb4..8142548 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accumulators for all subtasks of job vertex.
@@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 
 	private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
 
-	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createSubtasksAccumulatorsJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createSubtasksAccumulatorsJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index ea88587..d766206 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the state transition timestamps for all subtasks, plus their
@@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
 	private static final String SUBTASK_TIMES_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasktimes";
 
-	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createSubtaskTimesJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createSubtaskTimesJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write subtask time json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a8ab7a3..9f83ed0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
 
@@ -53,7 +55,8 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 	private final MetricFetcher fetcher;
 
-	public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
+	public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+		super(executor);
 		this.timeout = requireNonNull(timeout);
 		this.fetcher = fetcher;
 	}
@@ -64,134 +67,139 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			if (jobManagerGateway != null) {
-				// whether one task manager's metrics are requested, or all task manager, we
-				// return them in an array. This avoids unnecessary code complexity.
-				// If only one task manager is requested, we only fetch one task manager metrics.
-				final List<Instance> instances = new ArrayList<>();
-				if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-					try {
-						InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-						CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
-						Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-						instance.ifPresent(instances::add);
-					}
-					// this means the id string was invalid. Keep the list empty.
-					catch (IllegalArgumentException e){
-						// do nothing.
-					}
-				} else {
-					CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-					Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-					instances.addAll(tmInstances);
-				}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			// whether one task manager's metrics are requested, or all task manager, we
+			// return them in an array. This avoids unnecessary code complexity.
+			// If only one task manager is requested, we only fetch one task manager metrics.
+			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+				CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+				return tmInstanceFuture.thenApplyAsync(
+					(Optional<Instance> optTaskManager) -> {
+						try {
+							return writeTaskManagersJson(
+								optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+								pathParams);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+						}
+					},
+					executor);
+			} else {
+				CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+				return tmInstancesFuture.thenApplyAsync(
+					(Collection<Instance> taskManagers) -> {
+						try {
+							return writeTaskManagersJson(taskManagers, pathParams);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+						}
+					},
+					executor);
+			}
+		}
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+		}
+	}
 
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-				gen.writeArrayFieldStart("taskmanagers");
-
-				for (Instance instance : instances) {
-					gen.writeStartObject();
-					gen.writeStringField("id", instance.getId().toString());
-					gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
-					gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
-					gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
-					gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
-					gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
-					gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
-					gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
-					gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
-					gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
-					// only send metrics when only one task manager requests them.
-					if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-						fetcher.update();
-						MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-						if (metrics != null) {
-							gen.writeObjectFieldStart("metrics");
-							long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-							long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-							long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-							gen.writeNumberField("heapCommitted", heapCommitted);
-							gen.writeNumberField("heapUsed", heapUsed);
-							gen.writeNumberField("heapMax", heapTotal);
-
-							long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-							long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-							long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-							gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-							gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-							gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-							gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-							gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-							gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-							long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-							long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-							long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-							gen.writeNumberField("directCount", directCount);
-							gen.writeNumberField("directUsed", directUsed);
-							gen.writeNumberField("directMax", directMax);
-
-							long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-							long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-							long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-							gen.writeNumberField("mappedCount", mappedCount);
-							gen.writeNumberField("mappedUsed", mappedUsed);
-							gen.writeNumberField("mappedMax", mappedMax);
-
-							long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-							long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-							gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-							gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-							gen.writeArrayFieldStart("garbageCollectors");
-
-							for (String gcName : metrics.garbageCollectorNames) {
-								String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-								String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-								if (count != null  && time != null) {
-									gen.writeStartObject();
-									gen.writeStringField("name", gcName);
-									gen.writeNumberField("count", Long.valueOf(count));
-									gen.writeNumberField("time", Long.valueOf(time));
-									gen.writeEndObject();
-								}
-							}
-
-							gen.writeEndArray();
+	private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeArrayFieldStart("taskmanagers");
+
+		for (Instance instance : instances) {
+			gen.writeStartObject();
+			gen.writeStringField("id", instance.getId().toString());
+			gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+			gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+			gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+			gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+			gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+			gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+			gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+			gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+			gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+			// only send metrics when only one task manager requests them.
+			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+				fetcher.update();
+				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+				if (metrics != null) {
+					gen.writeObjectFieldStart("metrics");
+					long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+					long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+					long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+					gen.writeNumberField("heapCommitted", heapCommitted);
+					gen.writeNumberField("heapUsed", heapUsed);
+					gen.writeNumberField("heapMax", heapTotal);
+
+					long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+					long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+					long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+					gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+					gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+					gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+					gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+					gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+					gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+					long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+					long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+					long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+					gen.writeNumberField("directCount", directCount);
+					gen.writeNumberField("directUsed", directUsed);
+					gen.writeNumberField("directMax", directMax);
+
+					long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+					long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+					long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+					gen.writeNumberField("mappedCount", mappedCount);
+					gen.writeNumberField("mappedUsed", mappedUsed);
+					gen.writeNumberField("mappedMax", mappedMax);
+
+					long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+					long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+					gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+					gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+					gen.writeArrayFieldStart("garbageCollectors");
+
+					for (String gcName : metrics.garbageCollectorNames) {
+						String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+						String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+						if (count != null  && time != null) {
+							gen.writeStartObject();
+							gen.writeStringField("name", gcName);
+							gen.writeNumberField("count", Long.valueOf(count));
+							gen.writeNumberField("time", Long.valueOf(time));
 							gen.writeEndObject();
 						}
 					}
 
+					gen.writeEndArray();
 					gen.writeEndObject();
 				}
-
-				gen.writeEndArray();
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
 			}
+
+			gen.writeEndObject();
 		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all task managers: " + e.getMessage(), e);
-		}
+
+		gen.writeEndArray();
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index d4c9b2a..3affd7c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -34,6 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns a job's snapshotting settings.
@@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
 	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
 
-	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createCheckpointConfigJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint config json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 664744b..96cc3e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -40,6 +41,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns checkpoint stats for a single job vertex.
@@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
-		super(executionGraphHolder);
+	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
 		this.cache = cache;
 	}
 
@@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		long checkpointId = parseCheckpointId(params);
-		if (checkpointId == -1) {
-			return "{}";
-		}
-
-		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-		if (snapshot == null) {
-			return "{}";
-		}
-
-		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
-		if (checkpoint != null) {
-			cache.tryAdd(checkpoint);
-		} else {
-			checkpoint = cache.tryGet(checkpointId);
-
-			if (checkpoint == null) {
-				return "{}";
-			}
-		}
-
-		return createCheckpointDetailsJson(checkpoint);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				long checkpointId = parseCheckpointId(params);
+				if (checkpointId == -1) {
+					return "{}";
+				}
+
+				CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+				if (snapshot == null) {
+					return "{}";
+				}
+
+				AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+				if (checkpoint != null) {
+					cache.tryAdd(checkpoint);
+				} else {
+					checkpoint = cache.tryGet(checkpointId);
+
+					if (checkpoint == null) {
+						return "{}";
+					}
+				}
+
+				try {
+					return createCheckpointDetailsJson(checkpoint);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d116c56..045248b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -43,6 +44,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
-		super(executionGraphHolder);
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
 		this.cache = checkNotNull(cache);
 	}
 
@@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 	}
 
 	@Override
-	public String handleJsonRequest(
-		Map<String, String> pathParams,
-		Map<String, String> queryParams,
-		JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
 		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
 		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
 		if (checkpointId == -1) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
 		if (vertexId == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
 		if (snapshot == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 			checkpoint = cache.tryGet(checkpointId);
 
 			if (checkpoint == null) {
-				return "{}";
+				return CompletableFuture.completedFuture("{}");
 			}
 		}
 
 		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
 		if (taskStats == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
-		return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+		try {
+			return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index a86c5fd..a60aee0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -43,6 +44,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns checkpoint statistics for a job.
@@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
 
-	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createCheckpointStatsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointStatsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index b95f2c4..cf286ce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
@@ -43,17 +46,27 @@ import java.util.Map;
 public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
 	private final MetricFetcher fetcher;
 
-	public AbstractMetricsHandler(MetricFetcher fetcher) {
+	public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor);
 		this.fetcher = Preconditions.checkNotNull(fetcher);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		fetcher.update();
-		String requestedMetricsList = queryParams.get("get");
-		return requestedMetricsList != null
-			? getMetricsValues(pathParams, requestedMetricsList)
-			: getAvailableMetricsList(pathParams);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				fetcher.update();
+				String requestedMetricsList = queryParams.get("get");
+				try {
+					return requestedMetricsList != null
+						? getMetricsValues(pathParams, requestedMetricsList)
+						: getAvailableMetricsList(pathParams);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not retrieve metrics.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7252d8a..2bd6683 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
 
 	private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
 
-	public JobManagerMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index a193457..e5e2500 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_JOB_ID = "jobid";
 	private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
 
-	public JobMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index e893da4..1d2cd84 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_VERTEX_ID = "vertexid";
 	private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
 
-	public JobVertexMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override