You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/02 16:04:57 UTC

[2/2] flink git commit: [FLINK-8028] Let JobMaster implement RestfulGateway

[FLINK-8028] Let JobMaster implement RestfulGateway

This commit lets the JobMaster implement the RestfulGateway. That way,
the JobMaster can be used in combination with the existing REST handlers.

This closes #4986.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49f69098
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49f69098
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49f69098

Branch: refs/heads/master
Commit: 49f6909868c782bc37116b450dde92e02bc3731b
Parents: d9c669d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:37:06 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 16:39:32 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 35 +--------
 .../dispatcher/StandaloneDispatcher.java        |  3 +-
 .../entrypoint/JobClusterEntrypoint.java        | 11 ++-
 .../runtime/jobmaster/JobManagerRunner.java     |  9 ++-
 .../flink/runtime/jobmaster/JobMaster.java      | 78 +++++++++++++++++++-
 .../runtime/jobmaster/JobMasterException.java   | 41 ++++++++++
 .../runtime/jobmaster/JobMasterGateway.java     |  3 +-
 .../messages/webmonitor/ClusterOverview.java    | 42 +++++++++++
 .../flink/runtime/metrics/MetricRegistry.java   |  2 -
 .../minicluster/MiniClusterJobDispatcher.java   |  3 +-
 .../resourcemanager/ResourceOverview.java       |  6 ++
 .../jobmaster/JobManagerRunnerMockTest.java     |  3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  8 +-
 13 files changed, 197 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1fa0f7e..8a26f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -298,39 +298,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		return allJobsFuture.thenCombine(
 			taskManagerOverviewFuture,
-			(Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) -> {
-
-				int numberRunningOrPendingJobs = 0;
-				int numberFinishedJobs = 0;
-				int numberCancelledJobs = 0;
-				int numberFailedJobs = 0;
-
-				for (JobStatus status : allJobsStatus) {
-					switch (status) {
-						case FINISHED:
-							numberFinishedJobs++;
-							break;
-						case FAILED:
-							numberFailedJobs++;
-							break;
-						case CANCELED:
-							numberCancelledJobs++;
-							break;
-						default:
-							numberRunningOrPendingJobs++;
-							break;
-					}
-				}
-
-				return new ClusterOverview(
-					resourceOverview.getNumberTaskManagers(),
-					resourceOverview.getNumberRegisteredSlots(),
-					resourceOverview.getNumberFreeSlots(),
-					numberRunningOrPendingJobs,
-					numberFinishedJobs,
-					numberCancelledJobs,
-					numberFailedJobs);
-			});
+			(Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) ->
+				ClusterOverview.create(resourceOverview, allJobsStatus));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 3ba681c..c64c883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -88,6 +88,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			jobManagerServices,
 			metricRegistry,
 			onCompleteActions,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			restAddress);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 124c6c6..1c8fb21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -80,7 +82,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			jobManagerServices,
 			heartbeatServices,
 			metricRegistry,
-			this);
+			this,
+			null);
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
@@ -97,7 +100,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			JobManagerServices jobManagerServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress) throws Exception {
 
 		JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -111,7 +115,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			jobManagerServices,
 			metricRegistry,
 			new TerminatingOnCompleteActions(jobGraph.getJobID()),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			restAddress);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f95b5a0..ed3d43d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -114,7 +116,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final JobManagerServices jobManagerServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception {
+			final FatalErrorHandler errorHandler,
+			@Nullable final String restAddress) throws Exception {
 
 		JobManagerMetricGroup jobManagerMetrics = null;
 
@@ -165,7 +168,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 				jobManagerMetrics,
 				this,
 				this,
-				userCodeLoader);
+				userCodeLoader,
+				restAddress,
+				metricRegistry.getMetricQueryServicePath());
 
 			this.timeout = jobManagerServices.rpcAskTimeout;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e2fb65f..c4c4445 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -71,9 +71,12 @@ import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
@@ -83,6 +86,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -107,9 +111,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -186,6 +192,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final SlotPoolGateway slotPoolGateway;
 
