You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/22 09:04:36 UTC

flink git commit: [FLINK-9599][rest] RestClient supports FileUploads

Repository: flink
Updated Branches:
  refs/heads/master 7d034d4ef -> 181559d5b


[FLINK-9599][rest] RestClient supports FileUploads

This closes #6189.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/181559d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/181559d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/181559d5

Branch: refs/heads/master
Commit: 181559d5bcad50f919e95c7602057b929553f76b
Parents: 7d034d4
Author: zentol <ch...@apache.org>
Authored: Tue Jun 19 09:45:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 22 11:03:57 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/FileUpload.java   |  42 ++
 .../flink/runtime/rest/FileUploadHandler.java   |   2 +-
 .../apache/flink/runtime/rest/RestClient.java   | 147 +++++-
 .../runtime/rest/FileUploadHandlerTest.java     | 396 ++--------------
 .../runtime/rest/MultipartUploadResource.java   | 457 +++++++++++++++++++
 .../runtime/rest/RestClientMultipartTest.java   | 120 +++++
 6 files changed, 795 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
new file mode 100644
index 0000000..0a6429d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.nio.file.Path;
+
+/**
+ * A file to be uploaded by the client, along with its content type.
+ */
+public final class FileUpload {
+	private final Path file;
+	private final String contentType;
+
+	public FileUpload(Path file, String contentType) {
+		this.file = file;
+		this.contentType = contentType;
+	}
+
+	public Path getFile() {
+		return file;
+	}
+
+	public String getContentType() {
+		return contentType;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 1a5760f..0c910d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -109,7 +109,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				final HttpContent httpContent = (HttpContent) msg;
 				currentHttpPostRequestDecoder.offer(httpContent);
 
-				while (currentHttpPostRequestDecoder.hasNext()) {
+				while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
 					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
 					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
 						final DiskFileUpload fileUpload = (DiskFileUpload) data;

http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a63bf5f..c700807 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -53,24 +55,34 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -88,7 +100,7 @@ public class RestClient {
 	// used to open connections to a rest server endpoint
 	private final Executor executor;
 
-	private Bootstrap bootstrap;
+	private final Bootstrap bootstrap;
 
 	public RestClient(RestClientConfiguration configuration, Executor executor) {
 		Preconditions.checkNotNull(configuration);
@@ -106,6 +118,7 @@ public class RestClient {
 				socketChannel.pipeline()
 					.addLast(new HttpClientCodec())
 					.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
+					.addLast(new ChunkedWriteHandler()) // required for multipart-requests
 					.addLast(new ClientHandler());
 			}
 		};
@@ -145,12 +158,28 @@ public class RestClient {
 		}
 	}
 
-	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request) throws IOException {
+		return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, Collections.emptyList());
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection<FileUpload> fileUploads) throws IOException {
 		Preconditions.checkNotNull(targetAddress);
 		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
 		Preconditions.checkNotNull(messageHeaders);
 		Preconditions.checkNotNull(request);
 		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkNotNull(fileUploads);
 		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
 
 		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
@@ -161,13 +190,7 @@ public class RestClient {
 		objectMapper.writeValue(sw, request);
 		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
-		// create request and set headers
-		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
-		httpRequest.headers()
-			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
-			.add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE)
-			.set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
-			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+		Request httpRequest = createRequest(targetAddress + ':' + targetPort, targetUrl, messageHeaders.getHttpMethod().getNettyHttpMethod(), payload, fileUploads);
 
 		final JavaType responseType;
 
@@ -184,7 +207,64 @@ public class RestClient {
 		return submitRequest(targetAddress, targetPort, httpRequest, responseType);
 	}
 
-	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, JavaType responseType) {
+	private static Request createRequest(String targetAddress, String targetUrl, HttpMethod httpMethod, ByteBuf jsonPayload, Collection<FileUpload> fileUploads) throws IOException {
+		if (fileUploads.isEmpty()) {
+
+			HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
+
+			httpRequest.headers()
+				.set(HttpHeaders.Names.HOST, targetAddress)
+				.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
+				.add(HttpHeaders.Names.CONTENT_LENGTH, jsonPayload.capacity())
+				.add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+
+			return new SimpleRequest(httpRequest);
+		} else {
+			HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
+
+			httpRequest.headers()
+				.set(HttpHeaders.Names.HOST, targetAddress)
+				.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+			// takes care of splitting the request into multiple parts
+			HttpPostRequestEncoder bodyRequestEncoder;
+			try {
+				// we could use mixed attributes here but we have to ensure that the minimum size is greater than
+				// any file as the upload otherwise fails
+				DefaultHttpDataFactory httpDataFactory = new DefaultHttpDataFactory(true);
+				// the FileUploadHandler explicitly checks for multipart headers
+				bodyRequestEncoder = new HttpPostRequestEncoder(httpDataFactory, httpRequest, true);
+
+				Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+				requestAttribute.setContent(jsonPayload);
+				bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+				int fileIndex = 0;
+				for (FileUpload fileUpload : fileUploads) {
+					Path path = fileUpload.getFile();
+					if (Files.isDirectory(path)) {
+						throw new IllegalArgumentException("Upload of directories is not supported. Dir=" + path);
+					}
+					File file = path.toFile();
+					LOG.trace("Adding file {} to request.", file);
+					bodyRequestEncoder.addBodyFileUpload("file_" + fileIndex, file, fileUpload.getContentType(), false);
+					fileIndex++;
+				}
+			} catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+				throw new IOException("Could not encode request.", e);
+			}
+
+			try {
+				httpRequest = bodyRequestEncoder.finalizeRequest();
+			} catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+				throw new IOException("Could not finalize request.", e);
+			}
+
+			return new MultipartRequest(httpRequest, bodyRequestEncoder);
+		}
+	}
+
+	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
 		final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
 
 		final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
