You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/02 10:40:38 UTC

[4/4] flink git commit: [FLINK-5870] Handlers define REST URLs

[FLINK-5870] Handlers define REST URLs

This closes #3376.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/999bacef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/999bacef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/999bacef

Branch: refs/heads/master
Commit: 999baceff36165d950a61dd9cc4342f252e64837
Parents: 51b7ede
Author: zentol <ch...@apache.org>
Authored: Wed Mar 1 12:23:15 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 2 11:39:04 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       |   5 +
 .../webmonitor/RuntimeMonitorHandlerBase.java   |   7 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 193 +++++++++++--------
 .../handlers/ClusterOverviewHandler.java        |   7 +
 .../handlers/CurrentJobIdsHandler.java          |   7 +
 .../handlers/CurrentJobsOverviewHandler.java    |  16 ++
 .../handlers/DashboardConfigHandler.java        |   7 +
 .../handlers/JarAccessDeniedHandler.java        |  11 ++
 .../webmonitor/handlers/JarDeleteHandler.java   |   7 +
 .../webmonitor/handlers/JarListHandler.java     |   7 +
 .../webmonitor/handlers/JarPlanHandler.java     |   7 +
 .../webmonitor/handlers/JarRunHandler.java      |   7 +
 .../webmonitor/handlers/JarUploadHandler.java   |   7 +
 .../handlers/JobAccumulatorsHandler.java        |   7 +
 .../handlers/JobCancellationHandler.java        |   8 +
 .../JobCancellationWithSavepointHandlers.java   |  17 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   7 +
 .../webmonitor/handlers/JobDetailsHandler.java  |   8 +
 .../handlers/JobExceptionsHandler.java          |   7 +
 .../handlers/JobManagerConfigHandler.java       |   7 +
 .../webmonitor/handlers/JobPlanHandler.java     |   6 +
 .../webmonitor/handlers/JobStoppingHandler.java |   8 +
 .../handlers/JobVertexAccumulatorsHandler.java  |   7 +
 .../handlers/JobVertexBackPressureHandler.java  |   7 +
 .../handlers/JobVertexDetailsHandler.java       |   7 +
 .../handlers/JobVertexTaskManagersHandler.java  |   7 +
 .../webmonitor/handlers/RequestHandler.java     |   7 +
 .../SubtaskCurrentAttemptDetailsHandler.java    |   7 +
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +
 .../SubtaskExecutionAttemptDetailsHandler.java  |   7 +
 .../SubtasksAllAccumulatorsHandler.java         |   7 +
 .../handlers/SubtasksTimesHandler.java          |   6 +
 .../handlers/TaskManagerLogHandler.java         |  12 ++
 .../handlers/TaskManagersHandler.java           |   8 +
 .../checkpoints/CheckpointConfigHandler.java    |   7 +
 .../CheckpointStatsDetailsHandler.java          |   7 +
 .../CheckpointStatsDetailsSubtasksHandler.java  |   7 +
 .../checkpoints/CheckpointStatsHandler.java     |   7 +
 .../metrics/JobManagerMetricsHandler.java       |   8 +
 .../webmonitor/metrics/JobMetricsHandler.java   |   6 +
 .../metrics/JobVertexMetricsHandler.java        |   6 +
 .../metrics/TaskManagerMetricsHandler.java      |   8 +
 .../handlers/ClusterOverviewHandlerTest.java    |  34 ++++
 .../handlers/CurrentJobIdsHandlerTest.java      |  34 ++++
 .../CurrentJobsOverviewHandlerTest.java         |  44 +++++
 .../handlers/DashboardConfigHandlerTest.java    |  31 +++
 .../handlers/JarAccessDeniedHandlerTest.java    |  39 ++++
 .../handlers/JarDeleteHandlerTest.java          |  31 +++
 .../webmonitor/handlers/JarListHandlerTest.java |  31 +++
 .../webmonitor/handlers/JarPlanHandlerTest.java |  31 +++
 .../webmonitor/handlers/JarRunHandlerTest.java  |  31 +++
 .../handlers/JarUploadHandlerTest.java          |  31 +++
 .../handlers/JobAccumulatorsHandlerTest.java    |  31 +++
 .../handlers/JobCancellationHandlerTest.java    |  36 ++++
 ...obCancellationWithSavepointHandlersTest.java |  20 ++
 .../handlers/JobConfigHandlerTest.java          |  31 +++
 .../handlers/JobDetailsHandlerTest.java         |  36 ++++
 .../handlers/JobExceptionsHandlerTest.java      |  31 +++
 .../handlers/JobManagerConfigHandlerTest.java   |  31 +++
 .../webmonitor/handlers/JobPlanHandlerTest.java |  31 +++
 .../handlers/JobStoppingHandlerTest.java        |  36 ++++
 .../JobVertexAccumulatorsHandlerTest.java       |  31 +++
 .../JobVertexBackPressureHandlerTest.java       |   8 +
 .../handlers/JobVertexDetailsHandlerTest.java   |  31 +++
 .../JobVertexTaskManagersHandlerTest.java       |  31 +++
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  31 +++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  31 +++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  31 +++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  31 +++
 .../handlers/SubtasksTimesHandlerTest.java      |  31 +++
 .../handlers/TaskManagerLogHandlerTest.java     |  28 +++
 .../handlers/TaskManagersHandlerTest.java       |  38 ++++
 .../CheckpointConfigHandlerTest.java            |   9 +
 .../CheckpointStatsDetailsHandlerTest.java      |   9 +
 .../checkpoints/CheckpointStatsHandlerTest.java |   9 +
 ...heckpointStatsSubtaskDetailsHandlerTest.java |   9 +
 .../metrics/JobManagerMetricsHandlerTest.java   |  10 +
 .../metrics/JobMetricsHandlerTest.java          |  10 +
 .../metrics/JobVertexMetricsHandlerTest.java    |  10 +
 .../metrics/TaskManagerMetricsHandlerTest.java  |  10 +
 80 files changed, 1439 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 8dbd135..8bd58a3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -76,6 +76,11 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return handler.getPaths();
