You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/07/08 08:41:11 UTC
[flink] 01/02: [FLINK-18519][REST] Send exception to client when
app fails to execute
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 82e603a7d3e24d89c09f6ccdc42e4dcc07232040
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jul 6 16:53:32 2020 +0200
[FLINK-18519][REST] Send exception to client when app fails to execute
This closes #12845.
---
flink-runtime-web/pom.xml | 21 ++++++++
.../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++--
.../handlers/JarHandlerParameterTest.java | 2 +-
.../handlers/JarRunHandlerParameterTest.java | 56 ++++++++++++++++++++++
.../handlers/utils/EagerSinkProgram.java | 31 ++++++++++++
5 files changed, 118 insertions(+), 5 deletions(-)
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index bdeead9..a625e6d 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -37,6 +37,7 @@ under the License.
<properties>
<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+ <test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name>
</properties>
<dependencies>
@@ -196,6 +197,25 @@ under the License.
</execution>
<execution>
<!-- Used for JarHandler tests -->
+ <id>test-parameter-program-jar-with-eager-sink</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include>
+ </includes>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>${test.ParameterProgramWithEagerSink.name}</finalName>
+ </configuration>
+ </execution>
+ <execution>
+ <!-- Used for JarHandler tests -->
<id>test-parameter-program-jar-without-manifest</id>
<phase>process-test-classes</phase>
<goals>
@@ -279,6 +299,7 @@ under the License.
<targetDir>${project.build.directory}</targetDir>
<parameterJarName>${test.parameterProgram.name}</parameterJarName>
<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
+ <parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName>
</systemPropertyVariables>
</configuration>
</plugin>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 84a74bd..223dbae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -34,6 +33,8 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
import javax.annotation.Nonnull;
import java.nio.file.Path;
@@ -97,9 +98,13 @@ public class JarRunHandler extends
return CompletableFuture
.supplyAsync(() -> applicationRunner.run(gateway, program, effectiveConfiguration), executor)
- .thenApply(jobIds -> {
- if (jobIds.isEmpty()) {
- throw new CompletionException(new ProgramInvocationException("No jobs submitted."));
+ .handle((jobIds, throwable) -> {
+ if (throwable != null) {
+ throw new CompletionException(
+ new RestHandlerException("Could not execute application.", HttpResponseStatus.BAD_REQUEST, throwable));
+ } else if (jobIds.isEmpty()) {
+ throw new CompletionException(
+ new RestHandlerException("No jobs included in application.", HttpResponseStatus.BAD_REQUEST));
}
return new JarRunResponseBody(jobIds.get(0));
});
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
index 354be7a..4a04d51 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -259,7 +259,7 @@ public abstract class JarHandlerParameterTest
? Arrays.asList(PROG_ARGS) : null;
}
- private static <REQB extends JarRequestBody, M extends JarMessageParameters>
+ protected static <REQB extends JarRequestBody, M extends JarMessageParameters>
HandlerRequest<REQB, M> createRequest(
REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar)
throws HandlerRequestException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 13ba43c..f577090 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -23,30 +23,46 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Test;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* Tests for the parameter handling of the {@link JarRunHandler}.
@@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
private static JarRunHandler handler;
+ private static Path jarWithEagerSink;
+
@BeforeClass
public static void setup() throws Exception {
init();
@@ -65,6 +83,12 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
final Map<String, String> responseHeaders = Collections.emptyMap();
final Executor executor = TestingUtils.defaultExecutor();
+ final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+ final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar";
+ jarWithEagerSink = Files.copy(
+ jarLocation.resolve(parameterProgramWithEagerSink),
+ jarDir.resolve("program-with-eager-sink.jar"));
+
handler = new JarRunHandler(
gatewayRetriever,
timeout,
@@ -156,6 +180,38 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
return new JarRunRequestBody(null, null, null, null, jobId, null, null);
}
+ @Test
+ public void testRestHandlerExceptionThrownWithEagerSinks() throws Exception {
+ final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest(
+ getDefaultJarRequestBody(),
+ getUnresolvedJarMessageParameters(),
+ getUnresolvedJarMessageParameters(),
+ jarWithEagerSink
+ );
+
+ try {
+ handler.handleRequest(request, restfulGateway).get();
+ } catch (final ExecutionException e) {
+ final Throwable throwable = ExceptionUtils.stripCompletionException(e.getCause());
+ assertThat(throwable, instanceOf(RestHandlerException.class));
+
+ final RestHandlerException restHandlerException = (RestHandlerException) throwable;
+ assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST));
+
+ final Optional<ProgramInvocationException> invocationException =
+ ExceptionUtils.findThrowable(restHandlerException, ProgramInvocationException.class);
+
+ if (!invocationException.isPresent()) {
+ fail();
+ }
+
+ final String exceptionMsg = invocationException.get().getMessage();
+ assertThat(exceptionMsg, containsString("Job was submitted in detached mode."));
+ return;
+ }
+ fail("The test should have failed.");
+ }
+
@Override
void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws Exception {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
new file mode 100644
index 0000000..145a6a0
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Javadoc.
+ */
+public class EagerSinkProgram {
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements("hello", "world").print();
+ }
+}