You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:16:05 UTC

[2/3] flink git commit: [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 9c97ad4..9589b46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.rest.handler.job;
 
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,40 +31,69 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Tests for the {@link JobSubmitHandler}.
  */
 public class JobSubmitHandlerTest extends TestLogger {
 
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+	private static BlobServer blobServer;
+
+	@BeforeClass
+	public static void setup() throws IOException {
+		// store blobs in a fresh directory created under the class-level temporary folder
+		final String blobStorageDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+		final Configuration blobServerConfig = new Configuration();
+		blobServerConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDir);
+		blobServer = new BlobServer(blobServerConfig, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	@AfterClass
+	public static void teardown() throws IOException {
+		if (blobServer != null) { // stays null when setup() failed before the server was constructed
+			blobServer.close();
+		}
+	}
+
 	@Test
 	public void testSerializationFailureHandling() throws Exception {
-		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+		DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+			.build();
 
 		JobSubmitHandler handler = new JobSubmitHandler(
 			CompletableFuture.completedFuture("http://localhost:1234"),
-			mockGatewayRetriever,
+			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
-			Collections.emptyMap());
+			Collections.emptyMap(),
+			TestingUtils.defaultExecutor());
 
-		JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.toString(), Collections.emptyList());
 
 		try {
 			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway);
@@ -73,40 +105,61 @@ public class JobSubmitHandlerTest extends TestLogger {
 
 	@Test
 	public void testSuccessfulJobSubmission() throws Exception {
-		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+		try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+			objectOut.writeObject(new JobGraph("testjob"));
+		}
+
+		TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+		builder
+			.setBlobServerPort(blobServer.getPort())
+			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+			.setHostname("localhost");
+		DispatcherGateway mockGateway = builder.build();
 
 		JobSubmitHandler handler = new JobSubmitHandler(
 			CompletableFuture.completedFuture("http://localhost:1234"),
-			mockGatewayRetriever,
+			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
-			Collections.emptyMap());
+			Collections.emptyMap(),
+			TestingUtils.defaultExecutor());
 
-		JobGraph job = new JobGraph("testjob");
-		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
 
-		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
 			.get();
 	}
 
 	@Test
 	public void testFailedJobSubmission() throws Exception {
 		final String errorMessage = "test";
-		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+		DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+			.setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally(new Exception(errorMessage)))
+			.build();
 
 		JobSubmitHandler handler = new JobSubmitHandler(
 			CompletableFuture.completedFuture("http://localhost:1234"),
 			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
-			Collections.emptyMap());
+			Collections.emptyMap(),
+			TestingUtils.defaultExecutor());
 
-		JobGraph job = new JobGraph("testjob");
-		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+
+		JobGraph jobGraph = new JobGraph("testjob");
+		try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+			objectOut.writeObject(jobGraph);
+		}
+
+		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
 
 		try {
-			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+			handler.handleRequest(new HandlerRequest<>(
+					request,
+					EmptyMessageParameters.getInstance(),
+					Collections.emptyMap(),
+					Collections.emptyMap(),
+					Collections.singletonList(jobGraphFile.toFile())), mockGateway)
 				.get();
 		} catch (Exception e) {
 			Throwable t = ExceptionUtils.stripExecutionException(e);
@@ -117,4 +170,80 @@ public class JobSubmitHandlerTest extends TestLogger {
 			}
 		}
 	}
