You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:01:22 UTC

[1/2] flink git commit: [FLINK-8029] Create WebMonitorEndpoint

Repository: flink
Updated Branches:
  refs/heads/master 5b65ca808 -> 0d551640e


[FLINK-8029] Create WebMonitorEndpoint

The WebMonitorEndpoint is the common REST endpoint used for serving
the web frontend REST calls. It will be used by the Dispatcher and
the JobMaster to fuel the web frontend.

This closes #4987.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e3027d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e3027d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e3027d4

Branch: refs/heads/master
Commit: 0e3027d4b87cb5aff5c640ec809f2968b64be732
Parents: 5b65ca8
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 18:12:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:34:32 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 348 +---------------
 .../flink/runtime/jobmaster/JobMaster.java      |  75 ++--
 .../jobmaster/JobMasterRestEndpoint.java        |  46 +++
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 402 +++++++++++++++++++
 4 files changed, 494 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e3027d4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4a99b9d..2ab97e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -22,97 +22,28 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
-import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
-import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
-import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
-import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
-import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
-import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
-import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
-import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
-import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
-import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
-import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
-import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
-import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
-import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
-import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
-import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
-import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
-import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
-import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
-import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
-import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
-import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
-import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
-import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
-import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
-import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
-import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
-import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
  */
-public class DispatcherRestEndpoint extends RestServerEndpoint {
-
-	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
-	private final Configuration clusterConfiguration;
-	private final RestHandlerConfiguration restConfiguration;
-	private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
-	private final Executor executor;
-
-	private final ExecutionGraphCache executionGraphCache;
-	private final CheckpointStatsCache checkpointStatsCache;
-
-	private final MetricFetcher<DispatcherGateway> metricFetcher;
+public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
 
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
@@ -122,71 +53,25 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			Executor executor,
 			MetricQueryServiceRetriever metricQueryServiceRetriever) {
-		super(endpointConfiguration);
-		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
-		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
-		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
-		this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);
-		this.executor = Preconditions.checkNotNull(executor);
-
-		this.executionGraphCache = new ExecutionGraphCache(
-			restConfiguration.getTimeout(),
-			Time.milliseconds(restConfiguration.getRefreshInterval()));
-
-		this.checkpointStatsCache = new CheckpointStatsCache(
-			restConfiguration.getMaxCheckpointStatisticCacheEntries());
-
-		this.metricFetcher = new MetricFetcher<>(
+		super(
+			endpointConfiguration,
 			leaderRetriever,
-			metricQueryServiceRetriever,
+			clusterConfiguration,
+			restConfiguration,
+			resourceManagerRetriever,
 			executor,
-			restConfiguration.getTimeout());
+			metricQueryServiceRetriever);
 	}
 
 	@Override
 	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+		List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);
+
+		// Add the Dispatcher specific handlers
 
 		final Time timeout = restConfiguration.getTimeout();
 		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
 
-		ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			ClusterOverviewHeaders.getInstance());
-
-		DashboardConfigHandler dashboardConfigHandler = new DashboardConfigHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			DashboardConfigurationHeaders.getInstance(),
-			restConfiguration.getRefreshInterval());
-
-		JobIdsHandler jobIdsHandler = new JobIdsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobIdsWithStatusesOverviewHeaders.getInstance());
-
-		JobsOverviewHandler jobsOverviewHandler = new JobsOverviewHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobsOverviewHeaders.getInstance());
-
-		ClusterConfigHandler clusterConfigurationHandler = new ClusterConfigHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			ClusterConfigurationInfoHeaders.getInstance(),
-			clusterConfiguration);
-
 		JobTerminationHandler jobTerminationHandler = new JobTerminationHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -194,80 +79,6 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			JobTerminationHeaders.getInstance());
 
