You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/21 15:30:14 UTC

[7/7] flink git commit: [FLINK-9599][rest] Implement generic mechanism to access uploaded files

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

This closes #6178.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae8cef3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae8cef3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae8cef3d

Branch: refs/heads/master
Commit: ae8cef3de2f790c4de834982751f4fc49359ee05
Parents: e6efa17
Author: zentol <ch...@apache.org>
Authored: Mon Jun 18 10:54:42 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 21 14:39:53 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/handlers/JarUploadHandler.java   |  36 +-
 .../webmonitor/handlers/JarUploadHeaders.java   |  13 +-
 .../handlers/JarUploadHandlerTest.java          |  18 +-
 flink-runtime/pom.xml                           |   7 +
 .../flink/runtime/rest/AbstractHandler.java     | 106 ++--
 .../flink/runtime/rest/FileUploadHandler.java   | 135 +++--
 .../rest/handler/AbstractRestHandler.java       |  22 +-
 .../flink/runtime/rest/handler/FileUploads.java |  98 ++++
 .../runtime/rest/handler/HandlerRequest.java    |  17 +-
 .../flink/runtime/rest/messages/FileUpload.java |  44 --
 .../messages/UntypedResponseMessageHeaders.java |   9 +
 .../flink/runtime/rest/AbstractHandlerTest.java | 193 +++++++
 .../runtime/rest/FileUploadHandlerTest.java     | 539 +++++++++++++++++++
 .../runtime/rest/RestServerEndpointITCase.java  |  38 +-
 .../runtime/rest/handler/FileUploadsTest.java   | 112 ++++
 15 files changed, 1200 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index c2f16a7..a1ef82b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -35,6 +35,7 @@ import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -46,7 +47,7 @@ import static java.util.Objects.requireNonNull;
  * Handles .jar file uploads.
  */
 public class JarUploadHandler extends
-		AbstractRestHandler<RestfulGateway, FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
+		AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
 
 	private final Path jarDir;
 
@@ -57,7 +58,7 @@ public class JarUploadHandler extends
 			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> responseHeaders,
-			final MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
+			final MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
 			final Path jarDir,
 			final Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
