You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/02 16:04:56 UTC

[1/2] flink git commit: [FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution

Repository: flink
Updated Branches:
  refs/heads/master ee9027e49 -> 49f690986


[FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution

Make sure that the value maps are of type LinkedHashMap in SlotSharingGroupAssignment#availableSlotsPerJid

This closes #4839.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c669d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c669d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c669d4

Branch: refs/heads/master
Commit: d9c669d4781f095806013651c1a579eae0ca2650
Parents: ee9027e
Author: Till <ti...@gmail.com>
Authored: Mon Oct 16 16:18:23 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 16:39:16 2017 +0100

----------------------------------------------------------------------
 .../instance/SlotSharingGroupAssignment.java    | 46 ++++++-----
 .../SlotSharingGroupAssignmentTest.java         | 82 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 7618b18..4371290 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -96,7 +96,7 @@ public class SlotSharingGroupAssignment {
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
 
 	/** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
-	private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
+	private final Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
 
 
 	// --------------------------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public class SlotSharingGroupAssignment {
 				// can place a task into this slot.
 				boolean entryForNewJidExists = false;
 				
-				for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+				for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
 					// there is already an entry for this groupID
 					if (entry.getKey().equals(groupIdForMap)) {
 						entryForNewJidExists = true;
@@ -246,7 +246,7 @@ public class SlotSharingGroupAssignment {
 
 				// make sure an empty entry exists for this group, if no other entry exists
 				if (!entryForNewJidExists) {
-					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
+					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<>());
 				}
 
 				return subSlot;
@@ -391,7 +391,7 @@ public class SlotSharingGroupAssignment {
 		}
 
 		// get the available slots for the group
-		Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
+		LinkedHashMap<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
 		
 		if (slotsForGroup == null) {
 			// we have a new group, so all slots are available
@@ -621,20 +621,26 @@ public class SlotSharingGroupAssignment {
 	
 	private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
 		Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator();
-		
+
 		while (iter.hasNext()) {
-			List<SharedSlot> slots = iter.next().getValue();
-			
-			if (slots.isEmpty()) {
-				iter.remove();
-			}
-			else if (slots.size() == 1) {
-				SharedSlot slot = slots.remove(0);
-				iter.remove();
-				return slot;
-			}
-			else {
-				return slots.remove(slots.size() - 1);
+			Map.Entry<ResourceID, List<SharedSlot>> slotEntry = iter.next();
+
+			// remove first entry to add it at the back if there are still slots left
+			iter.remove();
+
+			List<SharedSlot> slots = slotEntry.getValue();
+
+			if (!slots.isEmpty()) {
+
+				SharedSlot result = slots.remove(slots.size() - 1);
+
+				if (!slots.isEmpty()) {
+					// re-insert the entry at the back; this relies on the map being a LinkedHashMap
+					// (insertion-ordered), so we revisit this entry only after polling all other entries
+					map.put(slotEntry.getKey(), slots);
+				}
+
+				return result;
 			}
 		}
 		
@@ -642,11 +648,11 @@ public class SlotSharingGroupAssignment {
 	}
 	
 	private static void removeSlotFromAllEntries(
-			Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot)
-	{
+			Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlots,
+			SharedSlot slot) {
 		final ResourceID taskManagerId = slot.getTaskManagerID();
 		
-		for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
+		for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
 			Map<ResourceID, List<SharedSlot>> map = entry.getValue();
 
 			List<SharedSlot> list = map.get(taskManagerId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
new file mode 100644
index 0000000..dca47d3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SlotSharingGroupAssignmentTest extends TestLogger {
+
+	/**
+	 * Tests that slots are allocated in a round robin fashion from the set of available resources.
+	 */
+	@Test
+	public void testRoundRobinPolling() throws UnknownHostException {
+		final SlotSharingGroupAssignment slotSharingGroupAssignment = new SlotSharingGroupAssignment();
+		final int numberTaskManagers = 2;
+		final int numberSlots = 2;
+		final JobVertexID sourceId = new JobVertexID();
+		final JobVertexID sinkId = new JobVertexID();
+		final JobID jobId = new JobID();
+
+		for (int i = 0; i < numberTaskManagers; i++) {
+			final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
+
+			for (int j = 0; j < numberSlots; j++) {
+				final SharedSlot slot = new SharedSlot(
+					jobId,
+					mock(SlotOwner.class),
+					taskManagerLocation,
+					j,
+					mock(TaskManagerGateway.class),
+					slotSharingGroupAssignment);
+
+				slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, Locality.UNKNOWN, sourceId);
+			}
+		}
+
+		SimpleSlot allocatedSlot1 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		SimpleSlot allocatedSlot2 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+
+		assertNotEquals(allocatedSlot1.getTaskManagerLocation(), allocatedSlot2.getTaskManagerLocation());
+
+		// let's check that we can still allocate all 4 slots
+		SimpleSlot allocatedSlot3 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		assertNotNull(allocatedSlot3);
+
+		SimpleSlot allocatedSlot4 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		assertNotNull(allocatedSlot4);
+	}
+}


[2/2] flink git commit: [FLINK-8028] Let JobMaster implement RestfulGateway

Posted by tr...@apache.org.
[FLINK-8028] Let JobMaster implement RestfulGateway

This commit lets the JobMaster implement the RestfulGateway. That way,
the JobMaster can be used in combination with the existing REST handlers.

This closes #4986.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49f69098
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49f69098
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49f69098

Branch: refs/heads/master
Commit: 49f6909868c782bc37116b450dde92e02bc3731b
Parents: d9c669d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:37:06 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 16:39:32 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 35 +--------
 .../dispatcher/StandaloneDispatcher.java        |  3 +-
 .../entrypoint/JobClusterEntrypoint.java        | 11 ++-
 .../runtime/jobmaster/JobManagerRunner.java     |  9 ++-
 .../flink/runtime/jobmaster/JobMaster.java      | 78 +++++++++++++++++++-
 .../runtime/jobmaster/JobMasterException.java   | 41 ++++++++++
 .../runtime/jobmaster/JobMasterGateway.java     |  3 +-
 .../messages/webmonitor/ClusterOverview.java    | 42 +++++++++++
 .../flink/runtime/metrics/MetricRegistry.java   |  2 -
 .../minicluster/MiniClusterJobDispatcher.java   |  3 +-
 .../resourcemanager/ResourceOverview.java       |  6 ++
 .../jobmaster/JobManagerRunnerMockTest.java     |  3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  8 +-
 13 files changed, 197 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1fa0f7e..8a26f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -298,39 +298,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		return allJobsFuture.thenCombine(
 			taskManagerOverviewFuture,
-			(Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) -> {
-
-				int numberRunningOrPendingJobs = 0;
-				int numberFinishedJobs = 0;
-				int numberCancelledJobs = 0;
-				int numberFailedJobs = 0;
-
-				for (JobStatus status : allJobsStatus) {
-					switch (status) {
-						case FINISHED:
-							numberFinishedJobs++;
-							break;
-						case FAILED:
-							numberFailedJobs++;
-							break;
-						case CANCELED:
-							numberCancelledJobs++;
-							break;
-						default:
-							numberRunningOrPendingJobs++;
-							break;
-					}
-				}
-
-				return new ClusterOverview(
-					resourceOverview.getNumberTaskManagers(),
-					resourceOverview.getNumberRegisteredSlots(),
-					resourceOverview.getNumberFreeSlots(),
-					numberRunningOrPendingJobs,
-					numberFinishedJobs,
-					numberCancelledJobs,
-					numberFailedJobs);
-			});
+			(Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) ->
+				ClusterOverview.create(resourceOverview, allJobsStatus));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 3ba681c..c64c883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -88,6 +88,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			jobManagerServices,
 			metricRegistry,
 			onCompleteActions,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			restAddress);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 124c6c6..1c8fb21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -80,7 +82,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			jobManagerServices,
 			heartbeatServices,
 			metricRegistry,