+	private final CompletableFuture<String> restAddressFuture;
+
+	private final String metricQueryServicePath;
+
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -215,7 +225,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
-			ClassLoader userCodeLoader) throws Exception {
+			ClassLoader userCodeLoader,
+			@Nullable String restAddress,
+			@Nullable String metricQueryServicePath) throws Exception {
 
 		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
@@ -296,6 +308,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
 
 		this.registeredTaskManagers = new HashMap<>(4);
+
+		this.restAddressFuture = Optional.ofNullable(restAddress)
+			.map(CompletableFuture::completedFuture)
+			.orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));
+
+		this.metricQueryServicePath = metricQueryServicePath;
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -758,6 +776,64 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	//----------------------------------------------------------------------------------------------
+	// RestfulGateway RPC methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public CompletableFuture<String> requestRestAddress(Time timeout) {
+		return restAddressFuture;
+	}
+
+	@Override
+	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
+		if (Objects.equals(jobGraph.getJobID(), jobId)) {
+			return requestArchivedExecutionGraph(timeout);
+		} else {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		}
+	}
+
+	@Override
+	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
+		return requestJobDetails(timeout)
+			.thenApply(
+				jobDetails -> new MultipleJobsDetails(Collections.singleton(jobDetails)));
+	}
+
+	@Override
+	public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
+		final CompletableFuture<ResourceOverview> resourceOverviewFuture;
+		if (resourceManagerConnection != null) {
+			resourceOverviewFuture = resourceManagerConnection.getTargetGateway().requestResourceOverview(timeout);
+		} else {
+			resourceOverviewFuture = CompletableFuture.completedFuture(ResourceOverview.empty());
+		}
+
+		Collection<JobStatus> jobStatuses = Collections.singleton(executionGraph.getState());
+
+		return resourceOverviewFuture.thenApply(
+			(ResourceOverview resourceOverview) -> ClusterOverview.create(resourceOverview, jobStatuses));
+	}
+
+	@Override
+	public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+		if (metricQueryServicePath != null) {
+			return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+		} else {
+			return CompletableFuture.completedFuture(Collections.emptyList());
+		}
+	}
+
+	@Override
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		if (resourceManagerConnection != null) {
+			return resourceManagerConnection.getTargetGateway().requestTaskManagerMetricQueryServicePaths(timeout);
+		} else {
+			return CompletableFuture.completedFuture(Collections.emptyList());
+		}
+	}
+
+	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
new file mode 100644
index 0000000..df2fa75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for {@link JobMaster} related exceptions.
+ */
+public class JobMasterException extends FlinkException {
+
+	private static final long serialVersionUID = 414268039380255248L;
+
+	public JobMasterException(String message) {
+		super(message);
+	}
+
+	public JobMasterException(Throwable cause) {
+		super(cause);
+	}
+
+	public JobMasterException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index f469993..ad906c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -55,7 +56,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  * {@link JobMaster} rpc gateway interface
  */
-public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId>, RestfulGateway {
 
 	/**
 	 * Cancels the currently executed job.

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 6913484..69b45b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -18,9 +18,15 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collection;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description
  * of the Flink cluster status.
@@ -122,4 +128,40 @@ public class ClusterOverview extends JobsOverview {
 				", numJobsFailed=" + getNumJobsFailed() +
 				'}';
 	}
+
+	public static ClusterOverview create(ResourceOverview resourceOverview, Collection<JobStatus> allJobsStatus) {
+		Preconditions.checkNotNull(resourceOverview);
+		Preconditions.checkNotNull(allJobsStatus);
+
+		int numberRunningOrPendingJobs = 0;
+		int numberFinishedJobs = 0;
+		int numberCancelledJobs = 0;
+		int numberFailedJobs = 0;
+
+		for (JobStatus status : allJobsStatus) {
+			switch (status) {
+				case FINISHED:
+					numberFinishedJobs++;
+					break;
+				case FAILED:
+					numberFailedJobs++;
+					break;
+				case CANCELED:
+					numberCancelledJobs++;
+					break;
+				default:
+					numberRunningOrPendingJobs++;
+					break;
+			}
+		}
+
+		return new ClusterOverview(
+			resourceOverview.getNumberTaskManagers(),
+			resourceOverview.getNumberRegisteredSlots(),
+			resourceOverview.getNumberFreeSlots(),
+			numberRunningOrPendingJobs,
+			numberFinishedJobs,
+			numberCancelledJobs,
+			numberFailedJobs);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index e0c2667..782d66a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -27,8 +27,6 @@ import javax.annotation.Nullable;
 
 /**
  * Interface for a metric registry.
-
-				LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath);
  */
 public interface MetricRegistry {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..3a1474d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -279,7 +279,8 @@ public class MiniClusterJobDispatcher {
 					jobManagerServices,
 					metricRegistry,
 					onCompletion,
-					errorHandler);
+					errorHandler,
+					null);
 				runners[i].start();
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 1b3d5ca..ec95759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -27,6 +27,8 @@ public class ResourceOverview implements Serializable {
 
 	private static final long serialVersionUID = 7618746920569224557L;
 
+	private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = new ResourceOverview(0, 0, 0);
+
 	private final int numberTaskManagers;
 
 	private final int numberRegisteredSlots;
@@ -50,4 +52,8 @@ public class ResourceOverview implements Serializable {
 	public int getNumberFreeSlots() {
 		return numberFreeSlots;
 	}
+
+	public static ResourceOverview empty() {
+		return EMPTY_RESOURCE_OVERVIEW;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 083d6e9..f94b4be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -117,7 +117,8 @@ public class JobManagerRunnerMockTest extends TestLogger {
 			JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
 			new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,
-			jobCompletion));
+			jobCompletion,
+			null));
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index b52c08c..02047f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -117,7 +117,9 @@ public class JobMasterTest extends TestLogger {
 				null,
 				mock(OnCompletionActions.class),
 				testingFatalErrorHandler,
-				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+				null,
+				null);
 
 			CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
 
@@ -224,7 +226,9 @@ public class JobMasterTest extends TestLogger {
 				null,
 				mock(OnCompletionActions.class),
 				testingFatalErrorHandler,
-				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+				null,
+				null);
 
 			CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);