+	}
+
+	@Override
 	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
 		FullHttpResponse response;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index 9442867..3c1dcb6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -67,6 +67,13 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 		this.httpsEnabled = httpsEnabled;
 	}
 
+	/**
+	 * Returns an array of REST URLs under which this handler can be registered.
+	 *
+	 * @return array containing REST URLs under which this handler can be registered.
+	 */
+	public abstract String[] getPaths();
+
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
 		if (localJobManagerAddressFuture.isCompleted()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a9cb630..dddc69d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -256,56 +256,50 @@ public class WebRuntimeMonitor implements WebMonitor {
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
 		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
 
-		router = new Router()
-			// config how to interact with this web server
-			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
-
-			// the overview - how many task managers, slots, free slots, ...
-			.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
-
-			// job manager configuration
-			.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
-
-			// overview over jobs
-			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
-			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))
-			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))
-
-			.GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
-
-			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
-			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
-
-			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
-			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
-							currentGraphs,
-							backPressureStatsTracker,
-							refreshInterval)))
-			.GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
-
-			.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
-			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
-			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
-
-			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", 
-				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-					TaskManagerLogHandler.FileMode.LOG, config, enableSSL))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
-				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-					TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher)))
+		router = new Router();
+		// config how to interact with this web server
+		GET(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
+
+		// the overview - how many task managers, slots, free slots, ...
+		GET(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+
+		// job manager configuration
+		GET(router, new JobManagerConfigHandler(config));
+
+		// overview over jobs
+		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
+		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
+		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
+
+		GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
+
+		GET(router, new JobDetailsHandler(currentGraphs, metricFetcher));
+
+		GET(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
+		GET(router, new SubtasksTimesHandler(currentGraphs));
+		GET(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
+		GET(router, new JobVertexAccumulatorsHandler(currentGraphs));
+		GET(router, new JobVertexBackPressureHandler(currentGraphs,	backPressureStatsTracker, refreshInterval));
+		GET(router, new JobVertexMetricsHandler(metricFetcher));
+		GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
+		GET(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
+		GET(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
+		GET(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
+
+		GET(router, new JobPlanHandler(currentGraphs));
+		GET(router, new JobConfigHandler(currentGraphs));
+		GET(router, new JobExceptionsHandler(currentGraphs));
+		GET(router, new JobAccumulatorsHandler(currentGraphs));
+		GET(router, new JobMetricsHandler(metricFetcher));
+
+		GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
+		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
+				TaskManagerLogHandler.FileMode.LOG, config, enableSSL));
+		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
+				TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL));
+		GET(router, new TaskManagerMetricsHandler(metricFetcher));
 
+		router
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
 				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
@@ -313,25 +307,22 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
 				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
-					enableSSL))
-
-			.GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher)))
+					enableSSL));
 