@@ -203,7 +283,11 @@ public class RestClient {
 				channel -> {
 					ClientHandler handler = channel.pipeline().get(ClientHandler.class);
 					CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-					channel.writeAndFlush(httpRequest);
+					try {
+						httpRequest.writeTo(channel);
+					} catch (IOException e) {
+						return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+					}
 					return future;
 				},
 				executor)
@@ -239,6 +323,45 @@ public class RestClient {
 		return responseFuture;
 	}
 
+	private interface Request {
+		void writeTo(Channel channel) throws IOException;
+	}
+
+	private static final class SimpleRequest implements Request {
+		private final HttpRequest httpRequest;
+
+		SimpleRequest(HttpRequest httpRequest) {
+			this.httpRequest = httpRequest;
+		}
+
+		@Override
+		public void writeTo(Channel channel) {
+			channel.writeAndFlush(httpRequest);
+		}
+	}
+
+	/** Multipart request: the HTTP request is written first, followed by the chunked encoder body. */
+	private static final class MultipartRequest implements Request {
+		private final HttpRequest httpRequest;
+		private final HttpPostRequestEncoder bodyRequestEncoder;
+
+		MultipartRequest(HttpRequest httpRequest, HttpPostRequestEncoder bodyRequestEncoder) {
+			this.httpRequest = httpRequest;
+			this.bodyRequestEncoder = bodyRequestEncoder;
+		}
+
+		@Override
+		public void writeTo(Channel channel) {
+			ChannelFuture future = channel.writeAndFlush(httpRequest);
+			// this should never be false as we explicitly set the encoder to use multipart messages
+			if (bodyRequestEncoder.isChunked()) {
+				future = channel.writeAndFlush(bodyRequestEncoder);
+			}
+			// writes are asynchronous: release data and remove temporary files only once the last write is done
+			future.addListener(ignored -> bodyRequestEncoder.cleanFiles());
+		}
+	}
+
 	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
 
 		private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index dd9a739..6818406 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -18,32 +18,10 @@
 
 package org.apache.flink.runtime.rest;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import okhttp3.MediaType;