+
+	@Test
+	public void testRejectionOnCountMismatch() throws Exception {
+		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+		try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+			objectOut.writeObject(new JobGraph("testjob"));
+		}
+		final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); // extra upload that the request body below never names
+
+		TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+		builder
+			.setBlobServerPort(blobServer.getPort())
+			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+			.setHostname("localhost");
+		DispatcherGateway mockGateway = builder.build();
+
+		JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			() -> CompletableFuture.completedFuture(mockGateway),
+			RpcUtils.INF_TIMEOUT,
+			Collections.emptyMap(),
+			TestingUtils.defaultExecutor());
+
+		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
+
+		try {
+			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway)
+				.get(); // NOTE(review): no Assert.fail() follows, so an accepted request also lets this test pass
+		} catch (Exception e) {
+			ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count")); // NOTE(review): return value is discarded, nothing is asserted — TODO confirm intent and tighten
+		}
+	}
+
+	@Test
+	public void testFileHandling() throws Exception {
+		// three uploads: the serialized job graph, one jar named in the request, and one file the request does not name
+		final Path graphPath = TEMPORARY_FOLDER.newFile().toPath();
+		final Path jarPath = TEMPORARY_FOLDER.newFile().toPath();
+		final Path unreferencedPath = TEMPORARY_FOLDER.newFile().toPath();
+
+		try (ObjectOutputStream graphOut = new ObjectOutputStream(Files.newOutputStream(graphPath))) {
+			graphOut.writeObject(new JobGraph());
+		}
+
+		// completed by the submit function with whatever job graph reaches the gateway
+		final CompletableFuture<JobGraph> capturedGraph = new CompletableFuture<>();
+		final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder();
+		gatewayBuilder.setBlobServerPort(blobServer.getPort());
+		gatewayBuilder.setSubmitFunction(received -> {
+			capturedGraph.complete(received);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		});
+		final DispatcherGateway dispatcherGateway = gatewayBuilder.build();
+
+		final JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			() -> CompletableFuture.completedFuture(dispatcherGateway),
+			RpcUtils.INF_TIMEOUT,
+			Collections.emptyMap(),
+			TestingUtils.defaultExecutor());
+
+		final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+			graphPath.getFileName().toString(),
+			Collections.singletonList(jarPath.getFileName().toString()));
+
+		handler.handleRequest(new HandlerRequest<>(
+				requestBody,
+				EmptyMessageParameters.getInstance(),
+				Collections.emptyMap(),
+				Collections.emptyMap(),
+				Arrays.asList(graphPath.toFile(), jarPath.toFile(), unreferencedPath.toFile())), dispatcherGateway)
+			.get();
+
+		Assert.assertTrue("No JobGraph was submitted.", capturedGraph.isDone());
+		Assert.assertEquals(1, capturedGraph.get().getUserJarBlobKeys().size());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
deleted file mode 100644
index 7ad72fc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-/**
- * Tests for {@link BlobServerPortResponseBody}.
- */
-public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase<BlobServerPortResponseBody> {
-
-	@Override
-	protected Class<BlobServerPortResponseBody> getTestResponseClass() {
-		return BlobServerPortResponseBody.class;
-	}
-
-	@Override
-	protected BlobServerPortResponseBody getTestResponseInstance() throws Exception {
-		return new BlobServerPortResponseBody(64);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
index 7627d98..44f9630 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * Tests for the {@link JobSubmitRequestBody}.
@@ -35,6 +35,8 @@ public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase<Job
 
 	@Override
 	protected JobSubmitRequestBody getTestRequestInstance() throws IOException {
-		return new JobSubmitRequestBody(new JobGraph("job"));
+		return new JobSubmitRequestBody(
+			"jobgraph",
+			Arrays.asList("jar1", "jar2"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
new file mode 100644
index 0000000..5d19a01
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link DispatcherGateway}: dispatcher-specific calls are answered by the functions, port and fencing token it was constructed with.
+ */
+public final class TestingDispatcherGateway extends TestingRestfulGateway implements DispatcherGateway {
+
+	static final Function<JobGraph, CompletableFuture<Acknowledge>> DEFAULT_SUBMIT_FUNCTION = jobGraph -> CompletableFuture.completedFuture(Acknowledge.get());
+	static final Supplier<CompletableFuture<Collection<JobID>>> DEFAULT_LIST_FUNCTION = () -> CompletableFuture.completedFuture(Collections.emptyList());
+	static final int DEFAULT_BLOB_SERVER_PORT = 1234;
+	static final DispatcherId DEFAULT_FENCING_TOKEN = DispatcherId.generate();
+	static final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION = jobID -> CompletableFuture.completedFuture(null);
+
+	private final Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction;
+	private final Supplier<CompletableFuture<Collection<JobID>>> listFunction;
+	private final int blobServerPort;
+	private final DispatcherId fencingToken;
+	private final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction;
+
+	public TestingDispatcherGateway() {
+		super();
+		submitFunction = DEFAULT_SUBMIT_FUNCTION;
+		listFunction = DEFAULT_LIST_FUNCTION;
+		blobServerPort = DEFAULT_BLOB_SERVER_PORT;
+		fencingToken = DEFAULT_FENCING_TOKEN;
+		requestArchivedJobFunction = DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION;
+	}
+
+	public TestingDispatcherGateway(
+			String address,
+			String hostname,
+			String restAddress,
+			Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
+			Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction,
+			Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction,
+			Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
+			Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
+			Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
+			Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier,
+			Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier,
+			Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier,
+			BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
+			BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction,
+			Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction,
+			Supplier<CompletableFuture<Collection<JobID>>> listFunction,
+			int blobServerPort,
+			DispatcherId fencingToken,
+			Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction) {
+		super(
+			address,
+			hostname,
+			restAddress,
+			cancelJobFunction,
+			stopJobFunction,
+			requestJobFunction,
+			requestJobResultFunction,
+			requestJobStatusFunction,
+			requestMultipleJobDetailsSupplier,
+			requestClusterOverviewSupplier,
+			requestMetricQueryServicePathsSupplier,
+			requestTaskManagerMetricQueryServicePathsSupplier,
+			requestOperatorBackPressureStatsFunction,
+			triggerSavepointFunction);
+		this.submitFunction = submitFunction;
+		this.listFunction = listFunction;
+		this.blobServerPort = blobServerPort;
+		this.fencingToken = fencingToken;
+		this.requestArchivedJobFunction = requestArchivedJobFunction;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+		return submitFunction.apply(jobGraph); // the timeout is ignored; the configured function decides the outcome
+	}
+
+	@Override
+	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
+		return listFunction.get();
+	}
+
+	@Override
+	public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
+		return CompletableFuture.completedFuture(blobServerPort);
+	}
+
+	@Override
+	public DispatcherId getFencingToken() {
+		return fencingToken != null ? fencingToken : DEFAULT_FENCING_TOKEN; // honor the configured token; a null token keeps the previous default
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout) {
+		return requestArchivedJobFunction.apply(jobId);
+	}
+
+	/**
+	 * Builder for the {@link TestingDispatcherGateway}; dispatcher options that are never set use the same DEFAULT_* values as the no-argument constructor.
+	 */
+	public static final class Builder extends TestingRestfulGateway.Builder {
+
+		private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction = DEFAULT_SUBMIT_FUNCTION;
+		private Supplier<CompletableFuture<Collection<JobID>>> listFunction = DEFAULT_LIST_FUNCTION;
+		private int blobServerPort = DEFAULT_BLOB_SERVER_PORT;
+		private DispatcherId fencingToken = DEFAULT_FENCING_TOKEN;
+		private Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction = DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION;
+
+		public Builder setSubmitFunction(Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction) {
+			this.submitFunction = submitFunction;
+			return this;
+		}
+
+		public Builder setListFunction(Supplier<CompletableFuture<Collection<JobID>>> listFunction) {
+			this.listFunction = listFunction;
+			return this;
+		}
+
+		public Builder setRequestArchivedJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+			requestArchivedJobFunction = requestJobFunction;
+			return this;
+		}
+
+		@Override
+		public TestingRestfulGateway.Builder setRequestJobFunction(Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction) {
+			// signature clash
+			throw new UnsupportedOperationException("Use setRequestArchivedJobFunction() instead.");
+		}
+
+		public Builder setBlobServerPort(int blobServerPort) {
+			this.blobServerPort = blobServerPort;
+			return this;
+		}
+
+		public Builder setFencingToken(DispatcherId fencingToken) {
+			this.fencingToken = fencingToken;
+			return this;
+		}
+
+		public TestingDispatcherGateway build() {
+			return new TestingDispatcherGateway(
+				address,
+				hostname,
+				restAddress,
+				cancelJobFunction,
+				stopJobFunction,
+				requestJobFunction,
+				requestJobResultFunction,
+				requestJobStatusFunction,
+				requestMultipleJobDetailsSupplier,
+				requestClusterOverviewSupplier,
+				requestMetricQueryServicePathsSupplier,
+				requestTaskManagerMetricQueryServicePathsSupplier,
+				requestOperatorBackPressureStatsFunction,
+				triggerSavepointFunction,
+				submitFunction,
+				listFunction,
+				blobServerPort,
+				fencingToken,
+				requestArchivedJobFunction);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b92ba51..09af236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -211,21 +211,21 @@ public class TestingRestfulGateway implements RestfulGateway {
 	/**
 	 * Builder for the {@link TestingRestfulGateway}.
 	 */
-	public static final class Builder {
-		private String address = LOCALHOST;
-		private String hostname = LOCALHOST;
-		private String restAddress = LOCALHOST;
-		private Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
-		private Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
-		private Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
-		private Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
-		private Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
-		private Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
-		private Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
-		private Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
-		private Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
-		private BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
-		private BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
+	public static class Builder {
+		protected String address = LOCALHOST;
+		protected String hostname = LOCALHOST;
+		protected String restAddress = LOCALHOST;
+		protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
+		protected Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
+		protected Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
+		protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
+		protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
+		protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
+		protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
+		protected Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
+		protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
+		protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
+		protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
 
 		public Builder() {
 			cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;