-			this);
+			this,
+			null);
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
@@ -97,7 +100,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			JobManagerServices jobManagerServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress) throws Exception {
 
 		JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -111,7 +115,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			jobManagerServices,
 			metricRegistry,
 			new TerminatingOnCompleteActions(jobGraph.getJobID()),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			restAddress);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f95b5a0..ed3d43d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -114,7 +116,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final JobManagerServices jobManagerServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception {
+			final FatalErrorHandler errorHandler,
+			@Nullable final String restAddress) throws Exception {
 
 		JobManagerMetricGroup jobManagerMetrics = null;
 
@@ -165,7 +168,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 				jobManagerMetrics,
 				this,
 				this,
-				userCodeLoader);
+				userCodeLoader,
+				restAddress,
+				metricRegistry.getMetricQueryServicePath());
 
 			this.timeout = jobManagerServices.rpcAskTimeout;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e2fb65f..c4c4445 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -71,9 +71,12 @@ import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
@@ -83,6 +86,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -107,9 +111,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -186,6 +192,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final SlotPoolGateway slotPoolGateway;
 
+	private final CompletableFuture<String> restAddressFuture;
+
+	private final String metricQueryServicePath;
+
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -215,7 +225,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
-			ClassLoader userCodeLoader) throws Exception {
+			ClassLoader userCodeLoader,
+			@Nullable String restAddress,
+			@Nullable String metricQueryServicePath) throws Exception {
 
 		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
@@ -296,6 +308,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
 
 		this.registeredTaskManagers = new HashMap<>(4);
+
+		this.restAddressFuture = Optional.ofNullable(restAddress)
+			.map(CompletableFuture::completedFuture)
+			.orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));
+
+		this.metricQueryServicePath = metricQueryServicePath;
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -758,6 +776,64 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	//----------------------------------------------------------------------------------------------
+	// RestfulGateway RPC methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public CompletableFuture<String> requestRestAddress(Time timeout) {
+		return restAddressFuture;
+	}
+
+	@Override
+	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
+		if (Objects.equals(jobGraph.getJobID(), jobId)) {
+			return requestArchivedExecutionGraph(timeout);
+		} else {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		}
+	}
+
+	@Override
+	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
+		return requestJobDetails(timeout)
+			.thenApply(
+				jobDetails -> new MultipleJobsDetails(Collections.singleton(jobDetails)));
+	}
+
+	@Override
+	public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
+		final CompletableFuture<ResourceOverview> resourceOverviewFuture;
+		if (resourceManagerConnection != null) {
+			resourceOverviewFuture = resourceManagerConnection.getTargetGateway().requestResourceOverview(timeout);
+		} else {
+			resourceOverviewFuture = CompletableFuture.completedFuture(ResourceOverview.empty());
+		}
+
+		Collection<JobStatus> jobStatuses = Collections.singleton(executionGraph.getState());
+
+		return resourceOverviewFuture.thenApply(
+			(ResourceOverview resourceOverview) -> ClusterOverview.create(resourceOverview, jobStatuses));
+	}
+
+	@Override
+	public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+		if (metricQueryServicePath != null) {
+			return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+		} else {
+			return CompletableFuture.completedFuture(Collections.emptyList());
+		}
+	}
+
+	@Override
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		if (resourceManagerConnection != null) {
+			return resourceManagerConnection.getTargetGateway().requestTaskManagerMetricQueryServicePaths(timeout);
+		} else {
+			return CompletableFuture.completedFuture(Collections.emptyList());
+		}
+	}
+
+	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
new file mode 100644
index 0000000..df2fa75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for {@link JobMaster} related exceptions.
+ */
+public class JobMasterException extends FlinkException {
+
+	private static final long serialVersionUID = 414268039380255248L;
+
+	public JobMasterException(String message) {
+		super(message);
+	}
+
+	public JobMasterException(Throwable cause) {
+		super(cause);
+	}
+
+	public JobMasterException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index f469993..ad906c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -55,7 +56,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  * {@link JobMaster} rpc gateway interface
  */
-public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId>, RestfulGateway {
 
 	/**
 	 * Cancels the currently executed job.

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 6913484..69b45b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -18,9 +18,15 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collection;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description
  * of the Flink cluster status.
@@ -122,4 +128,40 @@ public class ClusterOverview extends JobsOverview {
 				", numJobsFailed=" + getNumJobsFailed() +
 				'}';
 	}
+
+	public static ClusterOverview create(ResourceOverview resourceOverview, Collection<JobStatus> allJobsStatus) {
+		Preconditions.checkNotNull(resourceOverview);
+		Preconditions.checkNotNull(allJobsStatus);
+
+		int numberRunningOrPendingJobs = 0;
+		int numberFinishedJobs = 0;
+		int numberCancelledJobs = 0;
+		int numberFailedJobs = 0;
+
+		for (JobStatus status : allJobsStatus) {
+			switch (status) {
+				case FINISHED:
+					numberFinishedJobs++;
+					break;
+				case FAILED:
+					numberFailedJobs++;
+					break;
+				case CANCELED:
+					numberCancelledJobs++;
+					break;
+				default:
+					numberRunningOrPendingJobs++;
+					break;
+			}
+		}
+
+		return new ClusterOverview(
+			resourceOverview.getNumberTaskManagers(),
+			resourceOverview.getNumberRegisteredSlots(),
+			resourceOverview.getNumberFreeSlots(),
+			numberRunningOrPendingJobs,
+			numberFinishedJobs,
+			numberCancelledJobs,
+			numberFailedJobs);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index e0c2667..782d66a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -27,8 +27,6 @@ import javax.annotation.Nullable;
 
 /**
  * Interface for a metric registry.
-
-				LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath);
  */
 public interface MetricRegistry {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..3a1474d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -279,7 +279,8 @@ public class MiniClusterJobDispatcher {
 					jobManagerServices,
 					metricRegistry,
 					onCompletion,
-					errorHandler);
+					errorHandler,
+					null);
 				runners[i].start();
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 1b3d5ca..ec95759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -27,6 +27,8 @@ public class ResourceOverview implements Serializable {
 
 	private static final long serialVersionUID = 7618746920569224557L;
 
+	private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = new ResourceOverview(0, 0, 0);
+
 	private final int numberTaskManagers;
 
 	private final int numberRegisteredSlots;
@@ -50,4 +52,8 @@ public class ResourceOverview implements Serializable {
 	public int getNumberFreeSlots() {
 		return numberFreeSlots;
 	}
+
+	public static ResourceOverview empty() {
+		return EMPTY_RESOURCE_OVERVIEW;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 083d6e9..f94b4be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -117,7 +117,8 @@ public class JobManagerRunnerMockTest extends TestLogger {
 			JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
 			new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,
-			jobCompletion));
+			jobCompletion,
+			null));
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index b52c08c..02047f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -117,7 +117,9 @@ public class JobMasterTest extends TestLogger {
 				null,
 				mock(OnCompletionActions.class),
 				testingFatalErrorHandler,
-				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+				null,
+				null);
 
 			CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
 
@@ -224,7 +226,9 @@ public class JobMasterTest extends TestLogger {
 				null,
 				mock(OnCompletionActions.class),
 				testingFatalErrorHandler,
-				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+				null,
+				null);
 
 			CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);