@@ -67,41 +68,34 @@ public class JarUploadHandler extends
 
 	@Override
 	protected CompletableFuture<JarUploadResponseBody> handleRequest(
-			@Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request,
+			@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
 			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
-
-		final FileUpload fileUpload = request.getRequestBody();
+		Collection<Path> uploadedFiles = request.getUploadedFiles();
+		if (uploadedFiles.size() != 1) {
+			throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
+		}
+		final Path fileUpload = uploadedFiles.iterator().next();
 		return CompletableFuture.supplyAsync(() -> {
-			if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
-				deleteUploadedFile(fileUpload);
+			if (!fileUpload.getFileName().toString().endsWith(".jar")) {
 				throw new CompletionException(new RestHandlerException(
 					"Only Jar files are allowed.",
 					HttpResponseStatus.BAD_REQUEST));
 			} else {
-				final Path destination = jarDir.resolve(fileUpload.getPath().getFileName());
+				final Path destination = jarDir.resolve(fileUpload.getFileName());
 				try {
-					Files.move(fileUpload.getPath(), destination);
+					Files.move(fileUpload, destination);
 				} catch (IOException e) {
-					deleteUploadedFile(fileUpload);
 					throw new CompletionException(new RestHandlerException(
 						String.format("Could not move uploaded jar file [%s] to [%s].",
-							fileUpload.getPath(),
+							fileUpload,
 							destination),
 						HttpResponseStatus.INTERNAL_SERVER_ERROR,
 						e));
 				}
-				return new JarUploadResponseBody(fileUpload.getPath()
+				return new JarUploadResponseBody(fileUpload
 					.normalize()
 					.toString());
 			}
 		}, executor);
 	}
-
-	private void deleteUploadedFile(final FileUpload fileUpload) {
-		try {
-			Files.delete(fileUpload.getPath());
-		} catch (IOException e) {
-			log.error("Failed to delete file {}.", fileUpload.getPath(), e);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
index 9408dba..d969b4fc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -28,7 +28,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /**
  * {@link MessageHeaders} for uploading jars.
  */
-public final class JarUploadHeaders implements MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
+public final class JarUploadHeaders implements MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
 
 	public static final String URL = "/jars/upload";
 	private static final JarUploadHeaders INSTANCE = new JarUploadHeaders();
@@ -46,8 +46,8 @@ public final class JarUploadHeaders implements MessageHeaders<FileUpload, JarUpl
 	}
 
 	@Override
-	public Class<FileUpload> getRequestClass() {
-		return FileUpload.class;
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
 	}
 
 	@Override
@@ -75,4 +75,9 @@ public final class JarUploadHeaders implements MessageHeaders<FileUpload, JarUpl
 			" header is set to \"application/x-java-archive\", as some http libraries do not add the header by default.\n" +
 			"Using 'curl' you can upload a jar via 'curl -X POST -H \"Expect:\" -F \"jarfile=#path/to/flink-job.jar\" http://hostname:port" + URL + "'.";
 	}
+
+	@Override
+	public boolean acceptsFileUploads() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 2ce75e7..812d4c6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
@@ -83,7 +84,7 @@ public class JarUploadHandlerTest extends TestLogger {
 	@Test
 	public void testRejectNonJarFiles() throws Exception {
 		final Path uploadedFile = Files.createFile(jarDir.resolve("katrin.png"));
-		final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
+		final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);
 
 		try {
 			jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
@@ -99,7 +100,7 @@ public class JarUploadHandlerTest extends TestLogger {
 	@Test
 	public void testUploadJar() throws Exception {
 		final Path uploadedFile = Files.createFile(jarDir.resolve("Kafka010Example.jar"));
-		final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
+		final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);
 
 		final JarUploadResponseBody jarUploadResponseBody = jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
 		assertThat(jarUploadResponseBody.getStatus(), equalTo(JarUploadResponseBody.UploadStatus.success));
@@ -109,7 +110,7 @@ public class JarUploadHandlerTest extends TestLogger {
 	@Test
 	public void testFailedUpload() throws Exception {
 		final Path uploadedFile = jarDir.resolve("Kafka010Example.jar");
-		final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
+		final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);
 
 		try {
 			jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
@@ -123,12 +124,13 @@ public class JarUploadHandlerTest extends TestLogger {
 		}
 	}
 
-	private static HandlerRequest<FileUpload, EmptyMessageParameters> createRequest(
-			final Path uploadedFile) throws HandlerRequestException {
+	private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> createRequest(
+			final Path uploadedFile) throws HandlerRequestException, IOException {
 		return new HandlerRequest<>(
-			new FileUpload(uploadedFile),
+			EmptyRequestBody.getInstance(),
 			EmptyMessageParameters.getInstance(),
 			Collections.emptyMap(),
-			Collections.emptyMap());
+			Collections.emptyMap(),
+			Collections.singleton(uploadedFile));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f195bd5..9a4034f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -291,6 +291,13 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.7.0</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 7246b7f..1e88425 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
@@ -26,7 +27,6 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-import org.apache.flink.runtime.rest.messages.FileUpload;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
@@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -103,77 +102,78 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				return;
 			}
 
-			ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+			final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
 
-			R request;
-			if (isFileUpload()) {
-				final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-				if (path == null) {
+			try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+				if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
 					HandlerUtils.sendErrorResponse(
 						ctx,
 						httpRequest,
-						new ErrorResponseBody("Client did not upload a file."),
+						new ErrorResponseBody("File uploads not allowed."),
 						HttpResponseStatus.BAD_REQUEST,
 						responseHeaders);
 					return;
 				}
-				//noinspection unchecked
-				request = (R) new FileUpload(path);
-			} else if (msgContent.capacity() == 0) {
-				try {
-					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Request did not conform to expected format.", je);
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody("Bad request received."),
-						HttpResponseStatus.BAD_REQUEST,
-						responseHeaders);
-					return;
+
+				R request;
+				if (msgContent.capacity() == 0) {
+					try {
+						request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+					} catch (JsonParseException | JsonMappingException je) {
+						log.error("Request did not conform to expected format.", je);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							httpRequest,
+							new ErrorResponseBody("Bad request received."),
+							HttpResponseStatus.BAD_REQUEST,
+							responseHeaders);
+						return;
+					}
+				} else {
+					try {
+						ByteBufInputStream in = new ByteBufInputStream(msgContent);
+						request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+					} catch (JsonParseException | JsonMappingException je) {
+						log.error("Failed to read request.", je);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							httpRequest,
+							new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
+							HttpResponseStatus.BAD_REQUEST,
+							responseHeaders);
+						return;
+					}
 				}
-			} else {
+
+				final HandlerRequest<R, M> handlerRequest;
+
 				try {
-					ByteBufInputStream in = new ByteBufInputStream(msgContent);
-					request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
-				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Failed to read request.", je);
+					handlerRequest = new HandlerRequest<R, M>(
+						request,
+						untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+						routedRequest.getRouteResult().pathParams(),
+						routedRequest.getRouteResult().queryParams(),
+						uploadedFiles.getUploadedFiles());
+				} catch (HandlerRequestException hre) {
+					log.error("Could not create the handler request.", hre);
+
 					HandlerUtils.sendErrorResponse(
 						ctx,
 						httpRequest,
-						new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
+						new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
 						HttpResponseStatus.BAD_REQUEST,
 						responseHeaders);
 					return;
 				}
-			}
-
-			final HandlerRequest<R, M> handlerRequest;
-
-			try {
-				handlerRequest = new HandlerRequest<>(
-					request,
-					untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
-					routedRequest.getRouteResult().pathParams(),
-					routedRequest.getRouteResult().queryParams());
-			} catch (HandlerRequestException hre) {
-				log.error("Could not create the handler request.", hre);
 
-				HandlerUtils.sendErrorResponse(
+				respondToRequest(
 					ctx,
 					httpRequest,
-					new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
-					HttpResponseStatus.BAD_REQUEST,
-					responseHeaders);
-				return;
+					handlerRequest,
+					gateway);
 			}
 
-			respondToRequest(
-				ctx,
-				httpRequest,
-				handlerRequest,
-				gateway);
-
 		} catch (Throwable e) {
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(
@@ -185,10 +185,6 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 		}
 	}
 
-	private boolean isFileUpload() {
-		return untypedResponseMessageHeaders.getRequestClass() == FileUpload.class;
-	}
-
 	/**
 	 * Respond to the given {@link HandlerRequest}.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 2f38e65..1a5760f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
@@ -26,7 +32,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
@@ -37,8 +45,13 @@ import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
@@ -52,7 +65,9 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
 
-	static final AttributeKey<Path> UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+	public static final String HTTP_ATTRIBUTE_REQUEST = "request";
+
+	private static final AttributeKey<FileUploads> UPLOADED_FILES = AttributeKey.valueOf("UPLOADED_FILES");
 
 	private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
 
@@ -61,6 +76,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 	private HttpPostRequestDecoder currentHttpPostRequestDecoder;
 
 	private HttpRequest currentHttpRequest;
+	private byte[] currentJsonPayload;
+	private Path currentUploadDir;
 
 	public FileUploadHandler(final Path uploadDir) {
 		super(false);
@@ -70,51 +87,103 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 	@Override
 	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
-		if (msg instanceof HttpRequest) {
-			final HttpRequest httpRequest = (HttpRequest) msg;
-			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
-					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-					currentHttpRequest = httpRequest;
+		try {
+			if (msg instanceof HttpRequest) {
+				final HttpRequest httpRequest = (HttpRequest) msg;
+				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
+				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+						currentHttpRequest = httpRequest;
+						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+					} else {
+						ctx.fireChannelRead(msg);
+					}
 				} else {
 					ctx.fireChannelRead(msg);
 				}
+			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
+				RestServerEndpoint.createUploadDir(uploadDir, LOG);
+
+				final HttpContent httpContent = (HttpContent) msg;
+				currentHttpPostRequestDecoder.offer(httpContent);
+
+				while (currentHttpPostRequestDecoder.hasNext()) {
+					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
+					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+						final DiskFileUpload fileUpload = (DiskFileUpload) data;
+						checkState(fileUpload.isCompleted());
+
+						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
+						fileUpload.renameTo(dest.toFile());
+					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+						final Attribute request = (Attribute) data;
+						// this could also be implemented by using the first found Attribute as the payload
+						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+							currentJsonPayload = request.get();
+						} else {
+							handleError(ctx, "Received unknown attribute " + data.getName() + '.', HttpResponseStatus.BAD_REQUEST, null);
+							return;
+						}
+					}
+				}
+
+				if (httpContent instanceof LastHttpContent) {
+					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
+					ctx.fireChannelRead(currentHttpRequest);
+					if (currentJsonPayload != null) {
+						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
+					} else {
+						ctx.fireChannelRead(httpContent);
+					}
+					reset();
+				}
 			} else {
 				ctx.fireChannelRead(msg);
 			}
-		} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
-			// make sure that we still have a upload dir in case that it got deleted in the meanwhile
-			RestServerEndpoint.createUploadDir(uploadDir, LOG);
-
-			final HttpContent httpContent = (HttpContent) msg;
-			currentHttpPostRequestDecoder.offer(httpContent);
-
-			while (currentHttpPostRequestDecoder.hasNext()) {
-				final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
-				if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
-					final DiskFileUpload fileUpload = (DiskFileUpload) data;
-					checkState(fileUpload.isCompleted());
-
-					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
-						"_" + fileUpload.getFilename()));
-					fileUpload.renameTo(dest.toFile());
-					ctx.channel().attr(UPLOADED_FILE).set(dest);
-				}
-			}
+		} catch (Exception e) {
+			handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+		}
+	}
 
-			if (httpContent instanceof LastHttpContent) {
-				ctx.fireChannelRead(currentHttpRequest);
-				ctx.fireChannelRead(httpContent);
-				reset();
+	private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+		HttpRequest tmpRequest = currentHttpRequest;
+		deleteUploadedFiles();
+		reset();
+		LOG.warn(errorMessage, e);
+		HandlerUtils.sendErrorResponse(
+			ctx,
+			tmpRequest,
+			new ErrorResponseBody(errorMessage),
+			responseStatus,
+			Collections.emptyMap()
+		);
+	}
+
+	private void deleteUploadedFiles() {
+		if (currentUploadDir != null) {
+			try {
+				FileUtils.deleteDirectory(currentUploadDir.toFile());
+			} catch (IOException e) {
+				LOG.warn("Could not cleanup uploaded files.", e);
 			}
-		} else {
-			ctx.fireChannelRead(msg);
 		}
 	}
 
 	private void reset() {
+		// destroy() can fail because some data is stored multiple times in the decoder causing an IllegalReferenceCountException
+		// see https://github.com/netty/netty/issues/7814
+		currentHttpPostRequestDecoder.getBodyHttpDatas().clear();
 		currentHttpPostRequestDecoder.destroy();
 		currentHttpPostRequestDecoder = null;
 		currentHttpRequest = null;
+		currentUploadDir = null;
+		currentJsonPayload = null;
+	}
+
+	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
+		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+			.orElse(FileUploads.EMPTY);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index f5a74c3..448711b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 	}
 
 	@Override
-	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) throws RestHandlerException {
+	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
 		CompletableFuture<P> response;
 
 		try {
@@ -87,14 +87,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 				if (error instanceof RestHandlerException) {
 					final RestHandlerException rhe = (RestHandlerException) error;
 
-					log.error("Exception occurred in REST handler.", error);
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody(rhe.getMessage()),
-						rhe.getHttpResponseStatus(),
-						responseHeaders);
+					processRestHandlerException(ctx, httpRequest, rhe);
 				} else {
 					log.error("Implementation error: Unhandled exception.", error);
 					HandlerUtils.sendErrorResponse(
@@ -115,6 +108,17 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 		});
 	}
 
+	private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) {
+		log.error("Exception occurred in REST handler.", rhe);
+
+		HandlerUtils.sendErrorResponse(
+			ctx,
+			httpRequest,
+			new ErrorResponseBody(rhe.getMessage()),
+			rhe.getHttpResponseStatus(),
+			responseHeaders);
+	}
+
 	/**
 	 * This method is called for every incoming request and returns a {@link CompletableFuture} containing a the response.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
new file mode 100644
index 0000000..31ac47bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+	@Nullable
+	private final Path uploadDirectory;
+
+	public static final FileUploads EMPTY = new FileUploads();
+
+	private FileUploads() {
+		this.uploadDirectory = null;
+	}
+
+	public FileUploads(@Nonnull Path uploadDirectory) {
+		Preconditions.checkNotNull(uploadDirectory, "UploadDirectory must not be null.");
+		Preconditions.checkArgument(Files.exists(uploadDirectory), "UploadDirectory does not exist.");
+		Preconditions.checkArgument(Files.isDirectory(uploadDirectory), "UploadDirectory is not a directory.");
+		Preconditions.checkArgument(uploadDirectory.isAbsolute(), "UploadDirectory is not absolute.");
+		this.uploadDirectory = uploadDirectory;
+	}
+
+	public Collection<Path> getUploadedFiles() throws IOException {
+		if (uploadDirectory == null) {
+			return Collections.emptyList();
+		}
+
+		FileAdderVisitor visitor = new FileAdderVisitor();
+		Files.walkFileTree(uploadDirectory, visitor);
+
+		return Collections.unmodifiableCollection(visitor.getContainedFiles());
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (uploadDirectory != null) {
+			FileUtils.deleteDirectory(uploadDirectory.toFile());
+		}
+	}
+
+	private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
+
+		private final Collection<Path> files = new ArrayList<>(4);
+
+		Collection<Path> getContainedFiles() {
+			return files;
+		}
+
+		FileAdderVisitor() {
+		}
+
+		@Override
+		public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+			FileVisitResult result = super.visitFile(file, attrs);
+			files.add(file);
+			return result;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index aacf0a2..7e93556 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -24,6 +24,10 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
+import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -39,15 +43,21 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
 
 	private final R requestBody;
+	private final Collection<Path> uploadedFiles;
 	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
 	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
 	public HandlerRequest(R requestBody, M messageParameters) throws HandlerRequestException {
-		this(requestBody, messageParameters, Collections.emptyMap(), Collections.emptyMap());
+		this(requestBody, messageParameters, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList());
 	}
 
 	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) throws HandlerRequestException {
+		this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList());
+	}
+
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
+		this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
 		Preconditions.checkNotNull(messageParameters);
 		Preconditions.checkNotNull(receivedQueryParameters);
 		Preconditions.checkNotNull(receivedPathParameters);
@@ -129,4 +139,9 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 			return queryParameter.getValue();
 		}
 	}
+
+	@Nonnull
+	public Collection<Path> getUploadedFiles() {
+		return uploadedFiles;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FileUpload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FileUpload.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FileUpload.java
deleted file mode 100644
index fb09dce..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FileUpload.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import java.nio.file.Path;
-
-/**
- * Client uploading a file.
- */
-public class FileUpload implements RequestBody {
-
-	private final Path path;
-
-	public FileUpload(final Path path) {
-		this.path = path;
-	}
-
-	public Path getPath() {
-		return path;
-	}
-
-	@Override
-	public String toString() {
-		return "FileUpload{" +
-			"path=" + path +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
index 1556b90..3de9626 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
@@ -40,4 +40,13 @@ public interface UntypedResponseMessageHeaders<R extends RequestBody, M extends
 	 * @return new message parameters object
 	 */
 	M getUnresolvedMessageParameters();
+
+	/**
+	 * Returns whether this header allows file uploads.
+	 *
+	 * @return whether this header allows file uploads
+	 */
+	default boolean acceptsFileUploads() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
new file mode 100644
index 0000000..91fba68
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testFileCleanup() throws Exception {
+		final Path dir = temporaryFolder.newFolder().toPath();
+		final Path file = dir.resolve("file");
+		Files.createFile(file);
+
+		final String restAddress = "http://localhost:1234";
+		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
+			.setRestAddress(restAddress)
+			.build();
+
+		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
+			CompletableFuture.completedFuture(mockRestfulGateway);
+
+		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+
+		RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
+		HttpRequest request = new DefaultFullHttpRequest(
+			HttpVersion.HTTP_1_1,
+			HttpMethod.GET,
+			TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+			Unpooled.wrappedBuffer(new byte[0]));
+		RoutedRequest<?> routerRequest = new RoutedRequest<>(routeResult, request);
+
+		Attribute<FileUploads> attribute = new SimpleAttribute();
+		attribute.set(new FileUploads(dir));
+		Channel channel = mock(Channel.class);
+		when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
+
+		ChannelHandlerContext context = mock(ChannelHandlerContext.class);
+		when(context.channel()).thenReturn(channel);
+
+		handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
+
+		Assert.assertFalse(Files.exists(file));
+	}
+
+	private static class SimpleAttribute implements Attribute<FileUploads> {
+
+		private static final AttributeKey<FileUploads> KEY = AttributeKey.valueOf("test");
+
+		private final AtomicReference<FileUploads> container = new AtomicReference<>();
+
+		@Override
+		public AttributeKey<FileUploads> key() {
+			return KEY;
+		}
+
+		@Override
+		public FileUploads get() {
+			return container.get();
+		}
+
+		@Override
+		public void set(FileUploads value) {
+			container.set(value);
+		}
+
+		@Override
+		public FileUploads getAndSet(FileUploads value) {
+			return container.getAndSet(value);
+		}
+
+		@Override
+		public FileUploads setIfAbsent(FileUploads value) {
+			if (container.compareAndSet(null, value)) {
+				return value;
+			} else {
+				return container.get();
+			}
+		}
+
+		@Override
+		public FileUploads getAndRemove() {
+			return container.getAndSet(null);
+		}
+
+		@Override
+		public boolean compareAndSet(FileUploads oldValue, FileUploads newValue) {
+			return container.compareAndSet(oldValue, newValue);
+		}
+
+		@Override
+		public void remove() {
+			set(null);
+		}
+	}
+
+	private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+
+		protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
+			super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+		}
+
+		@Override
+		protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+
+		}
+
+		private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
+			INSTANCE;
+
+			@Override
+			public Class<EmptyRequestBody> getRequestClass() {
+				return EmptyRequestBody.class;
+			}
+
+			@Override
+			public EmptyMessageParameters getUnresolvedMessageParameters() {
+				return EmptyMessageParameters.getInstance();
+			}
+
+			@Override
+			public HttpMethodWrapper getHttpMethod() {
+				return HttpMethodWrapper.POST;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test";
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
new file mode 100644
index 0000000..dd9a739
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
+	private static final Random RANDOM = new Random();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static RestServerEndpoint serverEndpoint;
+	private static String serverAddress;
+
+	private static MultipartMixedHandler mixedHandler;
+	private static MultipartJsonHandler jsonHandler;
+	private static MultipartFileHandler fileHandler;
+	private static File file1;
+	private static File file2;
+
+	private static Path configuredUploadDir;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(RestOptions.PORT, 0);
+		config.setString(RestOptions.ADDRESS, "localhost");
+		configuredUploadDir = TEMPORARY_FOLDER.newFolder().toPath();
+		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
+
+		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		final String restAddress = "http://localhost:1234";
+		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
+			.setRestAddress(restAddress)
+			.build();
+
+		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
+			CompletableFuture.completedFuture(mockRestfulGateway);
+
+		file1 = TEMPORARY_FOLDER.newFile();
+		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		file2 = TEMPORARY_FOLDER.newFile();
+		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+
+		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
+			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
+			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
+			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
+
+		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
+
+		serverEndpoint.start();
+		serverAddress = serverEndpoint.getRestBaseUrl();
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		if (serverEndpoint != null) {
+			serverEndpoint.close();
+			serverEndpoint = null;
+		}
+	}
+
+	private static Request buildMalformedRequest(String headerUrl) {
+		MultipartBody.Builder builder = new MultipartBody.Builder();
+		builder = addFilePart(builder);
+		// this causes a failure in the FileUploadHandler since the request should only contain form-data
+		builder = builder.addPart(okhttp3.RequestBody.create(MediaType.parse("text/plain"), "crash"));
+		return finalizeRequest(builder, headerUrl);
+	}
+
+	private static Request buildMixedRequestWithUnknownAttribute(String headerUrl) throws IOException {
+		MultipartBody.Builder builder = new MultipartBody.Builder();
+		builder = addJsonPart(builder, RANDOM.nextInt(), "hello");
+		builder = addFilePart(builder);
+		return finalizeRequest(builder, headerUrl);
+	}
+
+	private static Request buildFileRequest(String headerUrl) {
+		MultipartBody.Builder builder = new MultipartBody.Builder();
+		builder = addFilePart(builder);
+		return finalizeRequest(builder, headerUrl);
+	}
+
+	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
+		MultipartBody.Builder builder = new MultipartBody.Builder();
+		builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+		return finalizeRequest(builder, headerUrl);
+	}
+
+	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
+		MultipartBody.Builder builder = new MultipartBody.Builder();
+		builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+		builder = addFilePart(builder);
+		return finalizeRequest(builder, headerUrl);
+	}
+
+	private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) {
+		MultipartBody multipartBody = builder
+			.setType(MultipartBody.FORM)
+			.build();
+
+		return new Request.Builder()
+			.url(serverAddress + headerUrl)
+			.post(multipartBody)
+			.build();
+	}
+
+	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
+		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
+		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
+
+		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
+			.addFormDataPart("file2", file2.getName(), filePayload2);
+	}
+
+	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index, String attribute) throws IOException {
+		TestRequestBody jsonRequestBody = new TestRequestBody(index);
+
+		StringWriter sw = new StringWriter();
+		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
+
+		String jsonPayload = sw.toString();
+
+		return builder.addFormDataPart(attribute, jsonPayload);
+	}
+
+	@Test
+	public void testMixedMultipart() throws Exception {
+		OkHttpClient client = new OkHttpClient();
+
+		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		try (Response response = client.newCall(jsonRequest).execute()) {
+			// explicitly rejected by the test handler implementation
+			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
+		}
+
+		Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(fileRequest).execute()) {
+			// expected JSON payload is missing
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+
+		int mixedId = RANDOM.nextInt();
+		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
+		try (Response response = client.newCall(mixedRequest).execute()) {
+			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
+			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
+		}
+	}
+
+	@Test
+	public void testJsonMultipart() throws Exception {
+		OkHttpClient client = new OkHttpClient();
+
+		int jsonId = RANDOM.nextInt();
+		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
+		try (Response response = client.newCall(jsonRequest).execute()) {
+			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
+			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
+		}
+
+		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(fileRequest).execute()) {
+			// either because JSON payload is missing or FileUploads are outright forbidden
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+
+		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		try (Response response = client.newCall(mixedRequest).execute()) {
+			// FileUploads are outright forbidden
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+	}
+
+	@Test
+	public void testFileMultipart() throws Exception {
+		OkHttpClient client = new OkHttpClient();
+
+		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		try (Response response = client.newCall(jsonRequest).execute()) {
+			// JSON payload did not match expected format
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+
+		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(fileRequest).execute()) {
+			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
+		}
+
+		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		try (Response response = client.newCall(mixedRequest).execute()) {
+			// JSON payload did not match expected format
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+	}
+
+	@Test
+	public void testUploadCleanupOnUnknownAttribute() throws IOException {
+		OkHttpClient client = new OkHttpClient();
+
+		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(request).execute()) {
+			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
+		}
+		assertUploadDirectoryIsEmpty();
+	}
+
+	/**
+	 * Crashes the handler be submitting a malformed multipart request and tests that the upload directory is cleaned up.
+	 */
+	@Test
+	public void testUploadCleanupOnFailure() throws IOException {
+		OkHttpClient client = new OkHttpClient();
+
+		Request request = buildMalformedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(request).execute()) {
+			// decoding errors aren't handled separately by the FileUploadHandler
+			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
+		}
+		assertUploadDirectoryIsEmpty();
+	}
+
+	private static void assertUploadDirectoryIsEmpty() throws IOException {
+		Preconditions.checkArgument(
+			1 == Files.list(configuredUploadDir).count(),
+			"Directory structure in rest upload directory has changed. Test must be adjusted");
+		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
+		Preconditions.checkArgument(
+			actualUploadDir.isPresent(),
+			"Expected upload directory does not exist.");
+		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
+	}
+
+	private static class MultipartMixedHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		volatile TestRequestBody lastReceivedRequest = null;
+
+		MultipartMixedHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartMixedHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			MultipartFileHandler.verifyFileUpload(request.getUploadedFiles());
+			this.lastReceivedRequest = request.getRequestBody();
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		private static final class MultipartMixedHeaders implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+			private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
+
+			private MultipartMixedHeaders() {
+			}
+
+			@Override
+			public Class<TestRequestBody> getRequestClass() {
+				return TestRequestBody.class;
+			}
+
+			@Override
+			public Class<EmptyResponseBody> getResponseClass() {
+				return EmptyResponseBody.class;
+			}
+
+			@Override
+			public HttpResponseStatus getResponseStatusCode() {
+				return HttpResponseStatus.OK;
+			}
+
+			@Override
+			public String getDescription() {
+				return "";
+			}
+
+			@Override
+			public EmptyMessageParameters getUnresolvedMessageParameters() {
+				return EmptyMessageParameters.getInstance();
+			}
+
+			@Override
+			public HttpMethodWrapper getHttpMethod() {
+				return HttpMethodWrapper.POST;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/mixed";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
+		}
+	}
+
+	private static class MultipartJsonHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		volatile TestRequestBody lastReceivedRequest = null;
+
+		MultipartJsonHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartJsonHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			Collection<Path> uploadedFiles = request.getUploadedFiles();
+			if (!uploadedFiles.isEmpty()) {
+				throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			}
+			this.lastReceivedRequest = request.getRequestBody();
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		private static final class MultipartJsonHeaders extends TestHeadersBase<TestRequestBody> {
+			private static final MultipartJsonHeaders INSTANCE = new MultipartJsonHeaders();
+
+			private MultipartJsonHeaders() {
+			}
+
+			@Override
+			public Class<TestRequestBody> getRequestClass() {
+				return TestRequestBody.class;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/json";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return false;
+			}
+		}
+	}
+
+	private static class MultipartFileHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		MultipartFileHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartFileHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			verifyFileUpload(request.getUploadedFiles());
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		static void verifyFileUpload(Collection<Path> uploadedFiles) throws RestHandlerException {
+			try {
+				assertEquals(2, uploadedFiles.size());
+
+				for (Path uploadedFile : uploadedFiles) {
+					File matchingFile;
+					if (uploadedFile.getFileName().toString().equals(file1.getName())) {
+						matchingFile = file1;
+					} else if (uploadedFile.getFileName().toString().equals(file2.getName())) {
+						matchingFile = file2;
+					} else {
+						throw new RestHandlerException("Received file with unknown name " + uploadedFile.getFileName() + '.', HttpResponseStatus.INTERNAL_SERVER_ERROR);
+					}
+
+					byte[] originalContent = Files.readAllBytes(matchingFile.toPath());
+					byte[] receivedContent = Files.readAllBytes(uploadedFile);
+					assertArrayEquals(originalContent, receivedContent);
+				}
+			} catch (Exception e) {
+				// return 505 to differentiate from common BAD_REQUEST responses in this test
+				throw new RestHandlerException("Test verification failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+			}
+		}
+
+		private static final class MultipartFileHeaders extends TestHeadersBase<EmptyRequestBody> {
+			private static final MultipartFileHeaders INSTANCE = new MultipartFileHeaders();
+
+			private MultipartFileHeaders() {
+			}
+
+			@Override
+			public Class<EmptyRequestBody> getRequestClass() {
+				return EmptyRequestBody.class;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/file";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
+		}
+	}
+
+	private abstract static class TestHeadersBase<R extends RequestBody> implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
+
+		@Override
+		public Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public String getDescription() {
+			return "";
+		}
+
+		@Override
+		public EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.POST;
+		}
+	}
+
+	private static final class TestRequestBody implements RequestBody {
+		private static final String FIELD_NAME_INDEX = "index";
+
+		@JsonProperty(FIELD_NAME_INDEX)
+		private final int index;
+
+		@JsonCreator
+		TestRequestBody(@JsonProperty(FIELD_NAME_INDEX) int index) {
+			this.index = index;
+		}
+	}
+
+	private static class TestRestServerEndpoint extends RestServerEndpoint {
+
+		private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
+
+		TestRestServerEndpoint(
+			RestServerEndpointConfiguration configuration,
+			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
+			super(configuration);
+			this.handlers = Preconditions.checkNotNull(handlers);
+		}
+
+		@Override
+		protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+			return handlers;
+		}
+
+		@Override
+		protected void startInternal() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index dad3b4f..b9413ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.FileUpload;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
@@ -367,8 +367,8 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 
 		assertEquals(200, connection.getResponseCode());