@@ -51,28 +29,14 @@ import okhttp3.MultipartBody;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertArrayEquals;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -81,67 +45,14 @@ import static org.junit.Assert.assertEquals;
  */
 public class FileUploadHandlerTest extends TestLogger {
 
-	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
-	private static final Random RANDOM = new Random();
-
 	@ClassRule
-	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-	private static RestServerEndpoint serverEndpoint;
-	private static String serverAddress;
-
-	private static MultipartMixedHandler mixedHandler;
-	private static MultipartJsonHandler jsonHandler;
-	private static MultipartFileHandler fileHandler;
-	private static File file1;
-	private static File file2;
-
-	private static Path configuredUploadDir;
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(RestOptions.PORT, 0);
-		config.setString(RestOptions.ADDRESS, "localhost");
-		configuredUploadDir = TEMPORARY_FOLDER.newFolder().toPath();
-		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
-
-		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
-
-		final String restAddress = "http://localhost:1234";
-		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
-			.setRestAddress(restAddress)
-			.build();
+	public static final MultipartUploadResource MULTIPART_UPLOAD_RESOURCE = new MultipartUploadResource();
 
-		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
-			CompletableFuture.completedFuture(mockRestfulGateway);
-
-		file1 = TEMPORARY_FOLDER.newFile();
-		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
-		file2 = TEMPORARY_FOLDER.newFile();
-		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
-		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
-		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
-
-		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
-			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
-			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
-			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
-
-		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
-
-		serverEndpoint.start();
-		serverAddress = serverEndpoint.getRestBaseUrl();
-	}
+	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
 
-	@AfterClass
-	public static void teardown() throws Exception {
-		if (serverEndpoint != null) {
-			serverEndpoint.close();
-			serverEndpoint = null;
-		}
+	@After
+	public void reset() {
+		MULTIPART_UPLOAD_RESOURCE.resetState();
 	}
 
 	private static Request buildMalformedRequest(String headerUrl) {
@@ -154,7 +65,7 @@ public class FileUploadHandlerTest extends TestLogger {
 
 	private static Request buildMixedRequestWithUnknownAttribute(String headerUrl) throws IOException {
 		MultipartBody.Builder builder = new MultipartBody.Builder();
-		builder = addJsonPart(builder, RANDOM.nextInt(), "hello");
+		builder = addJsonPart(builder, new MultipartUploadResource.TestRequestBody(), "hello");
 		builder = addFilePart(builder);
 		return finalizeRequest(builder, headerUrl);
 	}
@@ -165,15 +76,15 @@ public class FileUploadHandlerTest extends TestLogger {
 		return finalizeRequest(builder, headerUrl);
 	}
 
-	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
+	private static Request buildJsonRequest(String headerUrl, MultipartUploadResource.TestRequestBody json) throws IOException {
 		MultipartBody.Builder builder = new MultipartBody.Builder();
-		builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+		builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
 		return finalizeRequest(builder, headerUrl);
 	}
 
-	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
+	private static Request buildMixedRequest(String headerUrl, MultipartUploadResource.TestRequestBody json) throws IOException {
 		MultipartBody.Builder builder = new MultipartBody.Builder();
-		builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+		builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
 		builder = addFilePart(builder);
 		return finalizeRequest(builder, headerUrl);
 	}
@@ -184,22 +95,22 @@ public class FileUploadHandlerTest extends TestLogger {
 			.build();
 
 		return new Request.Builder()
-			.url(serverAddress + headerUrl)
+			.url(MULTIPART_UPLOAD_RESOURCE.serverAddress + headerUrl)
 			.post(multipartBody)
 			.build();
 	}
 
 	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
-		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
-		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
+		for (File file : MULTIPART_UPLOAD_RESOURCE.getFilesToUpload()) {
+			okhttp3.RequestBody filePayload = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file);
 
-		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
-			.addFormDataPart("file2", file2.getName(), filePayload2);
-	}
+			builder = builder.addFormDataPart(file.getName(), file.getName(), filePayload);
+		}
 
-	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index, String attribute) throws IOException {
-		TestRequestBody jsonRequestBody = new TestRequestBody(index);
+		return builder;
+	}
 
