You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/07/08 08:41:11 UTC

[flink] 01/02: [FLINK-18519][REST] Send exception to client when app fails to execute

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82e603a7d3e24d89c09f6ccdc42e4dcc07232040
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jul 6 16:53:32 2020 +0200

    [FLINK-18519][REST] Send exception to client when app fails to execute
    
    This closes #12845.
---
 flink-runtime-web/pom.xml                          | 21 ++++++++
 .../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++--
 .../handlers/JarHandlerParameterTest.java          |  2 +-
 .../handlers/JarRunHandlerParameterTest.java       | 56 ++++++++++++++++++++++
 .../handlers/utils/EagerSinkProgram.java           | 31 ++++++++++++
 5 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index bdeead9..a625e6d 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -37,6 +37,7 @@ under the License.
 	<properties>
 		<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
 		<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+		<test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name>
 	</properties>
 
 	<dependencies>
@@ -196,6 +197,25 @@ under the License.
 					</execution>
 					<execution>
 						<!-- Used for JarHandler tests -->
+						<id>test-parameter-program-jar-with-eager-sink</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include>
+							</includes>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>${test.ParameterProgramWithEagerSink.name}</finalName>
+						</configuration>
+					</execution>
+					<execution>
+						<!-- Used for JarHandler tests -->
 						<id>test-parameter-program-jar-without-manifest</id>
 						<phase>process-test-classes</phase>
 						<goals>
@@ -279,6 +299,7 @@ under the License.
 						<targetDir>${project.build.directory}</targetDir>
 						<parameterJarName>${test.parameterProgram.name}</parameterJarName>
 						<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
+						<parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName>
 					</systemPropertyVariables>
 				</configuration>
 			</plugin>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 84a74bd..223dbae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -34,6 +33,8 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
@@ -97,9 +98,13 @@ public class JarRunHandler extends
 
 		return CompletableFuture
 				.supplyAsync(() -> applicationRunner.run(gateway, program, effectiveConfiguration), executor)
-				.thenApply(jobIds -> {
-					if (jobIds.isEmpty()) {
-						throw new CompletionException(new ProgramInvocationException("No jobs submitted."));
+				.handle((jobIds, throwable) -> {
+					if (throwable != null) {
+						throw new CompletionException(
+								new RestHandlerException("Could not execute application.", HttpResponseStatus.BAD_REQUEST, throwable));
+					} else if (jobIds.isEmpty()) {
+						throw new CompletionException(
+								new RestHandlerException("No jobs included in application.", HttpResponseStatus.BAD_REQUEST));
 					}
 					return new JarRunResponseBody(jobIds.get(0));
 				});
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
index 354be7a..4a04d51 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -259,7 +259,7 @@ public abstract class JarHandlerParameterTest
 			? Arrays.asList(PROG_ARGS) : null;
 	}
 
-	private static <REQB extends JarRequestBody, M extends JarMessageParameters>
+	protected static <REQB extends JarRequestBody, M extends JarMessageParameters>
 	HandlerRequest<REQB, M> createRequest(
 		REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar)
 		throws HandlerRequestException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 13ba43c..f577090 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -23,30 +23,46 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
@@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 
 	private static JarRunHandler handler;
 
+	private static Path jarWithEagerSink;
+
 	@BeforeClass
 	public static void setup() throws Exception {
 		init();
@@ -65,6 +83,12 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 		final Map<String, String> responseHeaders = Collections.emptyMap();
 		final Executor executor = TestingUtils.defaultExecutor();
 
+		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+		final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar";
+		jarWithEagerSink = Files.copy(
+				jarLocation.resolve(parameterProgramWithEagerSink),
+				jarDir.resolve("program-with-eager-sink.jar"));
+
 		handler = new JarRunHandler(
 			gatewayRetriever,
 			timeout,
@@ -156,6 +180,38 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 		return new JarRunRequestBody(null, null, null, null, jobId, null, null);
 	}
 
+	@Test
+	public void testRestHandlerExceptionThrownWithEagerSinks() throws Exception {
+		final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest(
+				getDefaultJarRequestBody(),
+				getUnresolvedJarMessageParameters(),
+				getUnresolvedJarMessageParameters(),
+				jarWithEagerSink
+		);
+
+		try {
+			handler.handleRequest(request, restfulGateway).get();
+		} catch (final ExecutionException e) {
+			final Throwable throwable =	 ExceptionUtils.stripCompletionException(e.getCause());
+			assertThat(throwable, instanceOf(RestHandlerException.class));
+
+			final RestHandlerException restHandlerException = (RestHandlerException) throwable;
+			assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST));
+
+			final Optional<ProgramInvocationException> invocationException =
+					ExceptionUtils.findThrowable(restHandlerException, ProgramInvocationException.class);
+
+			if (!invocationException.isPresent()) {
+				fail();
+			}
+
+			final String exceptionMsg = invocationException.get().getMessage();
+			assertThat(exceptionMsg, containsString("Job was submitted in detached mode."));
+			return;
+		}
+		fail("The test should have failed.");
+	}
+
 	@Override
 	void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
 		throws Exception {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
new file mode 100644
index 0000000..145a6a0
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Javadoc.
+ */
+public class EagerSinkProgram {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements("hello", "world").print();
+	}
+}