-			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
+		GET(router, new JobManagerMetricsHandler(metricFetcher));
 
-			// DELETE is the preferred way of canceling a job (Rest-conform)
-			.DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler()))
+		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
+		GET(router, new JobCancellationHandler());
+		// DELETE is the preferred way of canceling a job (REST-compliant)
+		DELETE(router, new JobCancellationHandler());
 
-			.GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler)
-			.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler)
-			.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
+		GET(router, triggerHandler);
+		GET(router, inProgressHandler);
 
-			// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-			.GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler()))
-
-			// DELETE is the preferred way of stopping a job (Rest-conform)
-			.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
+		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
+		GET(router, new JobStoppingHandler());
+		// DELETE is the preferred way of stopping a job (REST-compliant)
+		DELETE(router, new JobStoppingHandler());
 
 		int maxCachedEntries = config.getInteger(
 				ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
@@ -339,34 +330,32 @@ public class WebRuntimeMonitor implements WebMonitor {
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
 
 		// Register the checkpoint stats handlers
-		router
-			.GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs)))
-			.GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
-			.GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
+		GET(router, new CheckpointStatsHandler(currentGraphs));
+		GET(router, new CheckpointConfigHandler(currentGraphs));
+		GET(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
+		GET(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
 
 		if (webSubmitAllow) {
-			router
-				// fetch the list of uploaded jars.
-				.GET("/jars", handler(new JarListHandler(uploadDir)))
+			// fetch the list of uploaded jars.
+			GET(router, new JarListHandler(uploadDir));
 
-				// get plan for an uploaded jar
-				.GET("/jars/:jarid/plan", handler(new JarPlanHandler(uploadDir)))
+			// get plan for an uploaded jar
+			GET(router, new JarPlanHandler(uploadDir));
 
-				// run a jar
-				.POST("/jars/:jarid/run", handler(new JarRunHandler(uploadDir, timeout, config)))
+			// run a jar
+			POST(router, new JarRunHandler(uploadDir, timeout, config));
 
-				// upload a jar
-				.POST("/jars/upload", handler(new JarUploadHandler(uploadDir)))
+			// upload a jar
+			POST(router, new JarUploadHandler(uploadDir));
 
-				// delete an uploaded jar from submission interface
-				.DELETE("/jars/:jarid", handler(new JarDeleteHandler(uploadDir)));
+			// delete an uploaded jar from submission interface
+			DELETE(router, new JarDeleteHandler(uploadDir));
 		} else {
-			router
-				// send an Access Denied message (sort of)
-				// Every other GET request will go to the File Server, which will not provide
-				// access to the jar directory anyway, because it doesn't exist in webRootDir.
-				.GET("/jars", handler(new JarAccessDeniedHandler()));
+			// send an Access Denied message
+			JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
+			GET(router, jad);
+			POST(router, jad);
+			DELETE(router, jad);
 		}
 
 		// this handler serves all the static contents
@@ -526,6 +515,40 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 	}
 
+	/** These methods are used in the route path setup. They register the given {@link RequestHandler} or
+	 * {@link RuntimeMonitorHandlerBase} with the given {@link Router} for the respective REST method.
+	 * The REST paths under which they are registered are defined by the handlers. **/
+
+	private void GET(Router router, RequestHandler handler) {
+		GET(router, handler(handler));
+	}
+
+	private void GET(Router router, RuntimeMonitorHandlerBase handler) {
+		for (String path : handler.getPaths()) {
+			router.GET(path, handler);
+		}
+	}
+
+	private void DELETE(Router router, RequestHandler handler) {
+		DELETE(router, handler(handler));
+	}
+
+	private void DELETE(Router router, RuntimeMonitorHandlerBase handler) {
+		for (String path : handler.getPaths()) {
+			router.DELETE(path, handler);
+		}
+	}
+
+	private void POST(Router router, RequestHandler handler) {
+		POST(router, handler(handler));
+	}
+
+	private void POST(Router router, RuntimeMonitorHandlerBase handler) {
+		for (String path : handler.getPaths()) {
+			router.POST(path, handler);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 99ef3d9..2bd055d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
+	private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
 	private static final String version = EnvironmentInformation.getVersion();
 
 	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
@@ -49,6 +51,11 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index b690c56..94b1c16 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -39,11 +39,18 @@ import static java.util.Objects.requireNonNull;
  */
 public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
+	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
 	private final FiniteDuration timeout;
 	
 	public CurrentJobIdsHandler(FiniteDuration timeout) {
 		this.timeout = requireNonNull(timeout);
 	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CURRENT_JOB_IDS_REST_PATH};
+	}
 	
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 07064da..8486a9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -38,6 +38,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 
+	private static final String ALL_JOBS_REST_PATH = "/joboverview";
+	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
+	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
+
 	private final FiniteDuration timeout;
 	
 	private final boolean includeRunningJobs;
@@ -55,6 +59,18 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		if (includeRunningJobs && includeFinishedJobs) {
+			return new String[]{ALL_JOBS_REST_PATH};
+		}
+		if (includeRunningJobs) {
+			return new String[]{RUNNING_JOBS_REST_PATH};
+		} else {
+			return new String[]{COMPLETED_JOBS_REST_PATH};
+		}
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 6fe072b..49f4c26 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -32,6 +32,8 @@ import java.util.TimeZone;
  * and time zone of the server timestamps.
  */
 public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+
+	private static String DASHBOARD_CONFIG_REST_PATH = "/config";
 	
 	private final String configString;
 	
@@ -65,6 +67,11 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 			throw new RuntimeException(e.getMessage(), e);
 		}
 	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{DASHBOARD_CONFIG_REST_PATH};
+	}
 	
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index ba32d0d..f0e3faf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -28,6 +28,17 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 			"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{
+			JarListHandler.JAR_LIST_REST_PATH,
+			JarPlanHandler.JAR_PLAN_REST_PATH,
+			JarRunHandler.JAR_RUN_REST_PATH,
+			JarUploadHandler.JAR_UPLOAD_REST_PATH,
+			JarDeleteHandler.JAR_DELETE_REST_PATH
+		};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return ERROR_MESSAGE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index ae959a5..f3bf231 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -31,6 +31,8 @@ import java.util.Map;
  */
 public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
