You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/07/08 08:41:10 UTC

[flink] branch release-1.11 updated (cdd7ec9 -> 957c13c)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cdd7ec9  [hotfix] Fix version 1.11 docs base URL
     new 82e603a  [FLINK-18519][REST] Send exception to client when app fails to execute
     new 957c13c  [hotfix] Harden JarDeleteHandlerTest test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-runtime-web/pom.xml                          | 21 ++++++++
 .../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++--
 .../webmonitor/handlers/JarDeleteHandlerTest.java  |  3 ++
 .../handlers/JarHandlerParameterTest.java          |  2 +-
 .../handlers/JarRunHandlerParameterTest.java       | 56 ++++++++++++++++++++++
 .../{TestProgram.java => EagerSinkProgram.java}    |  8 ++--
 6 files changed, 93 insertions(+), 10 deletions(-)
 copy flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/{TestProgram.java => EagerSinkProgram.java} (83%)


[flink] 01/02: [FLINK-18519][REST] Send exception to client when app fails to execute

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82e603a7d3e24d89c09f6ccdc42e4dcc07232040
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jul 6 16:53:32 2020 +0200

    [FLINK-18519][REST] Send exception to client when app fails to execute
    
    This closes #12845.
---
 flink-runtime-web/pom.xml                          | 21 ++++++++
 .../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++--
 .../handlers/JarHandlerParameterTest.java          |  2 +-
 .../handlers/JarRunHandlerParameterTest.java       | 56 ++++++++++++++++++++++
 .../handlers/utils/EagerSinkProgram.java           | 31 ++++++++++++
 5 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index bdeead9..a625e6d 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -37,6 +37,7 @@ under the License.
 	<properties>
 		<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
 		<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+		<test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name>
 	</properties>
 
 	<dependencies>
@@ -196,6 +197,25 @@ under the License.
 					</execution>
 					<execution>
 						<!-- Used for JarHandler tests -->
+						<id>test-parameter-program-jar-with-eager-sink</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include>
+							</includes>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>${test.ParameterProgramWithEagerSink.name}</finalName>
+						</configuration>
+					</execution>
+					<execution>
+						<!-- Used for JarHandler tests -->
 						<id>test-parameter-program-jar-without-manifest</id>
 						<phase>process-test-classes</phase>
 						<goals>
@@ -279,6 +299,7 @@ under the License.
 						<targetDir>${project.build.directory}</targetDir>
 						<parameterJarName>${test.parameterProgram.name}</parameterJarName>
 						<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
+						<parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName>
 					</systemPropertyVariables>
 				</configuration>
 			</plugin>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 84a74bd..223dbae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -34,6 +33,8 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
@@ -97,9 +98,13 @@ public class JarRunHandler extends
 
 		return CompletableFuture
 				.supplyAsync(() -> applicationRunner.run(gateway, program, effectiveConfiguration), executor)
-				.thenApply(jobIds -> {
-					if (jobIds.isEmpty()) {
-						throw new CompletionException(new ProgramInvocationException("No jobs submitted."));
+				.handle((jobIds, throwable) -> {
+					if (throwable != null) {
+						throw new CompletionException(
+								new RestHandlerException("Could not execute application.", HttpResponseStatus.BAD_REQUEST, throwable));
+					} else if (jobIds.isEmpty()) {
+						throw new CompletionException(
+								new RestHandlerException("No jobs included in application.", HttpResponseStatus.BAD_REQUEST));
 					}
 					return new JarRunResponseBody(jobIds.get(0));
 				});
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
index 354be7a..4a04d51 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -259,7 +259,7 @@ public abstract class JarHandlerParameterTest
 			? Arrays.asList(PROG_ARGS) : null;
 	}
 
-	private static <REQB extends JarRequestBody, M extends JarMessageParameters>
+	protected static <REQB extends JarRequestBody, M extends JarMessageParameters>
 	HandlerRequest<REQB, M> createRequest(
 		REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar)
 		throws HandlerRequestException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 13ba43c..f577090 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -23,30 +23,46 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
@@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 
 	private static JarRunHandler handler;
 
+	private static Path jarWithEagerSink;
+
 	@BeforeClass
 	public static void setup() throws Exception {
 		init();
@@ -65,6 +83,12 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 		final Map<String, String> responseHeaders = Collections.emptyMap();
 		final Executor executor = TestingUtils.defaultExecutor();
 
+		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+		final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar";
+		jarWithEagerSink = Files.copy(
+				jarLocation.resolve(parameterProgramWithEagerSink),
+				jarDir.resolve("program-with-eager-sink.jar"));
+
 		handler = new JarRunHandler(
 			gatewayRetriever,
 			timeout,
@@ -156,6 +180,38 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe
 		return new JarRunRequestBody(null, null, null, null, jobId, null, null);
 	}
 
+	@Test
+	public void testRestHandlerExceptionThrownWithEagerSinks() throws Exception {
+		final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest(
+				getDefaultJarRequestBody(),
+				getUnresolvedJarMessageParameters(),
+				getUnresolvedJarMessageParameters(),
+				jarWithEagerSink
+		);
+
+		try {
+			handler.handleRequest(request, restfulGateway).get();
+		} catch (final ExecutionException e) {
+			final Throwable throwable =	 ExceptionUtils.stripCompletionException(e.getCause());
+			assertThat(throwable, instanceOf(RestHandlerException.class));
+
+			final RestHandlerException restHandlerException = (RestHandlerException) throwable;
+			assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST));
+
+			final Optional<ProgramInvocationException> invocationException =
+					ExceptionUtils.findThrowable(restHandlerException, ProgramInvocationException.class);
+
+			if (!invocationException.isPresent()) {
+				fail();
+			}
+
+			final String exceptionMsg = invocationException.get().getMessage();
+			assertThat(exceptionMsg, containsString("Job was submitted in detached mode."));
+			return;
+		}
+		fail("The test should have failed.");
+	}
+
 	@Override
 	void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
 		throws Exception {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
new file mode 100644
index 0000000..145a6a0
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Javadoc.
+ */
+public class EagerSinkProgram {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements("hello", "world").print();
+	}
+}


[flink] 02/02: [hotfix] Harden JarDeleteHandlerTest test

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 957c13c51dcd43e52f0e9a7a07e74acaae6f5073
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jul 6 20:53:35 2020 +0200

    [hotfix] Harden JarDeleteHandlerTest test
---
 .../apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
index edbeca4..82e0660 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
@@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Unit tests for {@link JarDeleteHandler}.
@@ -105,7 +106,9 @@ public class JarDeleteHandlerTest extends TestLogger {
 			final RestHandlerException restHandlerException = (RestHandlerException) throwable;
 			assertThat(restHandlerException.getMessage(), containsString("File doesnotexist.jar does not exist in"));
 			assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST));
+			return;
 		}
+		fail("The test should have failed by now.");
 	}
 
 	@Test