You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:00:18 UTC

[2/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 513dc08..1a7d868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the CANCEL request.
@@ -36,7 +39,8 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public JobCancellationHandler(Time timeout) {
+	public JobCancellationHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
@@ -46,19 +50,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManagerGateway != null) {
-				jobManagerGateway.cancelJob(jobId, timeout);
-				return "{}";
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to cancel the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+					if (jobManagerGateway != null) {
+						jobManagerGateway.cancelJob(jobId, timeout);
+						return "{}";
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 9b474aa..4e41447 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(
+		public CompletableFuture<FullHttpResponse> handleRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
-				JobManagerGateway jobManagerGateway) throws Exception {
+				JobManagerGateway jobManagerGateway) {
 
-			try {
-				if (jobManagerGateway != null) {
-					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-					final Optional<AccessExecutionGraph> optGraph;
+			if (jobManagerGateway != null) {
+				JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+				final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
 
-					try {
-						optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
-					}
+				graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
 
-					final AccessExecutionGraph graph = optGraph.orElseThrow(
-						() -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+				return graphFuture.thenApplyAsync(
+					(Optional<AccessExecutionGraph> optGraph) -> {
+						final AccessExecutionGraph graph = optGraph.orElseThrow(
+							() -> new FlinkFutureException(
+								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
 
-					CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-					if (coord == null) {
-						throw new Exception("Cannot find CheckpointCoordinator for job.");
-					}
+						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+						if (coord == null) {
+							throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+						}
 
-					String targetDirectory = pathParams.get("targetDirectory");
-					if (targetDirectory == null) {
-						if (defaultSavepointDirectory == null) {
-							throw new IllegalStateException("No savepoint directory configured. " +
+						String targetDirectory = pathParams.get("targetDirectory");
+						if (targetDirectory == null) {
+							if (defaultSavepointDirectory == null) {
+								throw new IllegalStateException("No savepoint directory configured. " +
 									"You can either specify a directory when triggering this savepoint or " +
 									"configure a cluster-wide default via key '" +
 									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-						} else {
-							targetDirectory = defaultSavepointDirectory;
+							} else {
+								targetDirectory = defaultSavepointDirectory;
+							}
 						}
-					}
 
-					return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
-				} else {
-					throw new Exception("No connection to the leading JobManager.");
-				}
-			} catch (Exception e) {
-				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+						try {
+							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+						}
+					}, executor);
+			} else {
+				return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
 			}
 		}
 
@@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-			try {
-				if (jobManagerGateway != null) {
-					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-					long requestId = Long.parseLong(pathParams.get("requestId"));
+		public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+			JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+			long requestId = Long.parseLong(pathParams.get("requestId"));
 
-					synchronized (lock) {
-						Object result = completed.remove(requestId);
-
-						if (result != null) {
-							// Add to recent history
-							recentlyCompleted.add(new Tuple2<>(requestId, result));
-							if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
-								recentlyCompleted.remove();
-							}
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						synchronized (lock) {
+							Object result = completed.remove(requestId);
+
+							if (result != null) {
+								// Add to recent history
+								recentlyCompleted.add(new Tuple2<>(requestId, result));
+								if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+									recentlyCompleted.remove();
+								}
 
-							if (result.getClass() == String.class) {
-								String savepointPath = (String) result;
-								return createSuccessResponse(requestId, savepointPath);
-							} else {
-								Throwable cause = (Throwable) result;
-								return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-							}
-						} else {
-							// Check in-progress
-							Long inProgressRequestId = inProgress.get(jobId);
-							if (inProgressRequestId != null) {
-								// Sanity check
-								if (inProgressRequestId == requestId) {
-									return createInProgressResponse(requestId);
+								if (result.getClass() == String.class) {
+									String savepointPath = (String) result;
+									return createSuccessResponse(requestId, savepointPath);
 								} else {
-									String msg = "Request ID does not belong to JobID";
-									return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+									Throwable cause = (Throwable) result;
+									return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
 								}
-							}
-
-							// Check recent history
-							for (Tuple2<Long, Object> recent : recentlyCompleted) {
-								if (recent.f0 == requestId) {
-									if (recent.f1.getClass() == String.class) {
-										String savepointPath = (String) recent.f1;
-										return createSuccessResponse(requestId, savepointPath);
+							} else {
+								// Check in-progress
+								Long inProgressRequestId = inProgress.get(jobId);
+								if (inProgressRequestId != null) {
+									// Sanity check
+									if (inProgressRequestId == requestId) {
+										return createInProgressResponse(requestId);
 									} else {
-										Throwable cause = (Throwable) recent.f1;
-										return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+										String msg = "Request ID does not belong to JobID";
+										return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
 									}
 								}
-							}
 
-							return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+								// Check recent history
+								for (Tuple2<Long, Object> recent : recentlyCompleted) {
+									if (recent.f0 == requestId) {
+										if (recent.f1.getClass() == String.class) {
+											String savepointPath = (String) recent.f1;
+											return createSuccessResponse(requestId, savepointPath);
+										} else {
+											Throwable cause = (Throwable) recent.f1;
+											return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+										}
+									}
+								}
+
+								return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+							}
 						}
+					} catch (Exception e) {
+						throw new FlinkFutureException("Could not handle in progress request.", e);
 					}
-				} else {
-					throw new Exception("No connection to the leading JobManager.");
-				}
-			} catch (Exception e) {
-				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
-			}
+				});
 		}
 
 		private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 72cf8b7..0b15b37 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -39,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
 
-	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -49,8 +52,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobConfigJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write job config json.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 87ac7c3..8a50f87 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns details about a job. This includes:
@@ -57,8 +60,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -68,8 +71,16 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobDetailsJson(graph, fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobDetailsJson(graph, fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index e31299b..6ffd443 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the configuration of a job.
@@ -45,8 +48,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
-	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -55,8 +58,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobExceptionsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobExceptionsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job exceptions json.", e);
+				}
+			},
+			executor
+		);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index e2437e6..cb6d8c0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Returns the Job Manager's configuration.
@@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
 	private final Configuration config;
 
-	public JobManagerConfigHandler(Configuration config) {
+	public JobManagerConfigHandler(Executor executor, Configuration config) {
+		super(executor);
 		this.config = config;
 	}
 
@@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-		gen.writeStartArray();
-		for (String key : config.keySet()) {
-			gen.writeStartObject();
-			gen.writeStringField("key", key);
+					gen.writeStartArray();
+					for (String key : config.keySet()) {
+						gen.writeStartObject();
+						gen.writeStringField("key", key);
 
-			// Mask key values which contain sensitive information
-			if (key.toLowerCase().contains("password")) {
-				String value = config.getString(key, null);
-				if (value != null) {
-					value = "******";
-				}
-				gen.writeStringField("value", value);
-			}
-			else {
-				gen.writeStringField("value", config.getString(key, null));
-			}
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
+						// Mask key values which contain sensitive information
+						if (key.toLowerCase().contains("password")) {
+							String value = config.getString(key, null);
+							if (value != null) {
+								value = "******";
+							}
+							gen.writeStringField("value", value);
+						} else {
+							gen.writeStringField("value", config.getString(key, null));
+						}
+						gen.writeEndObject();
+					}
+					gen.writeEndArray();
 
-		gen.close();
-		return writer.toString();
+					gen.close();
+					return writer.toString();
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write configuration.", e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index d17b6bb..b3a9dd5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the JSON program plan of a job graph.
@@ -35,8 +37,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
 
-	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -45,8 +47,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return graph.getJsonPlan();
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.completedFuture(graph.getJsonPlan());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 3526734..f63403f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the STOP request.
@@ -36,7 +39,8 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public JobStoppingHandler(Time timeout) {
+	public JobStoppingHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
@@ -46,19 +50,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManagerGateway != null) {
-				jobManagerGateway.stopJob(jobId, timeout);
-				return "{}";
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to stop the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+					if (jobManagerGateway != null) {
+						jobManagerGateway.stopJob(jobId, timeout);
+						return "{}";
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to stop the job with id: "  + pathParams.get("jobid") + '.', e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8e90dfc..9c613ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accummulators for a given vertex.
@@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 
 	private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
 
-	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexAccumulatorsJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexAccumulatorsJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index cde8ca9..963153f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +28,11 @@ import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import scala.Option;
 
@@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 
 	public JobVertexBackPressureHandler(
 			ExecutionGraphHolder executionGraphHolder,
+			Executor executor,
 			BackPressureStatsTracker backPressureStatsTracker,
 			int refreshInterval) {
 
-		super(executionGraphHolder);
+		super(executionGraphHolder, executor);
 		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
 		checkArgument(refreshInterval >= 0, "Negative timeout");
 		this.refreshInterval = refreshInterval;
@@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(
+	public CompletableFuture<String> handleRequest(
 			AccessExecutionJobVertex accessJobVertex,
-			Map<String, String> params) throws Exception {
+			Map<String, String> params) {
 		if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
-			return "";
+			return CompletableFuture.completedFuture("");
 		}
 		ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
 		try (StringWriter writer = new StringWriter();
@@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 			gen.writeEndObject();
 			gen.close();
 
-			return writer.toString();
+			return CompletableFuture.completedFuture(writer.toString());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7757fdd..bd1745c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, parallelism,
@@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write the vertex details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index a612782..0827720 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -41,6 +42,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, and the
@@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create TaskManager json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 079be8f..8ca785f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Base interface for all request handlers.
@@ -44,13 +44,8 @@ public interface RequestHandler {
 	 * @param jobManagerGateway to talk to the JobManager.
 	 *
 	 * @return The full http response.
-	 *
-	 * @throws Exception Handlers may forward exceptions. Exceptions of type
-	 *         {@link NotFoundException} will cause a HTTP 404
-	 *         response with the exception message, other exceptions will cause a HTTP 500 response
-	 *         with the exception stack trace.
 	 */
-	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
+	CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway);
 
 	/**
 	 * Returns an array of REST URL's under which this handler can be registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 28e9ddf..301b217 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler providing details about a single task execution attempt.
@@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 
 	public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
 
-	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder, fetcher);
+	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor, fetcher);
 	}
 
 	@Override
@@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
 		return handleRequest(vertex.getCurrentExecutionAttempt(), params);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 171277f..3c0d1d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job vertex (defined
@@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
 
-	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 	}
 
 	@Override
-	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-		return createAttemptAccumulatorsJson(execAttempt);
+	public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createAttemptAccumulatorsJson(execAttempt);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create accumulator json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 37c0e50..ad836df 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -40,6 +41,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
 
@@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 
 	private final MetricFetcher fetcher;
 
-	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
-		super(executionGraphHolder);
+	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}
 
@@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 	}
 
 	@Override
-	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-		return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+	public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create attempt details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 64bdfb4..8142548 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accumulators for all subtasks of job vertex.
@@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 
 	private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
 
-	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createSubtasksAccumulatorsJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createSubtasksAccumulatorsJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index ea88587..d766206 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the state transition timestamps for all subtasks, plus their
@@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
 	private static final String SUBTASK_TIMES_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasktimes";
 
-	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		return createSubtaskTimesJson(jobVertex);
+	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createSubtaskTimesJson(jobVertex);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write subtask time json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a8ab7a3..9f83ed0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
 
@@ -53,7 +55,8 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 	private final MetricFetcher fetcher;
 
-	public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
+	public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+		super(executor);
 		this.timeout = requireNonNull(timeout);
 		this.fetcher = fetcher;
 	}
@@ -64,134 +67,139 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			if (jobManagerGateway != null) {
-				// whether one task manager's metrics are requested, or all task manager, we
-				// return them in an array. This avoids unnecessary code complexity.
-				// If only one task manager is requested, we only fetch one task manager metrics.
-				final List<Instance> instances = new ArrayList<>();
-				if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-					try {
-						InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-						CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
-						Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-						instance.ifPresent(instances::add);
-					}
-					// this means the id string was invalid. Keep the list empty.
-					catch (IllegalArgumentException e){
-						// do nothing.
-					}
-				} else {
-					CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-					Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-					instances.addAll(tmInstances);
-				}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			// whether one task manager's metrics are requested, or all task manager, we
+			// return them in an array. This avoids unnecessary code complexity.
+			// If only one task manager is requested, we only fetch one task manager metrics.
+			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+				CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+				return tmInstanceFuture.thenApplyAsync(
+					(Optional<Instance> optTaskManager) -> {
+						try {
+							return writeTaskManagersJson(
+								optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+								pathParams);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+						}
+					},
+					executor);
+			} else {
+				CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+				return tmInstancesFuture.thenApplyAsync(
+					(Collection<Instance> taskManagers) -> {
+						try {
+							return writeTaskManagersJson(taskManagers, pathParams);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+						}
+					},
+					executor);
+			}
+		}
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+		}
+	}
 
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-				gen.writeArrayFieldStart("taskmanagers");
-
-				for (Instance instance : instances) {
-					gen.writeStartObject();
-					gen.writeStringField("id", instance.getId().toString());
-					gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
-					gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
-					gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
-					gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
-					gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
-					gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
-					gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
-					gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
-					gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
-					// only send metrics when only one task manager requests them.
-					if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-						fetcher.update();
-						MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-						if (metrics != null) {
-							gen.writeObjectFieldStart("metrics");
-							long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-							long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-							long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-							gen.writeNumberField("heapCommitted", heapCommitted);
-							gen.writeNumberField("heapUsed", heapUsed);
-							gen.writeNumberField("heapMax", heapTotal);
-
-							long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-							long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-							long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-							gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-							gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-							gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-							gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-							gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-							gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-							long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-							long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-							long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-							gen.writeNumberField("directCount", directCount);
-							gen.writeNumberField("directUsed", directUsed);
-							gen.writeNumberField("directMax", directMax);
-
-							long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-							long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-							long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-							gen.writeNumberField("mappedCount", mappedCount);
-							gen.writeNumberField("mappedUsed", mappedUsed);
-							gen.writeNumberField("mappedMax", mappedMax);
-
-							long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-							long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-							gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-							gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-							gen.writeArrayFieldStart("garbageCollectors");
-
-							for (String gcName : metrics.garbageCollectorNames) {
-								String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-								String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-								if (count != null  && time != null) {
-									gen.writeStartObject();
-									gen.writeStringField("name", gcName);
-									gen.writeNumberField("count", Long.valueOf(count));
-									gen.writeNumberField("time", Long.valueOf(time));
-									gen.writeEndObject();
-								}
-							}
-
-							gen.writeEndArray();
+	private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		gen.writeStartObject();
+		gen.writeArrayFieldStart("taskmanagers");
+
+		for (Instance instance : instances) {
+			gen.writeStartObject();
+			gen.writeStringField("id", instance.getId().toString());
+			gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+			gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+			gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+			gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+			gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+			gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+			gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+			gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+			gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+			// only include metrics when the request targets a single task manager.
+			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+				fetcher.update();
+				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+				if (metrics != null) {
+					gen.writeObjectFieldStart("metrics");
+					long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+					long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+					long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+					gen.writeNumberField("heapCommitted", heapCommitted);
+					gen.writeNumberField("heapUsed", heapUsed);
+					gen.writeNumberField("heapMax", heapTotal);
+
+					long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+					long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+					long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+					gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+					gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+					gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+					gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+					gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+					gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+					long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+					long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+					long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+					gen.writeNumberField("directCount", directCount);
+					gen.writeNumberField("directUsed", directUsed);
+					gen.writeNumberField("directMax", directMax);
+
+					long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+					long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+					long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+					gen.writeNumberField("mappedCount", mappedCount);
+					gen.writeNumberField("mappedUsed", mappedUsed);
+					gen.writeNumberField("mappedMax", mappedMax);
+
+					long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+					long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+					gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+					gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+					gen.writeArrayFieldStart("garbageCollectors");
+
+					for (String gcName : metrics.garbageCollectorNames) {
+						String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+						String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+						if (count != null  && time != null) {
+							gen.writeStartObject();
+							gen.writeStringField("name", gcName);
+							gen.writeNumberField("count", Long.valueOf(count));
+							gen.writeNumberField("time", Long.valueOf(time));
 							gen.writeEndObject();
 						}
 					}
 
+					gen.writeEndArray();
 					gen.writeEndObject();
 				}
-
-				gen.writeEndArray();
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
 			}
+
+			gen.writeEndObject();
 		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all task managers: " + e.getMessage(), e);
-		}
+
+		gen.writeEndArray();
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index d4c9b2a..3affd7c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -34,6 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns a job's snapshotting settings.
@@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
 	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
 
-	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createCheckpointConfigJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint config json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 664744b..96cc3e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -40,6 +41,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns checkpoint stats for a single job vertex.
@@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
-		super(executionGraphHolder);
+	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
 		this.cache = cache;
 	}
 
@@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		long checkpointId = parseCheckpointId(params);
-		if (checkpointId == -1) {
-			return "{}";
-		}
-
-		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-		if (snapshot == null) {
-			return "{}";
-		}
-
-		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
-		if (checkpoint != null) {
-			cache.tryAdd(checkpoint);
-		} else {
-			checkpoint = cache.tryGet(checkpointId);
-
-			if (checkpoint == null) {
-				return "{}";
-			}
-		}
-
-		return createCheckpointDetailsJson(checkpoint);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				long checkpointId = parseCheckpointId(params);
+				if (checkpointId == -1) {
+					return "{}";
+				}
+
+				CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+				if (snapshot == null) {
+					return "{}";
+				}
+
+				AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+				if (checkpoint != null) {
+					cache.tryAdd(checkpoint);
+				} else {
+					checkpoint = cache.tryGet(checkpointId);
+
+					if (checkpoint == null) {
+						return "{}";
+					}
+				}
+
+				try {
+					return createCheckpointDetailsJson(checkpoint);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint details json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d116c56..045248b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -43,6 +44,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
-		super(executionGraphHolder);
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
 		this.cache = checkNotNull(cache);
 	}
 
@@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 	}
 
 	@Override
-	public String handleJsonRequest(
-		Map<String, String> pathParams,
-		Map<String, String> queryParams,
-		JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
 		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
 		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
 		if (checkpointId == -1) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
 		if (vertexId == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
 		if (snapshot == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
 		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 			checkpoint = cache.tryGet(checkpointId);
 
 			if (checkpoint == null) {
-				return "{}";
+				return CompletableFuture.completedFuture("{}");
 			}
 		}
 
 		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
 		if (taskStats == null) {
-			return "{}";
+			return CompletableFuture.completedFuture("{}");
 		}
 
-		return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+		try {
+			return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index a86c5fd..a60aee0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -43,6 +44,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns checkpoint statistics for a job.
@@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
 
-	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createCheckpointStatsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointStatsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index b95f2c4..cf286ce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
@@ -43,17 +46,27 @@ import java.util.Map;
 public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
 	private final MetricFetcher fetcher;
 
-	public AbstractMetricsHandler(MetricFetcher fetcher) {
+	public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor);
 		this.fetcher = Preconditions.checkNotNull(fetcher);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		fetcher.update();
-		String requestedMetricsList = queryParams.get("get");
-		return requestedMetricsList != null
-			? getMetricsValues(pathParams, requestedMetricsList)
-			: getAvailableMetricsList(pathParams);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				fetcher.update();
+				String requestedMetricsList = queryParams.get("get");
+				try {
+					return requestedMetricsList != null
+						? getMetricsValues(pathParams, requestedMetricsList)
+						: getAvailableMetricsList(pathParams);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not retrieve metrics.", e);
+				}
+			},
+			executor);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7252d8a..2bd6683 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
 
 	private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
 
-	public JobManagerMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index a193457..e5e2500 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_JOB_ID = "jobid";
 	private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
 
-	public JobMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index e893da4..1d2cd84 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_VERTEX_ID = "vertexid";
 	private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
 
-	public JobVertexMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override