You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:25 UTC

[15/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
deleted file mode 100644
index 1ec3f9c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for most request handlers. The handlers must produce a JSON response.
- */
-public abstract class AbstractJsonRequestHandler implements RequestHandler {
-
-	private static final Charset ENCODING = Charset.forName("UTF-8");
-
-	protected final Executor executor;
-
-	protected AbstractJsonRequestHandler(Executor executor) {
-		this.executor = Preconditions.checkNotNull(executor);
-	}
-
-	@Override
-	public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-
-		return resultFuture.thenApplyAsync(
-			(String result) -> {
-				byte[] bytes = result.getBytes(ENCODING);
-
-				DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-				response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-				response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-				return response;
-			});
-	}
-
-	/**
-	 * Core method that handles the request and generates the response. The method needs to
-	 * respond with a valid JSON string. Exceptions may be thrown and will be handled.
-	 *
-	 * @param pathParams The map of REST path parameters, decoded by the router.
-	 * @param queryParams The map of query parameters.
-	 * @param jobManagerGateway to communicate with the JobManager.
-	 *
-	 * @return The JSON string that is the HTTP response.
-	 *
-	 * @throws Exception Handlers may forward exceptions. Exceptions of type
-	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
-	 *         response with the exception message, other exceptions will cause a HTTP 500 response
-	 *         with the exception stack trace.
-	 */
-	public abstract CompletableFuture<String> handleJsonRequest(
-			Map<String, String> pathParams,
-			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
deleted file mode 100644
index 1b20673..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask execution attempt
- * (defined via the "attempt" parameter) of a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job, defined via (defined voa the "jobid" parameter).
- */
-public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
-
-	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
-		final String attemptNumberString = params.get("attempt");
-		if (attemptNumberString == null) {
-			return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
-		}
-
-		final int attempt;
-		try {
-			attempt = Integer.parseInt(attemptNumberString);
-		}
-		catch (NumberFormatException e) {
-			return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
-		}
-
-		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
-		if (attempt == currentAttempt.getAttemptNumber()) {
-			return handleRequest(currentAttempt, params);
-		}
-		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
-			AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
-
-			if (exec != null) {
-				return handleRequest(exec, params);
-			} else {
-				return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
-					" has already been deleted."));
-			}
-		}
-		else {
-			return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
-		}
-	}
-
-	public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
deleted file mode 100644
index ab85034..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job, defined via (defined voa the "jobid" parameter).
- */
-public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
-
-	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
-		final String subtaskNumberString = params.get("subtasknum");
-		if (subtaskNumberString == null) {
-			return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
-		}
-
-		final int subtask;
-		try {
-			subtask = Integer.parseInt(subtaskNumberString);
-		}
-		catch (NumberFormatException e) {
-			return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
-		}
-
-		if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
-			return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
-		}
-
-		final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
-		return handleRequest(vertex, params);
-	}
-
-	public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
deleted file mode 100644
index 17db2e8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.FlinkException;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Responder that returns the status of the Flink cluster, such as how many
- * TaskManagers are currently connected, and how many jobs are running.
- */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
-
-	private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
-
-	private static final String version = EnvironmentInformation.getVersion();
-
-	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
-
-	private final Time timeout;
-
-	public ClusterOverviewHandler(Executor executor, Time timeout) {
-		super(executor);
-		this.timeout = checkNotNull(timeout);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CLUSTER_OVERVIEW_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		// we need no parameters, get all requests
-		try {
-			if (jobManagerGateway != null) {
-				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
-
-				return overviewFuture.thenApplyAsync(
-					(StatusOverview overview) -> {
-						StringWriter writer = new StringWriter();
-						try {
-							JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-							gen.writeStartObject();
-							gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-							gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-							gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-							gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-							gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-							gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-							gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-							gen.writeStringField("flink-version", version);
-							if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-								gen.writeStringField("flink-commit", commitID);
-							}
-							gen.writeEndObject();
-
-							gen.close();
-							return writer.toString();
-						} catch (IOException exception) {
-							throw new FlinkFutureException("Could not write cluster overview.", exception);
-						}
-					},
-					executor);
-			} else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
deleted file mode 100644
index 34898e7..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-/**
- * Responder that returns a constant String.
- */
-@ChannelHandler.Sharable
-public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
-
-	private final byte[] encodedText;
-
-	public ConstantTextHandler(String text) {
-		this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
-	}
-
-	@Override
-	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		HttpResponse response = new DefaultFullHttpResponse(
-			HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
-
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
-		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
-
-		KeepAliveWrite.flush(ctx, routed.request(), response);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
deleted file mode 100644
index acf1cd0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Responder that returns with a list of all JobIDs of jobs found at the target actor.
- * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
- * given the JobManager or Archive Actor Reference.
- */
-public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
-
-	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
-	private final Time timeout;
-
-	public CurrentJobIdsHandler(Executor executor, Time timeout) {
-		super(executor);
-		this.timeout = requireNonNull(timeout);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CURRENT_JOB_IDS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				// we need no parameters, get all requests
-				try {
-					if (jobManagerGateway != null) {
-						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-						StringWriter writer = new StringWriter();
-						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-						gen.writeStartObject();
-
-						gen.writeArrayFieldStart("jobs-running");
-						for (JobID jid : overview.getJobsRunningOrPending()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-finished");
-						for (JobID jid : overview.getJobsFinished()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-cancelled");
-						for (JobID jid : overview.getJobsCancelled()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-failed");
-						for (JobID jid : overview.getJobsFailed()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeEndObject();
-
-						gen.close();
-						return writer.toString();
-					}
-					else {
-						throw new Exception("No connection to the leading JobManager.");
-					}
-				}
-				catch (Exception e) {
-					throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
-				}
-			},
-			executor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
deleted file mode 100644
index a5b116c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns a summary of the job status.
- */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
-
-	private static final String ALL_JOBS_REST_PATH = "/joboverview";
-	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
-	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
-
-	private final Time timeout;
-
-	private final boolean includeRunningJobs;
-	private final boolean includeFinishedJobs;
-
-	public CurrentJobsOverviewHandler(
-			Executor executor,
-			Time timeout,
-			boolean includeRunningJobs,
-			boolean includeFinishedJobs) {
-
-		super(executor);
-		this.timeout = checkNotNull(timeout);
-		this.includeRunningJobs = includeRunningJobs;
-		this.includeFinishedJobs = includeFinishedJobs;
-	}
-
-	@Override
-	public String[] getPaths() {
-		if (includeRunningJobs && includeFinishedJobs) {
-			return new String[]{ALL_JOBS_REST_PATH};
-		}
-		if (includeRunningJobs) {
-			return new String[]{RUNNING_JOBS_REST_PATH};
-		} else {
-			return new String[]{COMPLETED_JOBS_REST_PATH};
-		}
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		if (jobManagerGateway != null) {
-			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-
-			return jobDetailsFuture.thenApplyAsync(
-				(MultipleJobsDetails result) -> {
-					final long now = System.currentTimeMillis();
-
-					StringWriter writer = new StringWriter();
-					try {
-						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-						gen.writeStartObject();
-
-						if (includeRunningJobs && includeFinishedJobs) {
-							gen.writeArrayFieldStart("running");
-							for (JobDetails detail : result.getRunningJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
-							}
-							gen.writeEndArray();
-
-							gen.writeArrayFieldStart("finished");
-							for (JobDetails detail : result.getFinishedJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
-							}
-							gen.writeEndArray();
-						} else {
-							gen.writeArrayFieldStart("jobs");
-							for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
-							}
-							gen.writeEndArray();
-						}
-
-						gen.writeEndObject();
-						gen.close();
-						return writer.toString();
-					} catch (IOException e) {
-						throw new FlinkFutureException("Could not write current jobs overview json.", e);
-					}
-				},
-				executor);
-		}
-		else {
-			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-		}
-	}
-
-	/**
-	 * Archivist for the CurrentJobsOverviewHandler.
-	 */
-	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			StringWriter writer = new StringWriter();
-			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-				gen.writeStartObject();
-				gen.writeArrayFieldStart("running");
-				gen.writeEndArray();
-				gen.writeArrayFieldStart("finished");
-				writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
-				gen.writeEndArray();
-				gen.writeEndObject();
-			}
-			String json = writer.toString();
-			String path = ALL_JOBS_REST_PATH;
-			return Collections.singleton(new ArchivedJson(path, json));
-		}
-	}
-
-	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
-		gen.writeStartObject();
-
-		gen.writeStringField("jid", details.getJobId().toString());
-		gen.writeStringField("name", details.getJobName());
-		gen.writeStringField("state", details.getStatus().name());
-
-		gen.writeNumberField("start-time", details.getStartTime());
-		gen.writeNumberField("end-time", details.getEndTime());
-		gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
-		gen.writeNumberField("last-modification", details.getLastUpdateTime());
-
-		gen.writeObjectFieldStart("tasks");
-		gen.writeNumberField("total", details.getNumTasks());
-
-		final int[] perState = details.getNumVerticesPerExecutionState();
-		gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
-				perState[ExecutionState.SCHEDULED.ordinal()] +
-				perState[ExecutionState.DEPLOYING.ordinal()]);
-		gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
-		gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
-		gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
-		gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
-		gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
-		gen.writeEndObject();
-
-		gen.writeEndObject();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
deleted file mode 100644
index 39984b1..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Responder that returns the parameters that define how the asynchronous requests
- * against this web server should behave. It defines for example the refresh interval,
- * and time zone of the server timestamps.
- */
-public class DashboardConfigHandler extends AbstractJsonRequestHandler {
-
-	private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
-
-	private final String configString;
-
-	public DashboardConfigHandler(Executor executor, long refreshInterval) {
-		super(executor);
-		try {
-			this.configString = createConfigJson(refreshInterval);
-		}
-		catch (Exception e) {
-			// should never happen
-			throw new RuntimeException(e.getMessage(), e);
-		}
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{DASHBOARD_CONFIG_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		return CompletableFuture.completedFuture(configString);
-	}
-
-	public static String createConfigJson(long refreshInterval) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		TimeZone timeZone = TimeZone.getDefault();
-		String timeZoneName = timeZone.getDisplayName();
-		long timeZoneOffset = timeZone.getRawOffset();
-
-		gen.writeStartObject();
-		gen.writeNumberField("refresh-interval", refreshInterval);
-		gen.writeNumberField("timezone-offset", timeZoneOffset);
-		gen.writeStringField("timezone-name", timeZoneName);
-		gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
-
-		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
-		if (revision != null) {
-			gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
-		}
-
-		gen.writeEndObject();
-
-		gen.close();
-
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 978432b..c95cc32 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 0b0d32e..760c836 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -35,6 +35,8 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.util.ExceptionUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index d9df1d4..04f663d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 95281a4..4248dd4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b117b3d..4d79492 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 7ada0b4..16a1565 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 61b3f58..9a0bac4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 
 import java.io.File;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
deleted file mode 100644
index 4dede3a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the aggregated user accumulators of a job.
- */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
-
-	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_ACCUMULATORS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createJobAccumulatorsJson(graph);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job accumulators json.", e);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Archivist for the JobAccumulatorsHandler.
-	 */
-	public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createJobAccumulatorsJson(graph);
-			String path = JOB_ACCUMULATORS_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			return Collections.singletonList(new ArchivedJson(path, json));
-		}
-	}
-
-	public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
-
-		gen.writeStartObject();
-
-		gen.writeArrayFieldStart("job-accumulators");
-		// empty for now
-		gen.writeEndArray();
-
-		gen.writeArrayFieldStart("user-task-accumulators");
-		for (StringifiedAccumulatorResult acc : allAccumulators) {
-			gen.writeStartObject();
-			gen.writeStringField("name", acc.getName());
-			gen.writeStringField("type", acc.getType());
-			gen.writeStringField("value", acc.getValue());
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
-		gen.writeEndObject();
-
-		gen.close();
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
deleted file mode 100644
index 1a7d868..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler for the CANCEL request.
- */
-public class JobCancellationHandler extends AbstractJsonRequestHandler {
-
-	private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
-	private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
-
-	private final Time timeout;
-
-	public JobCancellationHandler(Executor executor, Time timeout) {
-		super(executor);
-		this.timeout = Preconditions.checkNotNull(timeout);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-					if (jobManagerGateway != null) {
-						jobManagerGateway.cancelJob(jobId, timeout);
-						return "{}";
-					}
-					else {
-						throw new Exception("No connection to the leading JobManager.");
-					}
-				}
-				catch (Exception e) {
-					throw new FlinkFutureException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e);
-				}
-			},
-			executor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
deleted file mode 100644
index 4e41447..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler for {@link CancelJobWithSavepoint} messages.
- */
-public class JobCancellationWithSavepointHandlers {
-
-	private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint";
-	private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
-
-	/** URL for in-progress cancellations. */
-	private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
-
-	/** Encodings for String. */
-	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
-	/** Shared lock between Trigger and In-Progress handlers. */
-	private final Object lock = new Object();
-
-	/** In-Progress requests. */
-	private final Map<JobID, Long> inProgress = new HashMap<>();
-
-	/** Succeeded/failed request. Either String or Throwable. */
-	private final Map<Long, Object> completed = new HashMap<>();
-
-	/** Atomic request counter. */
-	private long requestCounter;
-
-	/** Handler for trigger requests. */
-	private final TriggerHandler triggerHandler;
-
-	/** Handler for in-progress requests. */
-	private final InProgressHandler inProgressHandler;
-
-	/** Default savepoint directory. */
-	private final String defaultSavepointDirectory;
-
-	public JobCancellationWithSavepointHandlers(
-			ExecutionGraphHolder currentGraphs,
-			Executor executor) {
-		this(currentGraphs, executor, null);
-	}
-
-	public JobCancellationWithSavepointHandlers(
-			ExecutionGraphHolder currentGraphs,
-			Executor executor,
-			@Nullable String defaultSavepointDirectory) {
-
-		this.triggerHandler = new TriggerHandler(currentGraphs, executor);
-		this.inProgressHandler = new InProgressHandler();
-		this.defaultSavepointDirectory = defaultSavepointDirectory;
-	}
-
-	public TriggerHandler getTriggerHandler() {
-		return triggerHandler;
-	}
-
-	public InProgressHandler getInProgressHandler() {
-		return inProgressHandler;
-	}
-
-	// ------------------------------------------------------------------------
-	// New requests
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Handler for triggering a {@link CancelJobWithSavepoint} message.
-	 */
-	class TriggerHandler implements RequestHandler {
-
-		/** Current execution graphs. */
-		private final ExecutionGraphHolder currentGraphs;
-
-		/** Execution context for futures. */
-		private final Executor executor;
-
-		public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
-			this.currentGraphs = checkNotNull(currentGraphs);
-			this.executor = checkNotNull(executor);
-		}
-
-		@Override
-		public String[] getPaths() {
-			return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public CompletableFuture<FullHttpResponse> handleRequest(
-				Map<String, String> pathParams,
-				Map<String, String> queryParams,
-				JobManagerGateway jobManagerGateway) {
-
-			if (jobManagerGateway != null) {
-				JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-				final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
-
-				graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-
-				return graphFuture.thenApplyAsync(
-					(Optional<AccessExecutionGraph> optGraph) -> {
-						final AccessExecutionGraph graph = optGraph.orElseThrow(
-							() -> new FlinkFutureException(
-								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
-
-						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-						if (coord == null) {
-							throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
-						}
-
-						String targetDirectory = pathParams.get("targetDirectory");
-						if (targetDirectory == null) {
-							if (defaultSavepointDirectory == null) {
-								throw new IllegalStateException("No savepoint directory configured. " +
-									"You can either specify a directory when triggering this savepoint or " +
-									"configure a cluster-wide default via key '" +
-									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-							} else {
-								targetDirectory = defaultSavepointDirectory;
-							}
-						}
-
-						try {
-							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
-						} catch (IOException e) {
-							throw new FlinkFutureException("Could not cancel job with savepoint.", e);
-						}
-					}, executor);
-			} else {
-				return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
-			// Check whether a request exists
-			final long requestId;
-			final boolean isNewRequest;
-			synchronized (lock) {
-				if (inProgress.containsKey(jobId)) {
-					requestId = inProgress.get(jobId);
-					isNewRequest = false;
-				} else {
-					requestId = ++requestCounter;
-					inProgress.put(jobId, requestId);
-					isNewRequest = true;
-				}
-			}
-
-			if (isNewRequest) {
-				boolean success = false;
-
-				try {
-					// Trigger cancellation
-					CompletableFuture<String> cancelJobFuture = jobManagerGateway
-						.cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
-
-					cancelJobFuture.whenCompleteAsync(
-						(String path, Throwable throwable) -> {
-							try {
-								if (throwable != null) {
-									completed.put(requestId, throwable);
-								} else {
-									completed.put(requestId, path);
-								}
-							} finally {
-								inProgress.remove(jobId);
-							}
-						}, executor);
-
-					success = true;
-				} finally {
-					synchronized (lock) {
-						if (!success) {
-							inProgress.remove(jobId);
-						}
-					}
-				}
-			}
-
-			// In-progress location
-			String location = CANCELLATION_IN_PROGRESS_REST_PATH
-					.replace(":jobid", jobId.toString())
-					.replace(":requestId", Long.toString(requestId));
-
-			// Accepted response
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			gen.writeStringField("status", "accepted");
-			gen.writeNumberField("request-id", requestId);
-			gen.writeStringField("location", location);
-			gen.writeEndObject();
-			gen.close();
-
-			String json = writer.toString();
-			byte[] bytes = json.getBytes(ENCODING);
-
-			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1,
-					HttpResponseStatus.ACCEPTED,
-					Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.LOCATION, location);
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			FullHttpResponse accepted = response;
-
-			return accepted;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// In-progress requests
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Handler for in-progress cancel with savepoint operations.
-	 */
-	class InProgressHandler implements RequestHandler {
-
-		/** The number of recent checkpoints whose IDs are remembered. */
-		private static final int NUM_GHOST_REQUEST_IDS = 16;
-
-		/** Remember some recently completed. */
-		private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
-
-		@Override
-		public String[] getPaths() {
-			return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-			JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-			long requestId = Long.parseLong(pathParams.get("requestId"));
-
-			return CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						synchronized (lock) {
-							Object result = completed.remove(requestId);
-
-							if (result != null) {
-								// Add to recent history
-								recentlyCompleted.add(new Tuple2<>(requestId, result));
-								if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
-									recentlyCompleted.remove();
-								}
-
-								if (result.getClass() == String.class) {
-									String savepointPath = (String) result;
-									return createSuccessResponse(requestId, savepointPath);
-								} else {
-									Throwable cause = (Throwable) result;
-									return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-								}
-							} else {
-								// Check in-progress
-								Long inProgressRequestId = inProgress.get(jobId);
-								if (inProgressRequestId != null) {
-									// Sanity check
-									if (inProgressRequestId == requestId) {
-										return createInProgressResponse(requestId);
-									} else {
-										String msg = "Request ID does not belong to JobID";
-										return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
-									}
-								}
-
-								// Check recent history
-								for (Tuple2<Long, Object> recent : recentlyCompleted) {
-									if (recent.f0 == requestId) {
-										if (recent.f1.getClass() == String.class) {
-											String savepointPath = (String) recent.f1;
-											return createSuccessResponse(requestId, savepointPath);
-										} else {
-											Throwable cause = (Throwable) recent.f1;
-											return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-										}
-									}
-								}
-
-								return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
-							}
-						}
-					} catch (Exception e) {
-						throw new FlinkFutureException("Could not handle in progress request.", e);
-					}
-				});
-		}
-
-		private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-
-			gen.writeStringField("status", "success");
-			gen.writeNumberField("request-id", requestId);
-			gen.writeStringField("savepoint-path", savepointPath);
-
-			gen.writeEndObject();
-			gen.close();
-
-			String json = writer.toString();
-			byte[] bytes = json.getBytes(ENCODING);
-
-			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1,
-					HttpResponseStatus.CREATED,
-					Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			return response;
-		}
-
-		private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-
-			gen.writeStringField("status", "in-progress");
-			gen.writeNumberField("request-id", requestId);
-
-			gen.writeEndObject();
-			gen.close();
-
-			String json = writer.toString();
-			byte[] bytes = json.getBytes(ENCODING);
-
-			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1,
-					HttpResponseStatus.ACCEPTED,
-					Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			return response;
-		}
-
-		private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-
-			gen.writeStringField("status", "failed");
-			gen.writeNumberField("request-id", requestId);
-			gen.writeStringField("cause", errMsg);
-
-			gen.writeEndObject();
-			gen.close();
-
-			String json = writer.toString();
-			byte[] bytes = json.getBytes(ENCODING);
-
-			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1,
-					code,
-					Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			return response;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
deleted file mode 100644
index 0b15b37..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the execution config of a job.
- */
-public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
-
-	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_CONFIG_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createJobConfigJson(graph);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not write job config json.", e);
-				}
-			},
-			executor);
-
-	}
-
-	/**
-	 * Archivist for the JobConfigHandler.
-	 */
-	public static class JobConfigJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createJobConfigJson(graph);
-			String path = JOB_CONFIG_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			return Collections.singletonList(new ArchivedJson(path, json));
-		}
-	}
-
-	public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		gen.writeStartObject();
-		gen.writeStringField("jid", graph.getJobID().toString());
-		gen.writeStringField("name", graph.getJobName());
-
-		final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
-
-		if (summary != null) {
-			gen.writeObjectFieldStart("execution-config");
-
-			gen.writeStringField("execution-mode", summary.getExecutionMode());
-
-			gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription());
-			gen.writeNumberField("job-parallelism", summary.getParallelism());
-			gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled());
-
-			Map<String, String> ucVals = summary.getGlobalJobParameters();
-			if (ucVals != null) {
-				gen.writeObjectFieldStart("user-config");
-
-				for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
-					gen.writeStringField(ucVal.getKey(), ucVal.getValue());
-				}
-
-				gen.writeEndObject();
-			}
-
-			gen.writeEndObject();
-		}
-		gen.writeEndObject();
-
-		gen.close();
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
deleted file mode 100644
index 8a50f87..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns details about a job. This includes:
- * <ul>
- *     <li>Dataflow plan</li>
- *     <li>id, name, and current status</li>
- *     <li>start time, end time, duration</li>
- *     <li>number of job vertices in each state (pending, running, finished, failed)</li>
- *     <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
- * </ul>
- */
-public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
-	private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices";
-
-	private final MetricFetcher fetcher;
-
-	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
-		super(executionGraphHolder, executor);
-		this.fetcher = fetcher;
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createJobDetailsJson(graph, fetcher);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job details json.", e);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Archivist for the JobDetailsHandler.
-	 */
-	public static class JobDetailsJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createJobDetailsJson(graph, null);
-			String path1 = JOB_DETAILS_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			String path2 = JOB_DETAILS_VERTICES_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			Collection<ArchivedJson> archives = new ArrayList<>();
-			archives.add(new ArchivedJson(path1, json));
-			archives.add(new ArchivedJson(path2, json));
-			return archives;
-		}
-	}
-
-	public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
-		final StringWriter writer = new StringWriter();
-		final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		final long now = System.currentTimeMillis();
-
-		gen.writeStartObject();
-
-		// basic info
-		gen.writeStringField("jid", graph.getJobID().toString());
-		gen.writeStringField("name", graph.getJobName());
-		gen.writeBooleanField("isStoppable", graph.isStoppable());
-		gen.writeStringField("state", graph.getState().name());
-
-		// times and duration
-		final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
-		final long jobEndTime = graph.getState().isGloballyTerminalState() ?
-				graph.getStatusTimestamp(graph.getState()) : -1L;
-		gen.writeNumberField("start-time", jobStartTime);
-		gen.writeNumberField("end-time", jobEndTime);
-		gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
-		gen.writeNumberField("now", now);
-
-		// timestamps
-		gen.writeObjectFieldStart("timestamps");
-		for (JobStatus status : JobStatus.values()) {
-			gen.writeNumberField(status.name(), graph.getStatusTimestamp(status));
-		}
-		gen.writeEndObject();
-
-		// job vertices
-		int[] jobVerticesPerState = new int[ExecutionState.values().length];
-		gen.writeArrayFieldStart("vertices");
-
-		for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
-			int[] tasksPerState = new int[ExecutionState.values().length];
-			long startTime = Long.MAX_VALUE;
-			long endTime = 0;
-			boolean allFinished = true;
-
-			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
-				final ExecutionState state = vertex.getExecutionState();
-				tasksPerState[state.ordinal()]++;
-
-				// take the earliest start time
-				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
-				if (started > 0) {
-					startTime = Math.min(startTime, started);
-				}
-
-				allFinished &= state.isTerminal();
-				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
-			}
-
-			long duration;
-			if (startTime < Long.MAX_VALUE) {
-				if (allFinished) {
-					duration = endTime - startTime;
-				}
-				else {
-					endTime = -1L;
-					duration = now - startTime;
-				}
-			}
-			else {
-				startTime = -1L;
-				endTime = -1L;
-				duration = -1L;
-			}
-
-			ExecutionState jobVertexState =
-					ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
-			jobVerticesPerState[jobVertexState.ordinal()]++;
-
-			gen.writeStartObject();
-			gen.writeStringField("id", ejv.getJobVertexId().toString());
-			gen.writeStringField("name", ejv.getName());
-			gen.writeNumberField("parallelism", ejv.getParallelism());
-			gen.writeStringField("status", jobVertexState.name());
-
-			gen.writeNumberField("start-time", startTime);
-			gen.writeNumberField("end-time", endTime);
-			gen.writeNumberField("duration", duration);
-
-			gen.writeObjectFieldStart("tasks");
-			for (ExecutionState state : ExecutionState.values()) {
-				gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
-			}
-			gen.writeEndObject();
-
-			MutableIOMetrics counts = new MutableIOMetrics();
-
-			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
-				counts.addIOMetrics(
-					vertex.getCurrentExecutionAttempt(),
-					fetcher,
-					graph.getJobID().toString(),
-					ejv.getJobVertexId().toString());
-			}
-
-			counts.writeIOMetricsAsJson(gen);
-
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
-
-		gen.writeObjectFieldStart("status-counts");
-		for (ExecutionState state : ExecutionState.values()) {
-			gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
-		}
-		gen.writeEndObject();
-
-		gen.writeFieldName("plan");
-		gen.writeRawValue(graph.getJsonPlan());
-
-		gen.writeEndObject();
-
-		gen.close();
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
deleted file mode 100644
index 6ffd443..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.ExceptionUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the configuration of a job.
- */
-public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
-
-	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
-
-	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_EXCEPTIONS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createJobExceptionsJson(graph);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job exceptions json.", e);
-				}
-			},
-			executor
-		);
-	}
-
-	/**
-	 * Archivist for the JobExceptionsHandler.
-	 */
-	public static class JobExceptionsJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createJobExceptionsJson(graph);
-			String path = JOB_EXCEPTIONS_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			return Collections.singletonList(new ArchivedJson(path, json));
-		}
-	}
-
-	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		gen.writeStartObject();
-
-		// most important is the root failure cause
-		ErrorInfo rootException = graph.getFailureCause();
-		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
-			gen.writeStringField("root-exception", rootException.getExceptionAsString());
-			gen.writeNumberField("timestamp", rootException.getTimestamp());
-		}
-
-		// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
-		gen.writeArrayFieldStart("all-exceptions");
-
-		int numExceptionsSoFar = 0;
-		boolean truncated = false;
-
-		for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
-			String t = task.getFailureCauseAsString();
-			if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
-				if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
-					truncated = true;
-					break;
-				}
-
-				TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
-				String locationString = location != null ?
-						location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
-
-				gen.writeStartObject();
-				gen.writeStringField("exception", t);
-				gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
-				gen.writeStringField("location", locationString);
-				long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
-				gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp);
-				gen.writeEndObject();
-				numExceptionsSoFar++;
-			}
-		}
-		gen.writeEndArray();
-
-		gen.writeBooleanField("truncated", truncated);
-		gen.writeEndObject();
-
-		gen.close();
-		return writer.toString();
-	}
-}