+	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, MultipartUploadResource.TestRequestBody jsonRequestBody, String attribute) throws IOException {
 		StringWriter sw = new StringWriter();
 		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
 
@@ -212,7 +123,9 @@ public class FileUploadHandlerTest extends TestLogger {
 	public void testMixedMultipart() throws Exception {
 		OkHttpClient client = new OkHttpClient();
 
-		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		MultipartUploadResource.MultipartMixedHandler mixedHandler = MULTIPART_UPLOAD_RESOURCE.getMixedHandler();
+
+		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
 		try (Response response = client.newCall(jsonRequest).execute()) {
 			// explicitly rejected by the test handler implementation
 			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
@@ -224,11 +137,11 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
 
-		int mixedId = RANDOM.nextInt();
-		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
+		MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json);
 		try (Response response = client.newCall(mixedRequest).execute()) {
 			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
-			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
+			assertEquals(json, mixedHandler.lastReceivedRequest);
 		}
 	}
 
@@ -236,11 +149,13 @@ public class FileUploadHandlerTest extends TestLogger {
 	public void testJsonMultipart() throws Exception {
 		OkHttpClient client = new OkHttpClient();
 
-		int jsonId = RANDOM.nextInt();
-		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
+		MultipartUploadResource.MultipartJsonHandler jsonHandler = MULTIPART_UPLOAD_RESOURCE.getJsonHandler();
+
+		MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), json);
 		try (Response response = client.newCall(jsonRequest).execute()) {
 			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
-			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
+			assertEquals(json, jsonHandler.lastReceivedRequest);
 		}
 
 		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
@@ -249,7 +164,7 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
 
-		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
 		try (Response response = client.newCall(mixedRequest).execute()) {
 			// FileUploads are outright forbidden
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -260,7 +175,9 @@ public class FileUploadHandlerTest extends TestLogger {
 	public void testFileMultipart() throws Exception {
 		OkHttpClient client = new OkHttpClient();
 
-		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		MultipartUploadResource.MultipartFileHandler fileHandler = MULTIPART_UPLOAD_RESOURCE.getFileHandler();
+
+		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
 		try (Response response = client.newCall(jsonRequest).execute()) {
 			// JSON payload did not match expected format
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -271,7 +188,7 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
 		}
 
-		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
 		try (Response response = client.newCall(mixedRequest).execute()) {
 			// JSON payload did not match expected format
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -282,11 +199,11 @@ public class FileUploadHandlerTest extends TestLogger {
 	public void testUploadCleanupOnUnknownAttribute() throws IOException {
 		OkHttpClient client = new OkHttpClient();
 
-		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+		Request request = buildMixedRequestWithUnknownAttribute(MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders().getTargetRestEndpointURL());
 		try (Response response = client.newCall(request).execute()) {
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
-		assertUploadDirectoryIsEmpty();
+		MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
 	}
 
 	/**
@@ -296,244 +213,11 @@ public class FileUploadHandlerTest extends TestLogger {
 	public void testUploadCleanupOnFailure() throws IOException {
 		OkHttpClient client = new OkHttpClient();
 
-		Request request = buildMalformedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+		Request request = buildMalformedRequest(MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders().getTargetRestEndpointURL());
 		try (Response response = client.newCall(request).execute()) {
 			// decoding errors aren't handled separately by the FileUploadHandler
 			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
 		}
-		assertUploadDirectoryIsEmpty();
-	}
-
-	private static void assertUploadDirectoryIsEmpty() throws IOException {
-		Preconditions.checkArgument(
-			1 == Files.list(configuredUploadDir).count(),
-			"Directory structure in rest upload directory has changed. Test must be adjusted");
-		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
-		Preconditions.checkArgument(
-			actualUploadDir.isPresent(),
-			"Expected upload directory does not exist.");
-		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
-	}
-
-	private static class MultipartMixedHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
-		volatile TestRequestBody lastReceivedRequest = null;
-
-		MultipartMixedHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
-			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartMixedHeaders.INSTANCE);
-		}
-
-		@Override
-		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			MultipartFileHandler.verifyFileUpload(request.getUploadedFiles());
-			this.lastReceivedRequest = request.getRequestBody();
-			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
-		}
-
-		private static final class MultipartMixedHeaders implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
-			private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
-
-			private MultipartMixedHeaders() {
-			}
-
-			@Override
-			public Class<TestRequestBody> getRequestClass() {
-				return TestRequestBody.class;
-			}
-
-			@Override
-			public Class<EmptyResponseBody> getResponseClass() {
-				return EmptyResponseBody.class;
-			}
-
-			@Override
-			public HttpResponseStatus getResponseStatusCode() {
-				return HttpResponseStatus.OK;
-			}
-
-			@Override
-			public String getDescription() {
-				return "";
-			}
-
-			@Override
-			public EmptyMessageParameters getUnresolvedMessageParameters() {
-				return EmptyMessageParameters.getInstance();
-			}
-
-			@Override
-			public HttpMethodWrapper getHttpMethod() {
-				return HttpMethodWrapper.POST;
-			}
-
-			@Override
-			public String getTargetRestEndpointURL() {
-				return "/test/upload/mixed";
-			}
-
-			@Override
-			public boolean acceptsFileUploads() {
-				return true;
-			}
-		}
-	}
-
-	private static class MultipartJsonHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
-		volatile TestRequestBody lastReceivedRequest = null;
-
-		MultipartJsonHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
-			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartJsonHeaders.INSTANCE);
-		}
-
-		@Override
-		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			Collection<Path> uploadedFiles = request.getUploadedFiles();
-			if (!uploadedFiles.isEmpty()) {
-				throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
-			}
-			this.lastReceivedRequest = request.getRequestBody();
-			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
-		}
-
-		private static final class MultipartJsonHeaders extends TestHeadersBase<TestRequestBody> {
-			private static final MultipartJsonHeaders INSTANCE = new MultipartJsonHeaders();
-
-			private MultipartJsonHeaders() {
-			}
-
-			@Override
-			public Class<TestRequestBody> getRequestClass() {
-				return TestRequestBody.class;
-			}
-
-			@Override
-			public String getTargetRestEndpointURL() {
-				return "/test/upload/json";
-			}
-
-			@Override
-			public boolean acceptsFileUploads() {
-				return false;
-			}
-		}
-	}
-
-	private static class MultipartFileHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
-
-		MultipartFileHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
-			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartFileHeaders.INSTANCE);
-		}
-
-		@Override
-		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			verifyFileUpload(request.getUploadedFiles());
-			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
-		}
-
-		static void verifyFileUpload(Collection<Path> uploadedFiles) throws RestHandlerException {
-			try {
-				assertEquals(2, uploadedFiles.size());
-
-				for (Path uploadedFile : uploadedFiles) {
-					File matchingFile;
-					if (uploadedFile.getFileName().toString().equals(file1.getName())) {
-						matchingFile = file1;
-					} else if (uploadedFile.getFileName().toString().equals(file2.getName())) {
-						matchingFile = file2;
-					} else {
-						throw new RestHandlerException("Received file with unknown name " + uploadedFile.getFileName() + '.', HttpResponseStatus.INTERNAL_SERVER_ERROR);
-					}
-
-					byte[] originalContent = Files.readAllBytes(matchingFile.toPath());
-					byte[] receivedContent = Files.readAllBytes(uploadedFile);
-					assertArrayEquals(originalContent, receivedContent);
-				}
-			} catch (Exception e) {
-				// return 505 to differentiate from common BAD_REQUEST responses in this test
-				throw new RestHandlerException("Test verification failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
-			}
-		}
-
-		private static final class MultipartFileHeaders extends TestHeadersBase<EmptyRequestBody> {
-			private static final MultipartFileHeaders INSTANCE = new MultipartFileHeaders();
-
-			private MultipartFileHeaders() {
-			}
-
-			@Override
-			public Class<EmptyRequestBody> getRequestClass() {
-				return EmptyRequestBody.class;
-			}
-
-			@Override
-			public String getTargetRestEndpointURL() {
-				return "/test/upload/file";
-			}
-
-			@Override
-			public boolean acceptsFileUploads() {
-				return true;
-			}
-		}
-	}
-
-	private abstract static class TestHeadersBase<R extends RequestBody> implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
-
-		@Override
-		public Class<EmptyResponseBody> getResponseClass() {
-			return EmptyResponseBody.class;
-		}
-
-		@Override
-		public HttpResponseStatus getResponseStatusCode() {
-			return HttpResponseStatus.OK;
-		}
-
-		@Override
-		public String getDescription() {
-			return "";
-		}
-
-		@Override
-		public EmptyMessageParameters getUnresolvedMessageParameters() {
-			return EmptyMessageParameters.getInstance();
-		}
-
-		@Override
-		public HttpMethodWrapper getHttpMethod() {
-			return HttpMethodWrapper.POST;
-		}
-	}
-
-	private static final class TestRequestBody implements RequestBody {
-		private static final String FIELD_NAME_INDEX = "index";
-
-		@JsonProperty(FIELD_NAME_INDEX)
-		private final int index;
-
-		@JsonCreator
-		TestRequestBody(@JsonProperty(FIELD_NAME_INDEX) int index) {
-			this.index = index;
-		}
-	}
-
-	private static class TestRestServerEndpoint extends RestServerEndpoint {
-
-		private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
-
-		TestRestServerEndpoint(
-			RestServerEndpointConfiguration configuration,
-			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
-			super(configuration);
-			this.handlers = Preconditions.checkNotNull(handlers);
-		}
-
-		@Override
-		protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-			return handlers;
-		}
-
-		@Override
-		protected void startInternal() {
-		}
+		MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
new file mode 100644
index 0000000..c03b85d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test resource for verifying support of multipart uploads via REST.
+ */
+public class MultipartUploadResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MultipartUploadResource.class);
+
+	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private RestServerEndpoint serverEndpoint;
+	protected String serverAddress;
+	protected InetSocketAddress serverSocketAddress;
+
+	protected MultipartMixedHandler mixedHandler;
+	protected MultipartJsonHandler jsonHandler;
+	protected MultipartFileHandler fileHandler;
+	protected File file1;
+	protected File file2;
+
+	private Path configuredUploadDir;
+
+	@Override
+	public void before() throws Exception {
+		temporaryFolder.create();
+		Configuration config = new Configuration();
+		config.setInteger(RestOptions.PORT, 0);
+		config.setString(RestOptions.ADDRESS, "localhost");
+		configuredUploadDir = temporaryFolder.newFolder().toPath();
+		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
+
+		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		final String restAddress = "http://localhost:1234";
+		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+		when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+
+		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
+			CompletableFuture.completedFuture(mockRestfulGateway);
+
+		file1 = temporaryFolder.newFile();
+		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		file2 = temporaryFolder.newFile();
+		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, Arrays.asList(file1.toPath(), file2.toPath()));
+		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, Arrays.asList(file1.toPath(), file2.toPath()));
+
+		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
+			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
+			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
+			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
+
+		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
+
+		serverEndpoint.start();
+		serverAddress = serverEndpoint.getRestBaseUrl();
+		serverSocketAddress = serverEndpoint.getServerAddress();
+	}
+
+	public Collection<File> getFilesToUpload() {
+		return Arrays.asList(file1, file2);
+	}
+
+	public String getServerAddress() {
+		return serverAddress;
+	}
+
+	public InetSocketAddress getServerSocketAddress() {
+		return serverSocketAddress;
+	}
+
+	public MultipartMixedHandler getMixedHandler() {
+		return mixedHandler;
+	}
+
+	public MultipartFileHandler getFileHandler() {
+		return fileHandler;
+	}
+
+	public MultipartJsonHandler getJsonHandler() {
+		return jsonHandler;
+	}
+
+	public void resetState() {
+		mixedHandler.lastReceivedRequest = null;
+		jsonHandler.lastReceivedRequest = null;
+	}
+
+	@Override
+	public void after() {
+		temporaryFolder.delete();
+		if (serverEndpoint != null) {
+			try {
+				serverEndpoint.close();
+			} catch (Exception e) {
+				LOG.warn("Could not properly shutdown RestServerEndpoint.", e);
+			}
+			serverEndpoint = null;
+		}
+	}
+
+	public void assertUploadDirectoryIsEmpty() throws IOException {
+		Preconditions.checkArgument(
+			1 == Files.list(configuredUploadDir).count(),
+			"Directory structure in rest upload directory has changed. Test must be adjusted");
+		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
+		Preconditions.checkArgument(
+			actualUploadDir.isPresent(),
+			"Expected upload directory does not exist.");
+		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
+	}
+
+	/**
+	 * Handler that accepts a mixed request consisting of a {@link TestRequestBody} and {@link #file1} and {@link #file2}.
+	 */
+	public static class MultipartMixedHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		private final Collection<Path> expectedFiles;
+		volatile TestRequestBody lastReceivedRequest = null;
+
+		MultipartMixedHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever, Collection<Path> expectedFiles) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartMixedHeaders.INSTANCE);
+			this.expectedFiles = expectedFiles;
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles());
+			this.lastReceivedRequest = request.getRequestBody();
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		private static final class MultipartMixedHeaders implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+			private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
+
+			private MultipartMixedHeaders() {
+			}
+
+			@Override
+			public Class<TestRequestBody> getRequestClass() {
+				return TestRequestBody.class;
+			}
+
+			@Override
+			public Class<EmptyResponseBody> getResponseClass() {
+				return EmptyResponseBody.class;
+			}
+
+			@Override
+			public HttpResponseStatus getResponseStatusCode() {
+				return HttpResponseStatus.OK;
+			}
+
+			@Override
+			public String getDescription() {
+				return "";
+			}
+
+			@Override
+			public EmptyMessageParameters getUnresolvedMessageParameters() {
+				return EmptyMessageParameters.getInstance();
+			}
+
+			@Override
+			public HttpMethodWrapper getHttpMethod() {
+				return HttpMethodWrapper.POST;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/mixed";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
+		}
+	}
+
+	/**
+	 * Handler that accepts a JSON request consisting of a {@link TestRequestBody}.
+	 */
+	public static class MultipartJsonHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		volatile TestRequestBody lastReceivedRequest = null;
+
+		MultipartJsonHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartJsonHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			Collection<Path> uploadedFiles = request.getUploadedFiles();
+			if (!uploadedFiles.isEmpty()) {
+				throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			}
+			this.lastReceivedRequest = request.getRequestBody();
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		private static final class MultipartJsonHeaders extends TestHeadersBase<TestRequestBody> {
+			private static final MultipartJsonHeaders INSTANCE = new MultipartJsonHeaders();
+
+			private MultipartJsonHeaders() {
+			}
+
+			@Override
+			public Class<TestRequestBody> getRequestClass() {
+				return TestRequestBody.class;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/json";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return false;
+			}
+		}
+	}
+
+	/**
+	 * Handler that accepts a file request consisting of {@link #file1} and {@link #file2}.
+	 */
+	public static class MultipartFileHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private final Collection<Path> expectedFiles;
+
+		MultipartFileHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever, Collection<Path> expectedFiles) {
+			super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartFileHeaders.INSTANCE);
+			this.expectedFiles = expectedFiles;
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			verifyFileUpload(expectedFiles, request.getUploadedFiles());
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+
+		static void verifyFileUpload(Collection<Path> expectedFiles, Collection<Path> uploadedFiles) throws RestHandlerException {
+			try {
+				assertEquals(expectedFiles.size(), uploadedFiles.size());
+
+				List<Path> expectedList = new ArrayList<>(expectedFiles);
+				List<Path> actualList = new ArrayList<>(uploadedFiles);
+				expectedList.sort(Comparator.comparing(Path::toString));
+				actualList.sort(Comparator.comparing(Path::toString));
+
+				for (int x = 0; x < expectedList.size(); x++) {
+					Path expected = expectedList.get(x);
+					Path actual = actualList.get(x);
+
+					assertEquals(expected.getFileName().toString(), actual.getFileName().toString());
+
+					byte[] originalContent = Files.readAllBytes(expected);
+					byte[] receivedContent = Files.readAllBytes(actual);
+					assertArrayEquals(originalContent, receivedContent);
+				}
+			} catch (Exception e) {
+				// return 500 to differentiate from common BAD_REQUEST responses in this test
+				throw new RestHandlerException("Test verification failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+			}
+		}
+
+		private static final class MultipartFileHeaders extends TestHeadersBase<EmptyRequestBody> {
+			private static final MultipartFileHeaders INSTANCE = new MultipartFileHeaders();
+
+			private MultipartFileHeaders() {
+			}
+
+			@Override
+			public Class<EmptyRequestBody> getRequestClass() {
+				return EmptyRequestBody.class;
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/test/upload/file";
+			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
+		}
+	}
+
+	private abstract static class TestHeadersBase<R extends RequestBody> implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
+
+		@Override
+		public Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public String getDescription() {
+			return "";
+		}
+
+		@Override
+		public EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.POST;
+		}
+	}
+
+	/**
+	 * Simple test {@link RequestBody}.
+	 */
+	protected static final class TestRequestBody implements RequestBody {
+		private static final String FIELD_NAME_INDEX = "index";
+		private static final Random RANDOM = new Random();
+
+		@JsonProperty(FIELD_NAME_INDEX)
+		private final int index;
+
+		TestRequestBody() {
+			this(RANDOM.nextInt());
+		}
+
+		@JsonCreator
+		TestRequestBody(@JsonProperty(FIELD_NAME_INDEX) int index) {
+			this.index = index;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			TestRequestBody that = (TestRequestBody) o;
+			return index == that.index;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(index);
+		}
+
+		@Override
+		public String toString() {
+			return "TestRequestBody{" +
+				"index=" + index +
+				'}';
+		}
+	}
+
+	private static class TestRestServerEndpoint extends RestServerEndpoint {
+
+		private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
+
+		TestRestServerEndpoint(
+			RestServerEndpointConfiguration configuration,
+			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
+			super(configuration);
+			this.handlers = requireNonNull(handlers);
+		}
+
+		@Override
+		protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+			return handlers;
+		}
+
+		@Override
+		protected void startInternal() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
new file mode 100644
index 0000000..fb71e9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the multipart functionality of the {@link RestClient}.
+ */
+public class RestClientMultipartTest extends TestLogger {
+
+	@ClassRule
+	public static final MultipartUploadResource MULTIPART_UPLOAD_RESOURCE = new MultipartUploadResource();
+
+	private static RestClient restClient;
+
+	@BeforeClass
+	public static void setupClient() throws ConfigurationException {
+		restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor());
+	}
+
+	@After
+	public void reset() {
+		MULTIPART_UPLOAD_RESOURCE.resetState();
+	}
+
+	@AfterClass
+	public static void teardownClient() {
+		if (restClient != null) {
+			restClient.shutdown(Time.seconds(10));
+		}
+	}
+
+	@Test
+	public void testMixedMultipart() throws Exception {
+		Collection<FileUpload> files = MULTIPART_UPLOAD_RESOURCE.getFilesToUpload().stream()
+			.map(file -> new FileUpload(file.toPath(), "application/octet-stream")).collect(Collectors.toList());
+
+		MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+			MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders(),
+			EmptyMessageParameters.getInstance(),
+			json,
+			files
+		);
+
+		responseFuture.get();
+		Assert.assertEquals(json, MULTIPART_UPLOAD_RESOURCE.getMixedHandler().lastReceivedRequest);
+	}
+
+	@Test
+	public void testJsonMultipart() throws Exception {
+		MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+			MULTIPART_UPLOAD_RESOURCE.getJsonHandler().getMessageHeaders(),
+			EmptyMessageParameters.getInstance(),
+			json,
+			Collections.emptyList()
+		);
+
+		responseFuture.get();
+		Assert.assertEquals(json, MULTIPART_UPLOAD_RESOURCE.getJsonHandler().lastReceivedRequest);
+	}
+
+	@Test
+	public void testFileMultipart() throws Exception {
+		Collection<FileUpload> files = MULTIPART_UPLOAD_RESOURCE.getFilesToUpload().stream()
+			.map(file -> new FileUpload(file.toPath(), "application/octet-stream")).collect(Collectors.toList());
+
+		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+			MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+			MULTIPART_UPLOAD_RESOURCE.getFileHandler().getMessageHeaders(),
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			files
+		);
+
+		responseFuture.get();
+	}
+}