+	static final String JAR_DELETE_REST_PATH = "/jars/:jarid";
+
 	private final File jarDir;
 
 	public JarDeleteHandler(File jarDirectory) {
@@ -38,6 +40,11 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JAR_DELETE_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		final String file = pathParams.get("jarid");
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index f3cdc30..81b5e34 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -33,6 +33,8 @@ import java.util.jar.Manifest;
 
 public class JarListHandler extends AbstractJsonRequestHandler {
 
+	static final String JAR_LIST_REST_PATH = "/jars";
+
 	private final File jarDir;
 
 	public  JarListHandler(File jarDirectory) {
@@ -40,6 +42,11 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JAR_LIST_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index bd0a6af..d121119 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -32,11 +32,18 @@ import java.util.Map;
  */
 public class JarPlanHandler extends JarActionHandler {
 
+	static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan";
+
 	public JarPlanHandler(File jarDirectory) {
 		super(jarDirectory);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JAR_PLAN_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 474be33..5f39e19 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -38,6 +38,8 @@ import java.util.Map;
  */
 public class JarRunHandler extends JarActionHandler {
 
+	static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
+
 	private final FiniteDuration timeout;
 	private final Configuration clientConfig;
 
@@ -48,6 +50,11 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JAR_RUN_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 9a3b0e1..3d7cb8a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -29,6 +29,8 @@ import java.util.UUID;
  */
 public class JarUploadHandler extends AbstractJsonRequestHandler {
 
+	static final String JAR_UPLOAD_REST_PATH = "/jars/upload";
+
 	private final File jarDir;
 
 	public JarUploadHandler(File jarDir) {
@@ -36,6 +38,11 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JAR_UPLOAD_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 29613a0..7664153 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -30,12 +30,19 @@ import java.util.Map;
  * Request handler that returns the aggregated user accumulators of a job.
  */
 public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
 	
 	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_ACCUMULATORS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 9f35719..d9de7d7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -30,6 +30,14 @@ import java.util.Map;
  */
 public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
+	private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
+	private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
+	}
+
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 492ce76..b618d85 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -55,8 +55,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobCancellationWithSavepointHandlers {
 
+	private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint";
+	private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
+
 	/** URL for in-progress cancellations. */
-	public static final String IN_PROGRESS_URL = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+	private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
 
 	/** Encodings for String. */
 	private static final Charset ENCODING = Charset.forName("UTF-8");
@@ -127,6 +130,11 @@ public class JobCancellationWithSavepointHandlers {
 		}
 
 		@Override
+		public String[] getPaths() {
+			return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
+		}
+
+		@Override
 		@SuppressWarnings("unchecked")
 		public FullHttpResponse handleRequest(
 				Map<String, String> pathParams,
@@ -230,7 +238,7 @@ public class JobCancellationWithSavepointHandlers {
 			}
 
 			// In-progress location
-			String location = IN_PROGRESS_URL
+			String location = CANCELLATION_IN_PROGRESS_REST_PATH
 					.replace(":jobid", jobId.toString())
 					.replace(":requestId", Long.toString(requestId));
 
@@ -279,6 +287,11 @@ public class JobCancellationWithSavepointHandlers {
 		private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
 
 		@Override
+		public String[] getPaths() {
+			return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
+		}
+
+		@Override
 		@SuppressWarnings("unchecked")
 		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 21639ef..459ca2a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -31,11 +31,18 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
  */
 public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
+
 	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_CONFIG_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 35e6ca7..7780e66 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -47,6 +47,9 @@ import java.util.Map;
  */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
+	private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices";
+
 	private final MetricFetcher fetcher;
 
 	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
@@ -55,6 +58,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		final StringWriter writer = new StringWriter();
 		final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 376cca4..3720dac 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -33,6 +33,8 @@ import java.util.Map;
  */
 public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
+
 	private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 	
 	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
@@ -40,6 +42,11 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_EXCEPTIONS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 11ca931..5fcf010 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -30,6 +30,8 @@ import java.util.Map;
  */
 public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
+	private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config";
+
 	private final Configuration config;
 
 	public JobManagerConfigHandler(Configuration config) {
@@ -37,6 +39,11 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 64f7000..becc2e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -28,12 +28,18 @@ import java.util.Map;
  */
 public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
 	
 	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_PLAN_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		return graph.getJsonPlan();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 0f8c958..c8ec689 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -30,6 +30,14 @@ import java.util.Map;
  */
 public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
+	private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop";
+	private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop";
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH};
+	}
+
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ad4e207..ccfcbba 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -29,12 +29,19 @@ import java.util.Map;
 
 
 public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler {
+
+	private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
 	
 	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index c5bacf2..52167e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -39,6 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler {
 
+	private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure";
+
 	/** Back pressure stats tracker. */
 	private final BackPressureStatsTracker backPressureStatsTracker;
 
@@ -57,6 +59,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(
 			AccessExecutionJobVertex accessJobVertex,
 			Map<String, String> params) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 32626ba..0a07896 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
  */
 public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
+	private static String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid";
+
 	private final MetricFetcher fetcher;
 
 	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
@@ -47,6 +49,11 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_VERTEX_DETAILS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final long now = System.currentTimeMillis();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index f468d35..b3dabea 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -43,6 +43,8 @@ import java.util.Map.Entry;
  */
 public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {
 
+	private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers";
+
 	private final MetricFetcher fetcher;
 
 	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
@@ -51,6 +53,11 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index c56cfc3..b6246e6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -49,4 +49,11 @@ public interface RequestHandler {
 	 *         with the exception stack trace.
 	 */
 	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+
+	/**
+	 * Returns an array of REST URLs under which this handler can be registered.
+	 *
+	 * @return array containing REST URLs under which this handler can be registered.
+	 */
+	String[] getPaths();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 6d09513..4cf5f0f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -28,12 +28,19 @@ import java.util.Map;
  * Request handler providing details about a single task execution attempt.
  */
 public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
+
+	public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
 	
 	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
 		super(executionGraphHolder, fetcher);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
 		return handleRequest(vertex.getCurrentExecutionAttempt(), params);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index e613efb..ba3a5ee 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -31,12 +31,19 @@ import java.util.Map;
  * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter).  
  */
 public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler {
+
+	private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
 	
 	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index da8db02..b753b6e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -37,6 +37,8 @@ import java.util.Map;
  */
 public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler {
 
+	private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt";
+
 	private final MetricFetcher fetcher;
 
 	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
@@ -45,6 +47,11 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
 		final ExecutionState status = execAttempt.getState();
 		final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 892a606..222d474 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -33,12 +33,19 @@ import java.util.Map;
  * Request handler that returns the accumulators for all subtasks of job vertex.
  */
 public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler {
+
+	private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
 	
 	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 76349ee..e2e35e3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -35,12 +35,18 @@ import java.util.Map;
  */
 public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
+	private static final String SUBTASK_TIMES_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasktimes";
 	
 	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() {
+		return new String[]{SUBTASK_TIMES_REST_PATH};
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 6583d3b..1002bf3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -94,6 +94,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class);
 
+	private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log";
+	private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout";
+
 	/** Keep track of last transmitted log, to clean up old ones */
 	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
 	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
@@ -141,6 +144,15 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		timeTimeout = Time.milliseconds(timeout.toMillis());
 	}
 
+	@Override
+	public String[] getPaths() { // each instance declares exactly one route, selected by serveLogFile
+		if (serveLogFile) {
+			return new String[]{TASKMANAGER_LOG_REST_PATH}; // "/taskmanagers/:taskmanagerid/log"
+		} else {
+			return new String[]{TASKMANAGER_OUT_REST_PATH}; // "/taskmanagers/:taskmanagerid/stdout"
+		}
+	}
+
 	/**
 	 * Response when running with leading JobManager.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index c757f5c..a23e983 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -41,6 +41,9 @@ import static java.util.Objects.requireNonNull;
 
 public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
+	private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
+	private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
+
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 	
 	private final FiniteDuration timeout;
@@ -53,6 +56,11 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 	}
 
 	@Override
+	public String[] getPaths() { // this handler declares two routes: the collection and a single task manager
+		return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH}; // "/taskmanagers", "/taskmanagers/:taskmanagerid"
+	}
+
+	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index be0d283..de40a4a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -34,11 +34,18 @@ import java.util.Map;
  */
 public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
+
 	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{CHECKPOINT_CONFIG_REST_PATH}; // "/jobs/:jobid/checkpoints/config"
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index d461f03..e651824 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -38,6 +38,8 @@ import java.util.Map;
  */
 public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid";
+
 	private final CheckpointStatsCache cache;
 
 	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
@@ -46,6 +48,11 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH}; // "/jobs/:jobid/checkpoints/details/:checkpointid"
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		long checkpointId = parseCheckpointId(params);
 		if (checkpointId == -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d55467f..15dd911 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -44,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
 	private final CheckpointStatsCache cache;
 
 	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
@@ -52,6 +54,11 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH}; // "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"
+	}
+
+	@Override
 	public String handleJsonRequest(
 		Map<String, String> pathParams,
 		Map<String, String> queryParams,

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 404b2c7..6413806 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -43,11 +43,18 @@ import java.util.Map;
  */
 public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
 
+	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
+
 	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{CHECKPOINT_STATS_REST_PATH}; // "/jobs/:jobid/checkpoints"
+	}
+
+	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7452c71..f667ce5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -31,11 +31,19 @@ import java.util.Map;
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
 public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+
+	private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
+
 	public JobManagerMetricsHandler(MetricFetcher fetcher) {
 		super(fetcher);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{JOBMANAGER_METRICS_REST_PATH}; // "/jobmanager/metrics"
+	}
+
+	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
 		if (jobManager == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index d66c954..26c9fa9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -32,12 +32,18 @@ import java.util.Map;
  */
 public class JobMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_JOB_ID = "jobid";
+	private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
 
 	public JobMetricsHandler(MetricFetcher fetcher) {
 		super(fetcher);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{JOB_METRICS_REST_PATH}; // "/jobs/:jobid/metrics" (":jobid" matches PARAMETER_JOB_ID)
+	}
+
+	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
 		return job != null

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 6fca771..3e838d7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -32,12 +32,18 @@ import java.util.Map;
  */
 public class JobVertexMetricsHandler extends AbstractMetricsHandler {
 	public static final String PARAMETER_VERTEX_ID = "vertexid";
+	private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
 
 	public JobVertexMetricsHandler(MetricFetcher fetcher) {
 		super(fetcher);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{JOB_VERTEX_METRICS_REST_PATH}; // "/jobs/:jobid/vertices/:vertexid/metrics"
+	}
+
+	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
 			pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index f1b2e72..a74f5f2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -33,11 +33,19 @@ import java.util.Map;
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
 public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+
+	private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
+
 	public TaskManagerMetricsHandler(MetricFetcher fetcher) {
 		super(fetcher);
 	}
 
 	@Override
+	public String[] getPaths() { // REST path(s) declared by this handler; a single route here
+		return new String[]{TASKMANAGER_METRICS_REST_PATH}; // "/taskmanagers/:taskmanagerid/metrics"
+	}
+
+	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
 		if (taskManager == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
new file mode 100644
index 0000000..018ffdd
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class ClusterOverviewHandlerTest { // pins the REST path(s) returned by ClusterOverviewHandler#getPaths()
+	@Test
+	public void testGetPaths() {
+		ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS)); // zero-length duration; presumably a timeout that getPaths() never uses - TODO confirm
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length); // exactly one route is declared
+		Assert.assertEquals("/overview", paths[0]); // and it is the cluster overview route
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
new file mode 100644
index 0000000..e225648
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class CurrentJobIdsHandlerTest { // pins the REST path(s) returned by CurrentJobIdsHandler#getPaths()
+	@Test
+	public void testGetPaths() {
+		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS)); // zero-length duration; presumably a timeout that getPaths() never uses - TODO confirm
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length); // exactly one route is declared
+		Assert.assertEquals("/jobs", paths[0]); // and it is the job-ids route
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
new file mode 100644
index 0000000..3207fec
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class CurrentJobsOverviewHandlerTest { // pins the route returned by getPaths() for each flag combination used below
+	@Test
+	public void testGetPaths() {
+		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true); // flags (true, true): presumably include-running / include-finished - TODO confirm against the constructor
+		String[] pathsAll = handlerAll.getPaths();
+		Assert.assertEquals(1, pathsAll.length); // one route per handler instance
+		Assert.assertEquals("/joboverview", pathsAll[0]); // both flags set -> combined overview
+
+		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false); // flags (true, false)
+		String[] pathsRunning = handlerRunning.getPaths();
+		Assert.assertEquals(1, pathsRunning.length); // one route per handler instance
+		Assert.assertEquals("/joboverview/running", pathsRunning[0]); // only the first flag set -> running jobs route
+
+		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true); // flags (false, true)
+		String[] pathsCompleted = handlerCompleted.getPaths();
+		Assert.assertEquals(1, pathsCompleted.length); // one route per handler instance
+		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); // only the second flag set -> completed jobs route
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
new file mode 100644
index 0000000..aa2d552
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DashboardConfigHandlerTest { // pins the REST path(s) returned by DashboardConfigHandler#getPaths()
+	@Test
+	public void testGetPaths() {
+		DashboardConfigHandler handler = new DashboardConfigHandler(10000L); // NOTE(review): meaning of the long argument is not visible here - confirm against the constructor
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length); // exactly one route is declared
+		Assert.assertEquals("/config", paths[0]); // and it is the dashboard config route
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
new file mode 100644
index 0000000..e84926e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class JarAccessDeniedHandlerTest { // pins the set of REST paths returned by JarAccessDeniedHandler#getPaths()
+	@Test
+	public void testGetPaths() {
+		JarAccessDeniedHandler handler = new JarAccessDeniedHandler();
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(5, paths.length); // five routes expected; with the five distinct contains() checks below this also rules out duplicates
+		List<String> pathsList = Lists.newArrayList(paths); // copy into a list so membership can be checked without depending on array order
+		Assert.assertTrue(pathsList.contains("/jars"));
+		Assert.assertTrue(pathsList.contains("/jars/upload"));
+		Assert.assertTrue(pathsList.contains("/jars/:jarid"));
+		Assert.assertTrue(pathsList.contains("/jars/:jarid/plan"));
+		Assert.assertTrue(pathsList.contains("/jars/:jarid/run"));
+	}
+}