You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:46 UTC

[6/8] flink git commit: [FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC

[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC

This closes #5903.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e884a3a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e884a3a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e884a3a4

Branch: refs/heads/master
Commit: e884a3a4f6ba738fac66846488f931cf85f2e2fc
Parents: c7d5910
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 12:35:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:07 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/WebSubmissionExtension.java      |  23 +---
 .../webmonitor/handlers/JarRunHandler.java      |  65 ++++++++---
 .../handlers/JarRunMessageParameters.java       |  12 +--
 .../webmonitor/handlers/JarRunHandlerTest.java  | 108 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |   3 +-
 5 files changed, 171 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index bf3bc34..991005c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -20,10 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
@@ -53,27 +51,15 @@ public class WebSubmissionExtension implements WebMonitorExtension {
 
 	private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers;
 
-	private final RestClusterClient<?> restClusterClient;
-
 	public WebSubmissionExtension(
 			Configuration configuration,
 			CompletableFuture<String> restAddressFuture,
-			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			Map<String, String> responseHeaders,
 			Path jarDir,
 			Executor executor,
 			Time timeout) throws Exception {
 
-		final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService();
-		restAddressFuture.thenAccept(restAddress -> settableLeaderRetrievalService.notifyListener(
-			restAddress,
-			HighAvailabilityServices.DEFAULT_LEADER_ID));
-
-		restClusterClient = new RestClusterClient<>(
-			configuration,
-			"WebSubmissionHandlers",
-			settableLeaderRetrievalService);
-
 		webSubmissionHandlers = new ArrayList<>(5);
 
 		final JarUploadHandler jarUploadHandler = new JarUploadHandler(
@@ -102,8 +88,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
 			JarRunHeaders.getInstance(),
 			jarDir,
 			configuration,
-			executor,
-			restClusterClient);
+			executor);
 
 		final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler(
 			restAddressFuture,
@@ -134,8 +119,6 @@ public class WebSubmissionExtension implements WebMonitorExtension {
 
 	@Override
 	public CompletableFuture<Void> closeAsync() {
-		restClusterClient.shutdown();
-
 		return CompletableFuture.completedFuture(null);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 09b7a8b..2e928b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -23,27 +23,36 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import akka.actor.AddressFromURIString;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -57,7 +66,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
  * Handler to submit jobs uploaded via the Web UI.
  */
 public class JarRunHandler extends
-		AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+		AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
 
 	private final Path jarDir;
 
@@ -65,30 +74,26 @@ public class JarRunHandler extends
 
 	private final Executor executor;
 
-	private final RestClusterClient<?> restClusterClient;
-
 	public JarRunHandler(
 			final CompletableFuture<String> localRestAddress,
-			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> responseHeaders,
 			final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
 			final Path jarDir,
 			final Configuration configuration,
-			final Executor executor,
-			final RestClusterClient<?> restClusterClient) {
+			final Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
 
 		this.jarDir = requireNonNull(jarDir);
 		this.configuration = requireNonNull(configuration);
 		this.executor = requireNonNull(executor);
-		this.restClusterClient = requireNonNull(restClusterClient);
 	}
 
 	@Override
 	protected CompletableFuture<JarRunResponseBody> handleRequest(
 			@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
-			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
+			@Nonnull final DispatcherGateway gateway) throws RestHandlerException {
 
 		final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
 		final Path jarFile = jarDir.resolve(pathParameter);
@@ -105,9 +110,32 @@ public class JarRunHandler extends
 			savepointRestoreSettings,
 			parallelism);
 
-		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
-			.submitJob(jobGraph)
-			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
+		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+		// the jar upload blocks on I/O, so run it on the handler's executor instead of the thread completing the inputs
+		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombineAsync(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+			final List<PermanentBlobKey> keys;
+			try {
+				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+			} catch (IOException ioe) {
+				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+			}
+			// register the blob keys of the uploaded jars with the job graph
+			for (PermanentBlobKey key : keys) {
+				jobGraph.addBlob(key);
+			}
+
+			return jobGraph;
+		}, executor);
+
+		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
+			// we have to enable queued scheduling because slots will be allocated lazily
+			jobGraph.setAllowQueuedScheduling(true);
+			return gateway.submitJob(jobGraph, timeout);
+		});
+
+		return jobSubmissionFuture
+			.thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()))
 			.exceptionally(throwable -> {
 				throw new CompletionException(new RestHandlerException(
 					throwable.getMessage(),
@@ -160,4 +188,15 @@ public class JarRunHandler extends
 			return jobGraph;
 		}, executor);
 	}
+
+	private static String getDispatcherHost(DispatcherGateway gateway) {
+		// parse the host part out of the dispatcher's RPC address
+		final String rpcAddress = gateway.getAddress();
+		final Optional<String> parsedHost = ScalaUtils.toJava(AddressFromURIString.parse(rpcAddress).host());
+
+		// an address without a host part is taken to mean that the dispatcher
+		// runs on the same machine as this handler
+		final String fallbackHost = "localhost";
+		return parsedHost.orElse(fallbackHost);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
index 2d9428c..78267db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -31,17 +31,17 @@ import java.util.Collections;
  */
 public class JarRunMessageParameters extends MessageParameters {
 
-	private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+	public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
 
-	private final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
+	public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
 
-	private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
+	public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
 
-	private final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
+	public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
 
-	private final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
+	public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-	private final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
+	public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
 
 	@Override
 	public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
new file mode 100644
index 0000000..aefe4f1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link JarRunHandler}.
+ */
+public class JarRunHandlerTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	@Test
+	public void testRunJar() throws Exception {
+		Path uploadDir = TMP.newFolder().toPath();
+
+		Path actualUploadDir = uploadDir.resolve("flink-web-upload");
+		Files.createDirectory(actualUploadDir);
+
+		Path emptyJar = actualUploadDir.resolve("empty.jar");
+		Files.createFile(emptyJar);
+
+		Configuration config = new Configuration();
+		config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
+
+		MiniClusterResource clusterResource = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				1,
+				1
+			),
+			MiniClusterResource.MiniClusterType.NEW
+		);
+		clusterResource.before();
+
+		try {
+			Configuration clientConfig = clusterResource.getClientConfiguration();
+			RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor());
+
+			try {
+				JarRunHeaders headers = JarRunHeaders.getInstance();
+				JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+				parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString());
+
+				String host = clientConfig.getString(RestOptions.ADDRESS);
+				int port = clientConfig.getInteger(RestOptions.PORT);
+
+				try {
+					client.sendRequest(host, port, headers, parameters, EmptyRequestBody.getInstance()).get();
+					throw new AssertionError("Running an empty jar should have failed, but the request succeeded.");
+				} catch (Exception e) {
+					Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);
+					if (expected.isPresent()) {
+						final String message = expected.get().getMessage();
+						// NOTE: presumably the empty jar is rejected while the handler builds the program, before upload/submission
+						assertTrue(message, message.contains("ProgramInvocationException"));
+						// the jar name in the message shows the handler resolved the file from the upload directory
+						assertTrue(message, message.contains("empty.jar'. zip file is empty"));
+					} else {
+						throw e;
+					}
+				}
+			} finally {
+				client.shutdown(Time.milliseconds(10));
+			}
+		} finally {
+			clusterResource.after();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 24ecf0c..4b27534 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -219,7 +220,7 @@ public final class WebMonitorUtils {
 	 * @throws FlinkException if the web submission extension could not be loaded
 	 */
 	public static WebMonitorExtension loadWebSubmissionExtension(
-			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			CompletableFuture<String> restAddressFuture,
 			Time timeout,
 			Map<String, String> responseHeaders,