-		final Path lastUploadedPath = testUploadHandler.getLastUploadedPath();
-		assertEquals(uploadedContent, new String(Files.readAllBytes(lastUploadedPath), StandardCharsets.UTF_8));
+		final byte[] lastUploadedFileContents = testUploadHandler.getLastUploadedFileContents();
+		assertEquals(uploadedContent, new String(lastUploadedFileContents, StandardCharsets.UTF_8));
 	}
 
 	/**
@@ -648,9 +648,9 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 	}
 
-	private static class TestUploadHandler extends AbstractRestHandler<RestfulGateway, FileUpload, EmptyResponseBody, EmptyMessageParameters> {
+	private static class TestUploadHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
-		private volatile Path lastUploadedPath;
+		private volatile byte[] lastUploadedFileContents;
 
 		private TestUploadHandler(
 			final CompletableFuture<String> localRestAddress,
@@ -660,17 +660,26 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 
 		@Override
-		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException {
-			lastUploadedPath = request.getRequestBody().getPath();
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+			Collection<Path> uploadedFiles = request.getUploadedFiles();
+			if (uploadedFiles.size() != 1) {
+				throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
+			}
+
+			try {
+				lastUploadedFileContents = Files.readAllBytes(uploadedFiles.iterator().next());
+			} catch (IOException e) {
+				throw new RestHandlerException("Could not read contents of uploaded file.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+			}
 			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 		}
 
-		public Path getLastUploadedPath() {
-			return lastUploadedPath;
+		public byte[] getLastUploadedFileContents() {
+			return lastUploadedFileContents;
 		}
 	}
 
-	private enum TestUploadHeaders implements MessageHeaders<FileUpload, EmptyResponseBody, EmptyMessageParameters> {
+	private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 		INSTANCE;
 
 		@Override
@@ -684,8 +693,8 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 
 		@Override
-		public Class<FileUpload> getRequestClass() {
-			return FileUpload.class;
+		public Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
 		}
 
 		@Override
@@ -707,5 +716,10 @@ public class RestServerEndpointITCase extends TestLogger {
 		public String getDescription() {
 			return "";
 		}
+
+		@Override
+		public boolean acceptsFileUploads() {
+			return true;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae8cef3d/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
new file mode 100644
index 0000000..fb7faa3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+
+/**
+ * Tests for {@link FileUploads}.
+ */
+public class FileUploadsTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testRelativePathRejection() throws IOException {
+		Path relative = Paths.get("root");
+		try {
+			new FileUploads(relative);
+			Assert.fail();
+		} catch (IllegalArgumentException iae) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testDirectoryScan() throws IOException {
+		Path rootDir = Paths.get("root");
+		Path rootFile = rootDir.resolve("rootFile");
+		Path subDir = rootDir.resolve("sub");
+		Path subFile = subDir.resolve("subFile");
+
+		Path tmp = temporaryFolder.getRoot().toPath();
+		Files.createDirectory(tmp.resolve(rootDir));
+		Files.createDirectory(tmp.resolve(subDir));
+		Files.createFile(tmp.resolve(rootFile));
+		Files.createFile(tmp.resolve(subFile));
+
+		try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
+			Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+
+			Assert.assertEquals(2, detectedFiles.size());
+			Assert.assertTrue(detectedFiles.contains(tmp.resolve(rootFile)));
+			Assert.assertTrue(detectedFiles.contains(tmp.resolve(subFile)));
+		}
+	}
+
+	@Test
+	public void testEmptyDirectory() throws IOException {
+		Path rootDir = Paths.get("root");
+
+		Path tmp = temporaryFolder.getRoot().toPath();
+		Files.createDirectory(tmp.resolve(rootDir));
+
+		try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
+			Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+			Assert.assertEquals(0, detectedFiles.size());
+		}
+	}
+
+	@Test
+	public void testCleanup() throws IOException {
+		Path rootDir = Paths.get("root");
+		Path rootFile = rootDir.resolve("rootFile");
+		Path subDir = rootDir.resolve("sub");
+		Path subFile = subDir.resolve("subFile");
+
+		Path tmp = temporaryFolder.getRoot().toPath();
+		Files.createDirectory(tmp.resolve(rootDir));
+		Files.createDirectory(tmp.resolve(subDir));
+		Files.createFile(tmp.resolve(rootFile));
+		Files.createFile(tmp.resolve(subFile));
+
+		try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
+			Assert.assertTrue(Files.exists(tmp.resolve(rootDir)));
+			Assert.assertTrue(Files.exists(tmp.resolve(subDir)));
+			Assert.assertTrue(Files.exists(tmp.resolve(rootFile)));
+			Assert.assertTrue(Files.exists(tmp.resolve(subFile)));
+		}
+		Assert.assertFalse(Files.exists(tmp.resolve(rootDir)));
+		Assert.assertFalse(Files.exists(tmp.resolve(subDir)));
+		Assert.assertFalse(Files.exists(tmp.resolve(rootFile)));
+		Assert.assertFalse(Files.exists(tmp.resolve(subFile)));
+	}
+}