You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:46 UTC
[6/8] flink git commit: [FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC
[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC
This closes #5903.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e884a3a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e884a3a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e884a3a4
Branch: refs/heads/master
Commit: e884a3a4f6ba738fac66846488f931cf85f2e2fc
Parents: c7d5910
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 12:35:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:07 2018 +0200
----------------------------------------------------------------------
.../webmonitor/WebSubmissionExtension.java | 23 +---
.../webmonitor/handlers/JarRunHandler.java | 65 ++++++++---
.../handlers/JarRunMessageParameters.java | 12 +--
.../webmonitor/handlers/JarRunHandlerTest.java | 108 +++++++++++++++++++
.../runtime/webmonitor/WebMonitorUtils.java | 3 +-
5 files changed, 171 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index bf3bc34..991005c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -20,10 +20,8 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
@@ -53,27 +51,15 @@ public class WebSubmissionExtension implements WebMonitorExtension {
private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers;
- private final RestClusterClient<?> restClusterClient;
-
public WebSubmissionExtension(
Configuration configuration,
CompletableFuture<String> restAddressFuture,
- GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Map<String, String> responseHeaders,
Path jarDir,
Executor executor,
Time timeout) throws Exception {
- final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService();
- restAddressFuture.thenAccept(restAddress -> settableLeaderRetrievalService.notifyListener(
- restAddress,
- HighAvailabilityServices.DEFAULT_LEADER_ID));
-
- restClusterClient = new RestClusterClient<>(
- configuration,
- "WebSubmissionHandlers",
- settableLeaderRetrievalService);
-
webSubmissionHandlers = new ArrayList<>(5);
final JarUploadHandler jarUploadHandler = new JarUploadHandler(
@@ -102,8 +88,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
JarRunHeaders.getInstance(),
jarDir,
configuration,
- executor,
- restClusterClient);
+ executor);
final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler(
restAddressFuture,
@@ -134,8 +119,6 @@ public class WebSubmissionExtension implements WebMonitorExtension {
@Override
public CompletableFuture<Void> closeAsync() {
- restClusterClient.shutdown();
-
return CompletableFuture.completedFuture(null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 09b7a8b..2e928b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -23,27 +23,36 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.util.ScalaUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import akka.actor.AddressFromURIString;
+
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
@@ -57,7 +66,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
* Handler to submit jobs uploaded via the Web UI.
*/
public class JarRunHandler extends
- AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+ AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
private final Path jarDir;
@@ -65,30 +74,26 @@ public class JarRunHandler extends
private final Executor executor;
- private final RestClusterClient<?> restClusterClient;
-
public JarRunHandler(
final CompletableFuture<String> localRestAddress,
- final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final Path jarDir,
final Configuration configuration,
- final Executor executor,
- final RestClusterClient<?> restClusterClient) {
+ final Executor executor) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
this.jarDir = requireNonNull(jarDir);
this.configuration = requireNonNull(configuration);
this.executor = requireNonNull(executor);
- this.restClusterClient = requireNonNull(restClusterClient);
}
@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
- @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+ @Nonnull final DispatcherGateway gateway) throws RestHandlerException {
final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
final Path jarFile = jarDir.resolve(pathParameter);
@@ -105,9 +110,32 @@ public class JarRunHandler extends
savepointRestoreSettings,
parallelism);
- return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
- .submitJob(jobGraph)
- .thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
+ CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+ CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+ final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+ final List<PermanentBlobKey> keys;
+ try {
+ keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+ } catch (IOException ioe) {
+ throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ }
+
+ for (PermanentBlobKey key : keys) {
+ jobGraph.addBlob(key);
+ }
+
+ return jobGraph;
+ });
+
+ CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
+ // we have to enable queued scheduling because slots will be allocated lazily
+ jobGraph.setAllowQueuedScheduling(true);
+ return gateway.submitJob(jobGraph, timeout);
+ });
+
+ return jobSubmissionFuture
+ .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()))
.exceptionally(throwable -> {
throw new CompletionException(new RestHandlerException(
throwable.getMessage(),
@@ -160,4 +188,15 @@ public class JarRunHandler extends
return jobGraph;
}, executor);
}
+
+ private static String getDispatcherHost(DispatcherGateway gateway) {
+ String dispatcherAddress = gateway.getAddress();
+ final Optional<String> host = ScalaUtils.toJava(AddressFromURIString.parse(dispatcherAddress).host());
+
+ return host.orElseGet(() -> {
+ // if the dispatcher address does not contain a host part, then assume it's running
+ // on the same machine as the handler
+ return "localhost";
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
index 2d9428c..78267db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -31,17 +31,17 @@ import java.util.Collections;
*/
public class JarRunMessageParameters extends MessageParameters {
- private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+ public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
- private final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
+ public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
- private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
+ public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
- private final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
+ public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
- private final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
+ public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
- private final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
+ public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
new file mode 100644
index 0000000..aefe4f1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link JarRunHandler}.
+ */
+public class JarRunHandlerTest {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ @Test
+ public void testRunJar() throws Exception {
+ Path uploadDir = TMP.newFolder().toPath();
+
+ Path actualUploadDir = uploadDir.resolve("flink-web-upload");
+ Files.createDirectory(actualUploadDir);
+
+ Path emptyJar = actualUploadDir.resolve("empty.jar");
+ Files.createFile(emptyJar);
+
+ Configuration config = new Configuration();
+ config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
+
+ MiniClusterResource clusterResource = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ 1,
+ 1
+ ),
+ MiniClusterResource.MiniClusterType.NEW
+ );
+ clusterResource.before();
+
+ try {
+ Configuration clientConfig = clusterResource.getClientConfiguration();
+ RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor());
+
+ try {
+ JarRunHeaders headers = JarRunHeaders.getInstance();
+ JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+ parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString());
+
+ String host = clientConfig.getString(RestOptions.ADDRESS);
+ int port = clientConfig.getInteger(RestOptions.PORT);
+
+ try {
+ client.sendRequest(host, port, headers, parameters, EmptyRequestBody.getInstance())
+ .get();
+ } catch (Exception e) {
+ Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);
+ if (expected.isPresent()) {
+ // implies the job was actually submitted
+ assertTrue(expected.get().getMessage().contains("ProgramInvocationException"));
+ // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception)
+ // implies the jar was uploaded (otherwise the file would not be found at all)
+ assertTrue(expected.get().getMessage().contains("empty.jar'. zip file is empty"));
+ } else {
+ throw e;
+ }
+ }
+ } finally {
+ client.shutdown(Time.milliseconds(10));
+ }
+ } finally {
+ clusterResource.after();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 24ecf0c..4b27534 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -219,7 +220,7 @@ public final class WebMonitorUtils {
* @throws FlinkException if the web submission extension could not be loaded
*/
public static WebMonitorExtension loadWebSubmissionExtension(
- GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
CompletableFuture<String> restAddressFuture,
Time timeout,
Map<String, String> responseHeaders,