-		JobConfigHandler jobConfigHandler = new JobConfigHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobConfigHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		CheckpointConfigHandler checkpointConfigHandler = new CheckpointConfigHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			CheckpointConfigHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			CheckpointingStatisticsHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			CheckpointStatisticDetailsHeaders.getInstance(),
-			executionGraphCache,
-			executor,
-			checkpointStatsCache);
-
-		JobPlanHandler jobPlanHandler = new JobPlanHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobPlanHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		TaskCheckpointStatisticDetailsHandler taskCheckpointStatisticDetailsHandler = new TaskCheckpointStatisticDetailsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			TaskCheckpointStatisticsHeaders.getInstance(),
-			executionGraphCache,
-			executor,
-			checkpointStatsCache);
-
-		JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobExceptionsHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobVertexAccumulatorsHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -280,147 +91,10 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			timeout,
 			responseHeaders);
 
-		TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			TaskManagersHeaders.getInstance(),
-			resourceManagerRetriever);
-
-		TaskManagerDetailsHandler taskManagerDetailsHandler = new TaskManagerDetailsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			TaskManagerDetailsHeaders.getInstance(),
-			resourceManagerRetriever,
-			metricFetcher);
-
-		final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobDetailsHeaders.getInstance(),
-			executionGraphCache,
-			executor,
-			metricFetcher);
-
-		JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			JobAccumulatorsHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		SubtasksTimesHandler subtasksTimesHandler = new SubtasksTimesHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			SubtasksTimesHeaders.getInstance(),
-			executionGraphCache,
-			executor);
-
-		final JobVertexMetricsHandler jobVertexMetricsHandler = new JobVertexMetricsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			metricFetcher);
-
-		final JobMetricsHandler jobMetricsHandler = new JobMetricsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			metricFetcher);
-
-		final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			metricFetcher);
-
-		final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			metricFetcher);
-
-		final JobManagerMetricsHandler jobManagerMetricsHandler = new JobManagerMetricsHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders,
-			metricFetcher);
-
-		final File tmpDir = restConfiguration.getTmpDir();
-
-		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
-
-		try {
-			optWebContent = WebMonitorUtils.tryLoadWebContent(
-				leaderRetriever,
-				restAddressFuture,
-				timeout,
-				tmpDir);
-		} catch (IOException e) {
-			log.warn("Could not load web content handler.", e);
-			optWebContent = Optional.empty();
-		}
-
-		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
-		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
-		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
-		handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
-		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
-		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
-		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
-		handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
-		handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
-		handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
-		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
-		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
-		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
-		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
-		handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
-		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
-		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
-		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
-		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
-		handlers.add(Tuple2.of(JobMetricsHeaders.getInstance(), jobMetricsHandler));
-		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
-		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
-		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
-
-		optWebContent.ifPresent(
-			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
 
 		return handlers;
 	}
