You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:16:05 UTC
[2/3] flink git commit: [FLINK-9289][rest] Rework JobSubmitHandler to
accept jar files
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 9c97ad4..9589b46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.rest.handler.job;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,40 +31,69 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Tests for the {@link JobSubmitHandler}.
*/
public class JobSubmitHandlerTest extends TestLogger {
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ private static BlobServer blobServer;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+ blobServer = new BlobServer(config, new VoidBlobStore());
+ blobServer.start();
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ if (blobServer != null) {
+ blobServer.close();
+ }
+ }
+
@Test
public void testSerializationFailureHandling() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
+ () -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.toString(), Collections.emptyList());
try {
handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway);
@@ -73,40 +105,61 @@ public class JobSubmitHandlerTest extends TestLogger {
@Test
public void testSuccessfulJobSubmission() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+
+ TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+ builder
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .setHostname("localhost");
+ DispatcherGateway mockGateway = builder.build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
+ () -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
- handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
.get();
}
@Test
public void testFailedJobSubmission() throws Exception {
final String errorMessage = "test";
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+ DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally(new Exception(errorMessage)))
+ .build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ JobGraph jobGraph = new JobGraph("testjob");
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
try {
- handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(
+ request,
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.singletonList(jobGraphFile.toFile())), mockGateway)
.get();
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
@@ -117,4 +170,80 @@ public class JobSubmitHandlerTest extends TestLogger {
}
}
}
+
+ @Test
+ public void testRejectionOnCountMismatch() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+ builder
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .setHostname("localhost");
+ DispatcherGateway mockGateway = builder.build();
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+ CompletableFuture.completedFuture("http://localhost:1234"),
+ () -> CompletableFuture.completedFuture(mockGateway),
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList());
+
+ try {
+ handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway)
+ .get();
+ } catch (Exception e) {
+ ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count"));
+ }
+ }
+
+ @Test
+ public void testFileHandling() throws Exception {
+ CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>();
+ DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder()
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(submittedJobGraph -> {
+ submittedJobGraphFuture.complete(submittedJobGraph);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+ CompletableFuture.completedFuture("http://localhost:1234"),
+ () -> CompletableFuture.completedFuture(dispatcherGateway),
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path artifactFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ final JobGraph jobGraph = new JobGraph();
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+ Collections.singletonList(jarFile.getFileName().toString()));
+
+ handler.handleRequest(new HandlerRequest<>(
+ request,
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Arrays.asList(jobGraphFile.toFile(), jarFile.toFile(), artifactFile.toFile())), dispatcherGateway)
+ .get();
+
+ Assert.assertTrue("No JobGraph was submitted.", submittedJobGraphFuture.isDone());
+ final JobGraph submittedJobGraph = submittedJobGraphFuture.get();
+ Assert.assertEquals(1, submittedJobGraph.getUserJarBlobKeys().size());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
deleted file mode 100644
index 7ad72fc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-/**
- * Tests for {@link BlobServerPortResponseBody}.
- */
-public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase<BlobServerPortResponseBody> {
-
- @Override
- protected Class<BlobServerPortResponseBody> getTestResponseClass() {
- return BlobServerPortResponseBody.class;
- }
-
- @Override
- protected BlobServerPortResponseBody getTestResponseInstance() throws Exception {
- return new BlobServerPortResponseBody(64);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
index 7627d98..44f9630 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rest.messages;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import java.io.IOException;
+import java.util.Arrays;
/**
* Tests for the {@link JobSubmitRequestBody}.
@@ -35,6 +35,8 @@ public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase<Job
@Override
protected JobSubmitRequestBody getTestRequestInstance() throws IOException {
- return new JobSubmitRequestBody(new JobGraph("job"));
+ return new JobSubmitRequestBody(
+ "jobgraph",
+ Arrays.asList("jar1", "jar2"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
new file mode 100644
index 0000000..5d19a01
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link DispatcherGateway}.
+ */
+public final class TestingDispatcherGateway extends TestingRestfulGateway implements DispatcherGateway {
+
+ static final Function<JobGraph, CompletableFuture<Acknowledge>> DEFAULT_SUBMIT_FUNCTION = jobGraph -> CompletableFuture.completedFuture(Acknowledge.get());
+ static final Supplier<CompletableFuture<Collection<JobID>>> DEFAULT_LIST_FUNCTION = () -> CompletableFuture.completedFuture(Collections.emptyList());
+ static final int DEFAULT_BLOB_SERVER_PORT = 1234;
+ static final DispatcherId DEFAULT_FENCING_TOKEN = DispatcherId.generate();
+ static final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION = jobID -> CompletableFuture.completedFuture(null);
+
+ private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction;
+ private Supplier<CompletableFuture<Collection<JobID>>> listFunction;
+ private int blobServerPort;
+ private DispatcherId fencingToken;
+ private Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction;
+
+ public TestingDispatcherGateway() {
+ super();
+ submitFunction = DEFAULT_SUBMIT_FUNCTION;
+ listFunction = DEFAULT_LIST_FUNCTION;
+ blobServerPort = DEFAULT_BLOB_SERVER_PORT;
+ fencingToken = DEFAULT_FENCING_TOKEN;
+ requestArchivedJobFunction = DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION;
+ }
+
+ public TestingDispatcherGateway(
+ String address,
+ String hostname,
+ String restAddress,
+ Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
+ Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction,
+ Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction,
+ Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
+ Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
+ Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
+ Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier,
+ Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier,
+ Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier,
+ BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
+ BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction,
+ Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction,
+ Supplier<CompletableFuture<Collection<JobID>>> listFunction,
+ int blobServerPort,
+ DispatcherId fencingToken,
+ Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction) {
+ super(
+ address,
+ hostname,
+ restAddress,
+ cancelJobFunction,
+ stopJobFunction,
+ requestJobFunction,
+ requestJobResultFunction,
+ requestJobStatusFunction,
+ requestMultipleJobDetailsSupplier,
+ requestClusterOverviewSupplier,
+ requestMetricQueryServicePathsSupplier,
+ requestTaskManagerMetricQueryServicePathsSupplier,
+ requestOperatorBackPressureStatsFunction,
+ triggerSavepointFunction);
+ this.submitFunction = submitFunction;
+ this.listFunction = listFunction;
+ this.blobServerPort = blobServerPort;
+ this.fencingToken = fencingToken;
+ this.requestArchivedJobFunction = requestArchivedJobFunction;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+ return submitFunction.apply(jobGraph);
+ }
+
+ @Override
+ public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
+ return listFunction.get();
+ }
+
+ @Override
+ public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
+ return CompletableFuture.completedFuture(blobServerPort);
+ }
+
+ @Override
+ public DispatcherId getFencingToken() {
+ return DEFAULT_FENCING_TOKEN;
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout) {
+ return requestArchivedJobFunction.apply(jobId);
+ }
+
+ /**
+ * Builder for the {@link TestingDispatcherGateway}.
+ */
+ public static final class Builder extends TestingRestfulGateway.Builder {
+
+ private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction;
+ private Supplier<CompletableFuture<Collection<JobID>>> listFunction;
+ private int blobServerPort;
+ private DispatcherId fencingToken;
+ private Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction;
+
+ public Builder setSubmitFunction(Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction) {
+ this.submitFunction = submitFunction;
+ return this;
+ }
+
+ public Builder setListFunction(Supplier<CompletableFuture<Collection<JobID>>> listFunction) {
+ this.listFunction = listFunction;
+ return this;
+ }
+
+ public Builder setRequestArchivedJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+ requestArchivedJobFunction = requestJobFunction;
+ return this;
+ }
+
+ @Override
+ public TestingRestfulGateway.Builder setRequestJobFunction(Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction) {
+ // signature clash
+ throw new UnsupportedOperationException("Use setRequestArchivedJobFunction() instead.");
+ }
+
+ public Builder setBlobServerPort(int blobServerPort) {
+ this.blobServerPort = blobServerPort;
+ return this;
+ }
+
+ public Builder setFencingToken(DispatcherId fencingToken) {
+ this.fencingToken = fencingToken;
+ return this;
+ }
+
+ public TestingDispatcherGateway build() {
+ return new TestingDispatcherGateway(
+ address,
+ hostname,
+ restAddress,
+ cancelJobFunction,
+ stopJobFunction,
+ requestJobFunction,
+ requestJobResultFunction,
+ requestJobStatusFunction,
+ requestMultipleJobDetailsSupplier,
+ requestClusterOverviewSupplier,
+ requestMetricQueryServicePathsSupplier,
+ requestTaskManagerMetricQueryServicePathsSupplier,
+ requestOperatorBackPressureStatsFunction,
+ triggerSavepointFunction,
+ submitFunction,
+ listFunction,
+ blobServerPort,
+ fencingToken,
+ requestArchivedJobFunction);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b92ba51..09af236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -211,21 +211,21 @@ public class TestingRestfulGateway implements RestfulGateway {
/**
* Builder for the {@link TestingRestfulGateway}.
*/
- public static final class Builder {
- private String address = LOCALHOST;
- private String hostname = LOCALHOST;
- private String restAddress = LOCALHOST;
- private Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
- private Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
- private Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
- private Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
- private Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
- private Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
- private Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
- private Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
- private Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
- private BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
- private BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
+ public static class Builder {
+ protected String address = LOCALHOST;
+ protected String hostname = LOCALHOST;
+ protected String restAddress = LOCALHOST;
+ protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
+ protected Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
+ protected Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
+ protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
+ protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
+ protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
+ protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
+ protected Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
+ protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
+ protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
+ protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
public Builder() {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;