-
-	@Override
-	public void shutdown(Time timeout) {
-		super.shutdown(timeout);
-
-		executionGraphCache.close();
-
-		final File tmpDir = restConfiguration.getTmpDir();
-
-		try {
-			log.info("Removing cache directory {}", tmpDir);
-			FileUtils.deleteDirectory(tmpDir);
-		} catch (Throwable t) {
-			log.warn("Error while deleting cache directory {}", tmpDir, t);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e3027d4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6d0de74..1d4eb6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -127,8 +127,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * JobMaster implementation. The job master is responsible for the execution of a single
  * {@link JobGraph}.
- * <p>
- * It offers the following methods as part of its rpc interface to interact with the JobMaster
+ *
+ * <p>It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:
  * <ul>
  * <li>{@link #updateTaskExecutionState} updates the task execution state for
@@ -137,7 +137,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
 
-	/** Default names for Flink's distributed components */
+	/** Default names for Flink's distributed components. */
 	public static final String JOB_MANAGER_NAME = "jobmanager";
 	public static final String ARCHIVE_NAME = "archive";
 
@@ -147,36 +147,36 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final ResourceID resourceId;
 
-	/** Logical representation of the job */
+	/** Logical representation of the job. */
 	private final JobGraph jobGraph;
 
-	/** Configuration of the JobManager */
+	/** Configuration of the JobManager. */
 	private final Configuration configuration;
 
 	private final Time rpcTimeout;
 
-	/** Service to contend for and retrieve the leadership of JM and RM */
+	/** Service to contend for and retrieve the leadership of JM and RM. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	/** Blob server used across jobs */
+	/** Blob server used across jobs. */
 	private final BlobServer blobServer;
 
-	/** Blob library cache manager used across jobs */
+	/** Blob library cache manager used across jobs. */
 	private final BlobLibraryCacheManager libraryCacheManager;
 
-	/** The metrics for the JobManager itself */
+	/** The metrics for the JobManager itself. */
 	private final MetricGroup jobManagerMetricGroup;
 
-	/** The metrics for the job */
+	/** The metrics for the job. */
 	private final MetricGroup jobMetricGroup;
 
-	/** The heartbeat manager with task managers */
+	/** The heartbeat manager with task managers. */
 	private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
 
-	/** The heartbeat manager with resource manager */
+	/** The heartbeat manager with resource manager. */
 	private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
 
-	/** The execution context which is used to execute futures */
+	/** The execution context which is used to execute futures. */
 	private final Executor executor;
 
 	private final OnCompletionActions jobCompletionActions;
@@ -185,7 +185,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final ClassLoader userCodeLoader;
 
-	/** The execution graph of this job */
+	/** The execution graph of this job. */
 	private final ExecutionGraph executionGraph;
 
 	private final SlotPool slotPool;
@@ -198,10 +198,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// --------- ResourceManager --------
 
-	/** Leader retriever service used to locate ResourceManager's address */
+	/** Leader retriever service used to locate ResourceManager's address. */
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
-	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
+	/** Connection with ResourceManager, null if not located address yet or we close it initiative. */
 	private ResourceManagerConnection resourceManagerConnection;
 
 	// --------- TaskManagers --------
@@ -320,7 +320,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------
 
-
 	@Override
 	public void start() {
 		throw new UnsupportedOperationException("Should never call start() without leader ID");
@@ -405,8 +404,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 */
 	@Override
 	public CompletableFuture<Acknowledge> updateTaskExecutionState(
-			final TaskExecutionState taskExecutionState)
-	{
+			final TaskExecutionState taskExecutionState) {
 		checkNotNull(taskExecutionState, "taskExecutionState");
 
 		if (executionGraph.updateState(taskExecutionState)) {
@@ -534,8 +532,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final TaskStateSnapshot checkpointState) {
 
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		final AcknowledgeCheckpoint ackMessage = 
-				new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
+		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
+			jobID,
+			executionAttemptID,
+			checkpointId,
+			checkpointMetrics,
+			checkpointState);
 
 		if (checkpointCoordinator != null) {
 			getRpcService().execute(new Runnable() {
@@ -560,8 +562,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final JobID jobID,
 			final ExecutionAttemptID executionAttemptID,
 			final long checkpointID,
-			final Throwable reason)
-	{
+			final Throwable reason) {
 		final DeclineCheckpoint decline = new DeclineCheckpoint(
 				jobID, executionAttemptID, checkpointID, reason);
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
@@ -605,8 +606,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final KeyGroupRange keyGroupRange,
 			final String registrationName,
 			final KvStateID kvStateId,
-			final InetSocketAddress kvStateServerAddress)
-	{
+			final InetSocketAddress kvStateServerAddress) {
 		if (log.isDebugEnabled()) {
 			log.debug("Key value state registered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
@@ -624,8 +624,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	public void notifyKvStateUnregistered(
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
-			String registrationName)
-	{
+			String registrationName) {
 		if (log.isDebugEnabled()) {
 			log.debug("Key value state unregistered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
@@ -786,7 +785,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
-		if (Objects.equals(jobGraph.getJobID(), jobId)) {
+		if (jobGraph.getJobID().equals(jobId)) {
 			return requestArchivedExecutionGraph(timeout);
 		} else {
 			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
@@ -952,7 +951,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
-		
+
 		if (newJobStatus.isGloballyTerminalState()) {
 			switch (newJobStatus) {
 				case FINISHED:
@@ -960,14 +959,14 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 						// TODO get correct job duration
 						// job done, let's get the accumulators
 						Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
-						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults); 
+						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
 
 						executor.execute(() -> jobCompletionActions.jobFinished(result));
 					}
 					catch (Exception e) {
 						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
 
-						final JobExecutionException exception = new JobExecutionException(jobID, 
+						final JobExecutionException exception = new JobExecutionException(jobID,
 								"Failed to retrieve accumulator results. " +
 								"The job is registered as 'FINISHED (successful), but this notification describes " +
 								"a failure, since the resulting accumulators could not be fetched.", e);
@@ -1038,7 +1037,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
 		final ResourceManagerId resourceManagerId = success.getResourceManagerId();
-	
+
 		// verify the response with current connection
 		if (resourceManagerConnection != null
 				&& Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
@@ -1105,8 +1104,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	//----------------------------------------------------------------------------------------------
 
 	private class ResourceManagerConnection
-			extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>
-	{
+			extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
 		private final JobID jobID;
 
 		private final ResourceID jobManagerResourceID;
@@ -1125,8 +1123,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 				final JobMasterId jobMasterId,
 				final String resourceManagerAddress,
 				final ResourceManagerId resourceManagerId,
-				final Executor executor)
-		{
+				final Executor executor) {
 			super(log, resourceManagerAddress, resourceManagerId, executor);
 			this.jobID = checkNotNull(jobID);
 			this.jobManagerResourceID = checkNotNull(jobManagerResourceID);
@@ -1138,12 +1135,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
 			return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
 					log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
-					getTargetAddress(), getTargetLeaderId())
-			{
+					getTargetAddress(), getTargetLeaderId()) {
 				@Override
 				protected CompletableFuture<RegistrationResponse> invokeRegistration(
-						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception
-				{
+						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
 					Time timeout = Time.milliseconds(timeoutMillis);
 
 					return gateway.registerJobManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/0e3027d4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
new file mode 100644
index 0000000..4baac95
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * REST endpoint for the {@link JobMaster}.
+ */
+public class JobMasterRestEndpoint extends WebMonitorEndpoint<JobMasterGateway> {
+
+	public JobMasterRestEndpoint(
+			RestServerEndpointConfiguration endpointConfiguration,
+			GatewayRetriever<JobMasterGateway> leaderRetriever,
+			Configuration clusterConfiguration,
+			RestHandlerConfiguration restConfiguration,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) {
+		super(endpointConfiguration, leaderRetriever, clusterConfiguration, restConfiguration, resourceManagerRetriever, executor, metricQueryServiceRetriever);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e3027d4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
new file mode 100644
index 0000000..5835c87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Rest endpoint which serves the web frontend REST calls.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint {
+
+	protected final GatewayRetriever<T> leaderRetriever;
+	private final Configuration clusterConfiguration;
+	protected final RestHandlerConfiguration restConfiguration;
+	private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
+	private final Executor executor;
+
+	private final ExecutionGraphCache executionGraphCache;
+	private final CheckpointStatsCache checkpointStatsCache;
+
+	private final MetricFetcher<T> metricFetcher;
+
+	public WebMonitorEndpoint(
+			RestServerEndpointConfiguration endpointConfiguration,
+			GatewayRetriever<T> leaderRetriever,
+			Configuration clusterConfiguration,
+			RestHandlerConfiguration restConfiguration,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) {
+		super(endpointConfiguration);
+		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
+		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
+		this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);
+		this.executor = Preconditions.checkNotNull(executor);
+
+		this.executionGraphCache = new ExecutionGraphCache(
+			restConfiguration.getTimeout(),
+			Time.milliseconds(restConfiguration.getRefreshInterval()));
+
+		this.checkpointStatsCache = new CheckpointStatsCache(
+			restConfiguration.getMaxCheckpointStatisticCacheEntries());
+
+		this.metricFetcher = new MetricFetcher<>(
+			leaderRetriever,
+			metricQueryServiceRetriever,
+			executor,
+			restConfiguration.getTimeout());
+	}
+
+	@Override
+	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+
+		final Time timeout = restConfiguration.getTimeout();
+		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
+
+		ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			ClusterOverviewHeaders.getInstance());
+
+		DashboardConfigHandler dashboardConfigHandler = new DashboardConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			DashboardConfigurationHeaders.getInstance(),
+			restConfiguration.getRefreshInterval());
+
+		JobIdsHandler jobIdsHandler = new JobIdsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobIdsWithStatusesOverviewHeaders.getInstance());
+
+		JobsOverviewHandler jobsOverviewHandler = new JobsOverviewHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobsOverviewHeaders.getInstance());
+
+		ClusterConfigHandler clusterConfigurationHandler = new ClusterConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			ClusterConfigurationInfoHeaders.getInstance(),
+			clusterConfiguration);
+
+		JobConfigHandler jobConfigHandler = new JobConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobConfigHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		CheckpointConfigHandler checkpointConfigHandler = new CheckpointConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			CheckpointConfigHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			CheckpointingStatisticsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			CheckpointStatisticDetailsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			checkpointStatsCache);
+
+		JobPlanHandler jobPlanHandler = new JobPlanHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobPlanHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		TaskCheckpointStatisticDetailsHandler taskCheckpointStatisticDetailsHandler = new TaskCheckpointStatisticDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			TaskCheckpointStatisticsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			checkpointStatsCache);
+
+		JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobExceptionsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobVertexAccumulatorsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			TaskManagersHeaders.getInstance(),
+			resourceManagerRetriever);
+
+		TaskManagerDetailsHandler taskManagerDetailsHandler = new TaskManagerDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			TaskManagerDetailsHeaders.getInstance(),
+			resourceManagerRetriever,
+			metricFetcher);
+
+		final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobDetailsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			metricFetcher);
+
+		JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobAccumulatorsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		SubtasksTimesHandler subtasksTimesHandler = new SubtasksTimesHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			SubtasksTimesHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
+		final JobVertexMetricsHandler jobVertexMetricsHandler = new JobVertexMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
+		final JobMetricsHandler jobMetricsHandler = new JobMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
+		final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
+		final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
+		final JobManagerMetricsHandler jobManagerMetricsHandler = new JobManagerMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
+		final File tmpDir = restConfiguration.getTmpDir();
+
+		Optional<StaticFileServerHandler<T>> optWebContent;
+
+		try {
+			optWebContent = WebMonitorUtils.tryLoadWebContent(
+				leaderRetriever,
+				restAddressFuture,
+				timeout,
+				tmpDir);
+		} catch (IOException e) {
+			log.warn("Could not load web content handler.", e);
+			optWebContent = Optional.empty();
+		}
+
+		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
+		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
+		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
+		handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
+		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
+		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
+		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
+		handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
+		handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
+		handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
+		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
+		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
+		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
+		handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
+		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
+		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
+		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
+		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+		handlers.add(Tuple2.of(JobMetricsHeaders.getInstance(), jobMetricsHandler));
+		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
+		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
+		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
+
+		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
+		optWebContent.ifPresent(
+			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
+
+		return handlers;
+	}
+
+	@Override
+	public void shutdown(Time timeout) {
+		super.shutdown(timeout);
+
+		executionGraphCache.close();
+
+		final File tmpDir = restConfiguration.getTmpDir();
+
+		try {
+			log.info("Removing cache directory {}", tmpDir);
+			FileUtils.deleteDirectory(tmpDir);
+		} catch (Throwable t) {
+			log.warn("Error while deleting cache directory {}", tmpDir, t);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint

Posted by tr...@apache.org.
[FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint

This closes #4988.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d551640
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d551640
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d551640

Branch: refs/heads/master
Commit: 0d551640e65073afd8755e04f7817a6379149251
Parents: 0e3027d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 17:27:38 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:35:21 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/JobClusterEntrypoint.java        | 107 ++++++++++++++++++-
 .../runtime/jobmaster/JobManagerRunner.java     |   4 +
 .../runtime/webmonitor/WebMonitorEndpoint.java  |   2 +-
 3 files changed, 110 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 1c8fb21..bd1f573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -29,16 +31,35 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -48,6 +69,12 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 	private JobManagerServices jobManagerServices;
 
+	private JobMasterRestEndpoint jobMasterRestEndpoint;
+
+	private LeaderRetrievalService jobMasterRetrievalService;
+
+	private LeaderRetrievalService resourceManagerRetrievalService;
+
 	private JobManagerRunner jobManagerRunner;
 
 	public JobClusterEntrypoint(Configuration configuration) {
@@ -74,6 +101,36 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 		jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);
 
+		resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+		final LeaderGatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			JobMasterGateway.class,
+			JobMasterId::new,
+			10,
+			Time.milliseconds(50L));
+
+		final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			ResourceManagerGateway.class,
+			ResourceManagerId::new,
+			10,
+			Time.milliseconds(50L));
+
+		// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+		jobMasterRestEndpoint = createJobMasterRestEndpoint(
+			configuration,
+			jobMasterGatewayRetriever,
+			resourceManagerGatewayRetriever,
+			rpcService.getExecutor(),
+			new AkkaQueryServiceRetriever(actorSystem, timeout));
+
+		LOG.debug("Starting JobMaster REST endpoint.");
+		jobMasterRestEndpoint.start();
+
 		jobManagerRunner = createJobManagerRunner(
 			configuration,
 			ResourceID.generate(),
@@ -83,13 +140,39 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			heartbeatServices,
 			metricRegistry,
 			this,
-			null);
+			jobMasterRestEndpoint.getRestAddress());
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
+		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
 
 		LOG.debug("Starting JobManager.");
 		jobManagerRunner.start();
+
+		jobMasterRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+			jobManagerRunner.getJobGraph().getJobID(),
+			jobManagerRunner.getAddress());
+		jobMasterRetrievalService.start(jobMasterGatewayRetriever);
+	}
+
+	protected JobMasterRestEndpoint createJobMasterRestEndpoint(
+			Configuration configuration,
+			GatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) throws ConfigurationException {
+
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new JobMasterRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			jobMasterGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			executor,
+			metricQueryServiceRetriever);
+
 	}
 
 	protected JobManagerRunner createJobManagerRunner(
@@ -103,7 +186,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress) throws Exception {
 
-		JobGraph jobGraph = retrieveJobGraph(configuration);
+		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
 		return new JobManagerRunner(
 			resourceId,
@@ -123,6 +206,18 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 		Throwable exception = null;
 
+		if (jobMasterRestEndpoint != null) {
+			jobMasterRestEndpoint.shutdown(Time.seconds(10L));
+		}
+
+		if (jobMasterRetrievalService != null) {
+			try {
+				jobMasterRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (jobManagerRunner != null) {
 			try {
 				jobManagerRunner.shutdown();
@@ -139,6 +234,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			}
 		}
 
+		if (resourceManagerRetrievalService != null) {
+			try {
+				resourceManagerRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (resourceManager != null) {
 			try {
 				resourceManager.shutDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index ed3d43d..e699d6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -192,6 +192,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 		return jobManager.getSelfGateway(JobMasterGateway.class);
 	}
 
+	public JobGraph getJobGraph() {
+		return jobGraph;
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5835c87..703a754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -142,7 +142,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	@Override
 	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
 
 		final Time timeout = restConfiguration.getTimeout();
 		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();