You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/22 09:04:36 UTC
flink git commit: [FLINK-9599][rest] RestClient supports FileUploads
Repository: flink
Updated Branches:
refs/heads/master 7d034d4ef -> 181559d5b
[FLINK-9599][rest] RestClient supports FileUploads
This closes #6189.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/181559d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/181559d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/181559d5
Branch: refs/heads/master
Commit: 181559d5bcad50f919e95c7602057b929553f76b
Parents: 7d034d4
Author: zentol <ch...@apache.org>
Authored: Tue Jun 19 09:45:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 22 11:03:57 2018 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rest/FileUpload.java | 42 ++
.../flink/runtime/rest/FileUploadHandler.java | 2 +-
.../apache/flink/runtime/rest/RestClient.java | 147 +++++-
.../runtime/rest/FileUploadHandlerTest.java | 396 ++--------------
.../runtime/rest/MultipartUploadResource.java | 457 +++++++++++++++++++
.../runtime/rest/RestClientMultipartTest.java | 120 +++++
6 files changed, 795 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
new file mode 100644
index 0000000..0a6429d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUpload.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.nio.file.Path;
+
+/**
+ * Client uploading a file.
+ */
+public final class FileUpload {
+ private final Path file;
+ private final String contentType;
+
+ public FileUpload(Path file, String contentType) {
+ this.file = file;
+ this.contentType = contentType;
+ }
+
+ public Path getFile() {
+ return file;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 1a5760f..0c910d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -109,7 +109,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);
- while (currentHttpPostRequestDecoder.hasNext()) {
+ while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
final DiskFileUpload fileUpload = (DiskFileUpload) data;
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a63bf5f..c700807 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -53,24 +55,34 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -88,7 +100,7 @@ public class RestClient {
// used to open connections to a rest server endpoint
private final Executor executor;
- private Bootstrap bootstrap;
+ private final Bootstrap bootstrap;
public RestClient(RestClientConfiguration configuration, Executor executor) {
Preconditions.checkNotNull(configuration);
@@ -106,6 +118,7 @@ public class RestClient {
socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
+ .addLast(new ChunkedWriteHandler()) // required for multipart-requests
.addLast(new ClientHandler());
}
};
@@ -145,12 +158,28 @@ public class RestClient {
}
}
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request) throws IOException {
+ return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, Collections.emptyList());
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection<FileUpload> fileUploads) throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
Preconditions.checkNotNull(messageHeaders);
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkNotNull(fileUploads);
Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
@@ -161,13 +190,7 @@ public class RestClient {
objectMapper.writeValue(sw, request);
ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
- // create request and set headers
- FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
- httpRequest.headers()
- .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
- .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE)
- .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
- .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+ Request httpRequest = createRequest(targetAddress + ':' + targetPort, targetUrl, messageHeaders.getHttpMethod().getNettyHttpMethod(), payload, fileUploads);
final JavaType responseType;
@@ -184,7 +207,64 @@ public class RestClient {
return submitRequest(targetAddress, targetPort, httpRequest, responseType);
}
- private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, JavaType responseType) {
+ private static Request createRequest(String targetAddress, String targetUrl, HttpMethod httpMethod, ByteBuf jsonPayload, Collection<FileUpload> fileUploads) throws IOException {
+ if (fileUploads.isEmpty()) {
+
+ HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
+
+ httpRequest.headers()
+ .set(HttpHeaders.Names.HOST, targetAddress)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
+ .add(HttpHeaders.Names.CONTENT_LENGTH, jsonPayload.capacity())
+ .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+
+ return new SimpleRequest(httpRequest);
+ } else {
+ HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
+
+ httpRequest.headers()
+ .set(HttpHeaders.Names.HOST, targetAddress)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ // takes care of splitting the request into multiple parts
+ HttpPostRequestEncoder bodyRequestEncoder;
+ try {
+ // we could use mixed attributes here but we have to ensure that the minimum size is greater than
+ // any file as the upload otherwise fails
+ DefaultHttpDataFactory httpDataFactory = new DefaultHttpDataFactory(true);
+ // the FileUploadHandler explicitly checks for multipart headers
+ bodyRequestEncoder = new HttpPostRequestEncoder(httpDataFactory, httpRequest, true);
+
+ Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ requestAttribute.setContent(jsonPayload);
+ bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+ int fileIndex = 0;
+ for (FileUpload fileUpload : fileUploads) {
+ Path path = fileUpload.getFile();
+ if (Files.isDirectory(path)) {
+ throw new IllegalArgumentException("Upload of directories is not supported. Dir=" + path);
+ }
+ File file = path.toFile();
+ LOG.trace("Adding file {} to request.", file);
+ bodyRequestEncoder.addBodyFileUpload("file_" + fileIndex, file, fileUpload.getContentType(), false);
+ fileIndex++;
+ }
+ } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+ throw new IOException("Could not encode request.", e);
+ }
+
+ try {
+ httpRequest = bodyRequestEncoder.finalizeRequest();
+ } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+ throw new IOException("Could not finalize request.", e);
+ }
+
+ return new MultipartRequest(httpRequest, bodyRequestEncoder);
+ }
+ }
+
+ private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
@@ -203,7 +283,11 @@ public class RestClient {
channel -> {
ClientHandler handler = channel.pipeline().get(ClientHandler.class);
CompletableFuture<JsonResponse> future = handler.getJsonFuture();
- channel.writeAndFlush(httpRequest);
+ try {
+ httpRequest.writeTo(channel);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+ }
return future;
},
executor)
@@ -239,6 +323,45 @@ public class RestClient {
return responseFuture;
}
+ private interface Request {
+ void writeTo(Channel channel) throws IOException;
+ }
+
+ private static final class SimpleRequest implements Request {
+ private final HttpRequest httpRequest;
+
+ SimpleRequest(HttpRequest httpRequest) {
+ this.httpRequest = httpRequest;
+ }
+
+ @Override
+ public void writeTo(Channel channel) {
+ channel.writeAndFlush(httpRequest);
+ }
+ }
+
+ private static final class MultipartRequest implements Request {
+ private final HttpRequest httpRequest;
+ private final HttpPostRequestEncoder bodyRequestEncoder;
+
+ MultipartRequest(HttpRequest httpRequest, HttpPostRequestEncoder bodyRequestEncoder) {
+ this.httpRequest = httpRequest;
+ this.bodyRequestEncoder = bodyRequestEncoder;
+ }
+
+ @Override
+ public void writeTo(Channel channel) {
+ channel.writeAndFlush(httpRequest);
+ // this should never be false as we explicitly set the encoder to use multipart messages
+ if (bodyRequestEncoder.isChunked()) {
+ channel.writeAndFlush(bodyRequestEncoder);
+ }
+
+ // release data and remove temporary files if they were created
+ bodyRequestEncoder.cleanFiles();
+ }
+ }
+
private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index dd9a739..6818406 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -18,32 +18,10 @@
package org.apache.flink.runtime.rest;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import okhttp3.MediaType;
@@ -51,28 +29,14 @@ import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertArrayEquals;
+
import static org.junit.Assert.assertEquals;
/**
@@ -81,67 +45,14 @@ import static org.junit.Assert.assertEquals;
*/
public class FileUploadHandlerTest extends TestLogger {
- private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
- private static final Random RANDOM = new Random();
-
@ClassRule
- public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
- private static RestServerEndpoint serverEndpoint;
- private static String serverAddress;
-
- private static MultipartMixedHandler mixedHandler;
- private static MultipartJsonHandler jsonHandler;
- private static MultipartFileHandler fileHandler;
- private static File file1;
- private static File file2;
-
- private static Path configuredUploadDir;
-
- @BeforeClass
- public static void setup() throws Exception {
- Configuration config = new Configuration();
- config.setInteger(RestOptions.PORT, 0);
- config.setString(RestOptions.ADDRESS, "localhost");
- configuredUploadDir = TEMPORARY_FOLDER.newFolder().toPath();
- config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
-
- RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
-
- final String restAddress = "http://localhost:1234";
- RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
- .setRestAddress(restAddress)
- .build();
+ public static final MultipartUploadResource MULTIPART_UPLOAD_RESOURCE = new MultipartUploadResource();
- final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
- CompletableFuture.completedFuture(mockRestfulGateway);
-
- file1 = TEMPORARY_FOLDER.newFile();
- Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
- file2 = TEMPORARY_FOLDER.newFile();
- Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
-
- mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
- jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
- fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
-
- final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
- Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
- Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
- Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
-
- serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
-
- serverEndpoint.start();
- serverAddress = serverEndpoint.getRestBaseUrl();
- }
+ private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
- @AfterClass
- public static void teardown() throws Exception {
- if (serverEndpoint != null) {
- serverEndpoint.close();
- serverEndpoint = null;
- }
+ @After
+ public void reset() {
+ MULTIPART_UPLOAD_RESOURCE.resetState();
}
private static Request buildMalformedRequest(String headerUrl) {
@@ -154,7 +65,7 @@ public class FileUploadHandlerTest extends TestLogger {
private static Request buildMixedRequestWithUnknownAttribute(String headerUrl) throws IOException {
MultipartBody.Builder builder = new MultipartBody.Builder();
- builder = addJsonPart(builder, RANDOM.nextInt(), "hello");
+ builder = addJsonPart(builder, new MultipartUploadResource.TestRequestBody(), "hello");
builder = addFilePart(builder);
return finalizeRequest(builder, headerUrl);
}
@@ -165,15 +76,15 @@ public class FileUploadHandlerTest extends TestLogger {
return finalizeRequest(builder, headerUrl);
}
- private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
+ private static Request buildJsonRequest(String headerUrl, MultipartUploadResource.TestRequestBody json) throws IOException {
MultipartBody.Builder builder = new MultipartBody.Builder();
- builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
return finalizeRequest(builder, headerUrl);
}
- private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
+ private static Request buildMixedRequest(String headerUrl, MultipartUploadResource.TestRequestBody json) throws IOException {
MultipartBody.Builder builder = new MultipartBody.Builder();
- builder = addJsonPart(builder, index, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
builder = addFilePart(builder);
return finalizeRequest(builder, headerUrl);
}
@@ -184,22 +95,22 @@ public class FileUploadHandlerTest extends TestLogger {
.build();
return new Request.Builder()
- .url(serverAddress + headerUrl)
+ .url(MULTIPART_UPLOAD_RESOURCE.serverAddress + headerUrl)
.post(multipartBody)
.build();
}
private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
- okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
- okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
+ for (File file : MULTIPART_UPLOAD_RESOURCE.getFilesToUpload()) {
+ okhttp3.RequestBody filePayload = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file);
- return builder.addFormDataPart("file1", file1.getName(), filePayload1)
- .addFormDataPart("file2", file2.getName(), filePayload2);
- }
+ builder = builder.addFormDataPart(file.getName(), file.getName(), filePayload);
+ }
- private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index, String attribute) throws IOException {
- TestRequestBody jsonRequestBody = new TestRequestBody(index);
+ return builder;
+ }
+ private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, MultipartUploadResource.TestRequestBody jsonRequestBody, String attribute) throws IOException {
StringWriter sw = new StringWriter();
OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
@@ -212,7 +123,9 @@ public class FileUploadHandlerTest extends TestLogger {
public void testMixedMultipart() throws Exception {
OkHttpClient client = new OkHttpClient();
- Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+ MultipartUploadResource.MultipartMixedHandler mixedHandler = MULTIPART_UPLOAD_RESOURCE.getMixedHandler();
+
+ Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
try (Response response = client.newCall(jsonRequest).execute()) {
// explicitly rejected by the test handler implementation
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
@@ -224,11 +137,11 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
- int mixedId = RANDOM.nextInt();
- Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
+ MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+ Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json);
try (Response response = client.newCall(mixedRequest).execute()) {
assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
- assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
+ assertEquals(json, mixedHandler.lastReceivedRequest);
}
}
@@ -236,11 +149,13 @@ public class FileUploadHandlerTest extends TestLogger {
public void testJsonMultipart() throws Exception {
OkHttpClient client = new OkHttpClient();
- int jsonId = RANDOM.nextInt();
- Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
+ MultipartUploadResource.MultipartJsonHandler jsonHandler = MULTIPART_UPLOAD_RESOURCE.getJsonHandler();
+
+ MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+ Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), json);
try (Response response = client.newCall(jsonRequest).execute()) {
assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
- assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
+ assertEquals(json, jsonHandler.lastReceivedRequest);
}
Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
@@ -249,7 +164,7 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
- Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+ Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
try (Response response = client.newCall(mixedRequest).execute()) {
// FileUploads are outright forbidden
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -260,7 +175,9 @@ public class FileUploadHandlerTest extends TestLogger {
public void testFileMultipart() throws Exception {
OkHttpClient client = new OkHttpClient();
- Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+ MultipartUploadResource.MultipartFileHandler fileHandler = MULTIPART_UPLOAD_RESOURCE.getFileHandler();
+
+ Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
try (Response response = client.newCall(jsonRequest).execute()) {
// JSON payload did not match expected format
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -271,7 +188,7 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
}
- Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
+ Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), new MultipartUploadResource.TestRequestBody());
try (Response response = client.newCall(mixedRequest).execute()) {
// JSON payload did not match expected format
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
@@ -282,11 +199,11 @@ public class FileUploadHandlerTest extends TestLogger {
public void testUploadCleanupOnUnknownAttribute() throws IOException {
OkHttpClient client = new OkHttpClient();
- Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+ Request request = buildMixedRequestWithUnknownAttribute(MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders().getTargetRestEndpointURL());
try (Response response = client.newCall(request).execute()) {
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
- assertUploadDirectoryIsEmpty();
+ MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
}
/**
@@ -296,244 +213,11 @@ public class FileUploadHandlerTest extends TestLogger {
public void testUploadCleanupOnFailure() throws IOException {
OkHttpClient client = new OkHttpClient();
- Request request = buildMalformedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+ Request request = buildMalformedRequest(MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders().getTargetRestEndpointURL());
try (Response response = client.newCall(request).execute()) {
// decoding errors aren't handled separately by the FileUploadHandler
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
}
- assertUploadDirectoryIsEmpty();
- }
-
- private static void assertUploadDirectoryIsEmpty() throws IOException {
- Preconditions.checkArgument(
- 1 == Files.list(configuredUploadDir).count(),
- "Directory structure in rest upload directory has changed. Test must be adjusted");
- Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
- Preconditions.checkArgument(
- actualUploadDir.isPresent(),
- "Expected upload directory does not exist.");
- assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
- }
-
- private static class MultipartMixedHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
- volatile TestRequestBody lastReceivedRequest = null;
-
- MultipartMixedHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
- super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartMixedHeaders.INSTANCE);
- }
-
- @Override
- protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- MultipartFileHandler.verifyFileUpload(request.getUploadedFiles());
- this.lastReceivedRequest = request.getRequestBody();
- return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
- }
-
- private static final class MultipartMixedHeaders implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
- private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
-
- private MultipartMixedHeaders() {
- }
-
- @Override
- public Class<TestRequestBody> getRequestClass() {
- return TestRequestBody.class;
- }
-
- @Override
- public Class<EmptyResponseBody> getResponseClass() {
- return EmptyResponseBody.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public String getDescription() {
- return "";
- }
-
- @Override
- public EmptyMessageParameters getUnresolvedMessageParameters() {
- return EmptyMessageParameters.getInstance();
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.POST;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return "/test/upload/mixed";
- }
-
- @Override
- public boolean acceptsFileUploads() {
- return true;
- }
- }
- }
-
- private static class MultipartJsonHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
- volatile TestRequestBody lastReceivedRequest = null;
-
- MultipartJsonHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
- super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartJsonHeaders.INSTANCE);
- }
-
- @Override
- protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
- if (!uploadedFiles.isEmpty()) {
- throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
- }
- this.lastReceivedRequest = request.getRequestBody();
- return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
- }
-
- private static final class MultipartJsonHeaders extends TestHeadersBase<TestRequestBody> {
- private static final MultipartJsonHeaders INSTANCE = new MultipartJsonHeaders();
-
- private MultipartJsonHeaders() {
- }
-
- @Override
- public Class<TestRequestBody> getRequestClass() {
- return TestRequestBody.class;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return "/test/upload/json";
- }
-
- @Override
- public boolean acceptsFileUploads() {
- return false;
- }
- }
- }
-
- private static class MultipartFileHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
-
- MultipartFileHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
- super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartFileHeaders.INSTANCE);
- }
-
- @Override
- protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- verifyFileUpload(request.getUploadedFiles());
- return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
- }
-
- static void verifyFileUpload(Collection<Path> uploadedFiles) throws RestHandlerException {
- try {
- assertEquals(2, uploadedFiles.size());
-
- for (Path uploadedFile : uploadedFiles) {
- File matchingFile;
- if (uploadedFile.getFileName().toString().equals(file1.getName())) {
- matchingFile = file1;
- } else if (uploadedFile.getFileName().toString().equals(file2.getName())) {
- matchingFile = file2;
- } else {
- throw new RestHandlerException("Received file with unknown name " + uploadedFile.getFileName() + '.', HttpResponseStatus.INTERNAL_SERVER_ERROR);
- }
-
- byte[] originalContent = Files.readAllBytes(matchingFile.toPath());
- byte[] receivedContent = Files.readAllBytes(uploadedFile);
- assertArrayEquals(originalContent, receivedContent);
- }
- } catch (Exception e) {
- // return 505 to differentiate from common BAD_REQUEST responses in this test
- throw new RestHandlerException("Test verification failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
- }
- }
-
- private static final class MultipartFileHeaders extends TestHeadersBase<EmptyRequestBody> {
- private static final MultipartFileHeaders INSTANCE = new MultipartFileHeaders();
-
- private MultipartFileHeaders() {
- }
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return "/test/upload/file";
- }
-
- @Override
- public boolean acceptsFileUploads() {
- return true;
- }
- }
- }
-
- private abstract static class TestHeadersBase<R extends RequestBody> implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
-
- @Override
- public Class<EmptyResponseBody> getResponseClass() {
- return EmptyResponseBody.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public String getDescription() {
- return "";
- }
-
- @Override
- public EmptyMessageParameters getUnresolvedMessageParameters() {
- return EmptyMessageParameters.getInstance();
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.POST;
- }
- }
-
- private static final class TestRequestBody implements RequestBody {
- private static final String FIELD_NAME_INDEX = "index";
-
- @JsonProperty(FIELD_NAME_INDEX)
- private final int index;
-
- @JsonCreator
- TestRequestBody(@JsonProperty(FIELD_NAME_INDEX) int index) {
- this.index = index;
- }
- }
-
- private static class TestRestServerEndpoint extends RestServerEndpoint {
-
- private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
-
- TestRestServerEndpoint(
- RestServerEndpointConfiguration configuration,
- List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
- super(configuration);
- this.handlers = Preconditions.checkNotNull(handlers);
- }
-
- @Override
- protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
- return handlers;
- }
-
- @Override
- protected void startInternal() {
- }
+ MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
new file mode 100644
index 0000000..c03b85d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test base for verifying support of multipart uploads via REST.
+ */
+public class MultipartUploadResource extends ExternalResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MultipartUploadResource.class);
+
+ private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private RestServerEndpoint serverEndpoint;
+ protected String serverAddress;
+ protected InetSocketAddress serverSocketAddress;
+
+ protected MultipartMixedHandler mixedHandler;
+ protected MultipartJsonHandler jsonHandler;
+ protected MultipartFileHandler fileHandler;
+ protected File file1;
+ protected File file2;
+
+ private Path configuredUploadDir;
+
+ @Override
+ public void before() throws Exception {
+ temporaryFolder.create();
+ Configuration config = new Configuration();
+ config.setInteger(RestOptions.PORT, 0);
+ config.setString(RestOptions.ADDRESS, "localhost");
+ configuredUploadDir = temporaryFolder.newFolder().toPath();
+ config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
+
+ RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
+
+ final String restAddress = "http://localhost:1234";
+ RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+ when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+
+ final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
+ CompletableFuture.completedFuture(mockRestfulGateway);
+
+ file1 = temporaryFolder.newFile();
+ Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ file2 = temporaryFolder.newFile();
+ Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, Arrays.asList(file1.toPath(), file2.toPath()));
+ jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+ fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, Arrays.asList(file1.toPath(), file2.toPath()));
+
+ final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
+ Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
+ Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
+ Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
+
+ serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
+
+ serverEndpoint.start();
+ serverAddress = serverEndpoint.getRestBaseUrl();
+ serverSocketAddress = serverEndpoint.getServerAddress();
+ }
+
+ public Collection<File> getFilesToUpload() {
+ return Arrays.asList(file1, file2);
+ }
+
+ public String getServerAddress() {
+ return serverAddress;
+ }
+
+ public InetSocketAddress getServerSocketAddress() {
+ return serverSocketAddress;
+ }
+
+ public MultipartMixedHandler getMixedHandler() {
+ return mixedHandler;
+ }
+
+ public MultipartFileHandler getFileHandler() {
+ return fileHandler;
+ }
+
+ public MultipartJsonHandler getJsonHandler() {
+ return jsonHandler;
+ }
+
+ public void resetState() {
+ mixedHandler.lastReceivedRequest = null;
+ jsonHandler.lastReceivedRequest = null;
+ }
+
+ @Override
+ public void after() {
+ temporaryFolder.delete();
+ if (serverEndpoint != null) {
+ try {
+ serverEndpoint.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly shutdown RestServerEndpoint.", e);
+ }
+ serverEndpoint = null;
+ }
+ }
+
+ public void assertUploadDirectoryIsEmpty() throws IOException {
+ Preconditions.checkArgument(
+ 1 == Files.list(configuredUploadDir).count(),
+ "Directory structure in rest upload directory has changed. Test must be adjusted");
+ Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
+ Preconditions.checkArgument(
+ actualUploadDir.isPresent(),
+ "Expected upload directory does not exist.");
+ assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
+ }
+
+ /**
+ * Handler that accepts a mixed request consisting of a {@link TestRequestBody} and {@link #file1} and {@link #file2}.
+ */
+ public static class MultipartMixedHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+ private final Collection<Path> expectedFiles;
+ volatile TestRequestBody lastReceivedRequest = null;
+
+ MultipartMixedHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever, Collection<Path> expectedFiles) {
+ super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartMixedHeaders.INSTANCE);
+ this.expectedFiles = expectedFiles;
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ this.lastReceivedRequest = request.getRequestBody();
+ return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+
+ private static final class MultipartMixedHeaders implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+ private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
+
+ private MultipartMixedHeaders() {
+ }
+
+ @Override
+ public Class<TestRequestBody> getRequestClass() {
+ return TestRequestBody.class;
+ }
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "";
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/upload/mixed";
+ }
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Handler that accepts a json request consisting of a {@link TestRequestBody}.
+ */
+ public static class MultipartJsonHandler extends AbstractRestHandler<RestfulGateway, TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+ volatile TestRequestBody lastReceivedRequest = null;
+
+ MultipartJsonHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever) {
+ super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartJsonHeaders.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ Collection<Path> uploadedFiles = request.getUploadedFiles();
+ if (!uploadedFiles.isEmpty()) {
+ throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ this.lastReceivedRequest = request.getRequestBody();
+ return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+
+ private static final class MultipartJsonHeaders extends TestHeadersBase<TestRequestBody> {
+ private static final MultipartJsonHeaders INSTANCE = new MultipartJsonHeaders();
+
+ private MultipartJsonHeaders() {
+ }
+
+ @Override
+ public Class<TestRequestBody> getRequestClass() {
+ return TestRequestBody.class;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/upload/json";
+ }
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Handler that accepts a file request consisting of and {@link #file1} and {@link #file2}.
+ */
+ public static class MultipartFileHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+ private final Collection<Path> expectedFiles;
+
+ MultipartFileHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<RestfulGateway> leaderRetriever, Collection<Path> expectedFiles) {
+ super(localRestAddress, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), MultipartFileHeaders.INSTANCE);
+ this.expectedFiles = expectedFiles;
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+
+ static void verifyFileUpload(Collection<Path> expectedFiles, Collection<Path> uploadedFiles) throws RestHandlerException {
+ try {
+ assertEquals(expectedFiles.size(), uploadedFiles.size());
+
+ List<Path> expectedList = new ArrayList<>(expectedFiles);
+ List<Path> actualList = new ArrayList<>(uploadedFiles);
+ expectedList.sort(Comparator.comparing(Path::toString));
+ actualList.sort(Comparator.comparing(Path::toString));
+
+ for (int x = 0; x < expectedList.size(); x++) {
+ Path expected = expectedList.get(x);
+ Path actual = actualList.get(x);
+
+ assertEquals(expected.getFileName().toString(), actual.getFileName().toString());
+
+ byte[] originalContent = Files.readAllBytes(expected);
+ byte[] receivedContent = Files.readAllBytes(actual);
+ assertArrayEquals(originalContent, receivedContent);
+ }
+ } catch (Exception e) {
+ // return 505 to differentiate from common BAD_REQUEST responses in this test
+ throw new RestHandlerException("Test verification failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ private static final class MultipartFileHeaders extends TestHeadersBase<EmptyRequestBody> {
+ private static final MultipartFileHeaders INSTANCE = new MultipartFileHeaders();
+
+ private MultipartFileHeaders() {
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/upload/file";
+ }
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
+ }
+ }
+ }
+
+ private abstract static class TestHeadersBase<R extends RequestBody> implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "";
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+ }
+
+ /**
+ * Simple test {@link RequestBody}.
+ */
+ protected static final class TestRequestBody implements RequestBody {
+ private static final String FIELD_NAME_INDEX = "index";
+ private static final Random RANDOM = new Random();
+
+ @JsonProperty(FIELD_NAME_INDEX)
+ private final int index;
+
+ TestRequestBody() {
+ this(RANDOM.nextInt());
+ }
+
+ @JsonCreator
+ TestRequestBody(@JsonProperty(FIELD_NAME_INDEX) int index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestRequestBody that = (TestRequestBody) o;
+ return index == that.index;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
+
+ @Override
+ public String toString() {
+ return "TestRequestBody{" +
+ "index=" + index +
+ '}';
+ }
+ }
+
+ private static class TestRestServerEndpoint extends RestServerEndpoint {
+
+ private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
+
+ TestRestServerEndpoint(
+ RestServerEndpointConfiguration configuration,
+ List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
+ super(configuration);
+ this.handlers = requireNonNull(handlers);
+ }
+
+ @Override
+ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+ return handlers;
+ }
+
+ @Override
+ protected void startInternal() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/181559d5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
new file mode 100644
index 0000000..fb71e9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the multipart functionality of the {@link RestClient}.
+ */
+public class RestClientMultipartTest extends TestLogger {
+
+ @ClassRule
+ public static final MultipartUploadResource MULTIPART_UPLOAD_RESOURCE = new MultipartUploadResource();
+
+ private static RestClient restClient;
+
+ @BeforeClass
+ public static void setupClient() throws ConfigurationException {
+ restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor());
+ }
+
+ @After
+ public void reset() {
+ MULTIPART_UPLOAD_RESOURCE.resetState();
+ }
+
+ @AfterClass
+ public static void teardownClient() {
+ if (restClient != null) {
+ restClient.shutdown(Time.seconds(10));
+ }
+ }
+
+ @Test
+ public void testMixedMultipart() throws Exception {
+ Collection<FileUpload> files = MULTIPART_UPLOAD_RESOURCE.getFilesToUpload().stream()
+ .map(file -> new FileUpload(file.toPath(), "application/octet-stream")).collect(Collectors.toList());
+
+ MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+ CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+ MULTIPART_UPLOAD_RESOURCE.getMixedHandler().getMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ json,
+ files
+ );
+
+ responseFuture.get();
+ Assert.assertEquals(json, MULTIPART_UPLOAD_RESOURCE.getMixedHandler().lastReceivedRequest);
+ }
+
+ @Test
+ public void testJsonMultipart() throws Exception {
+ MultipartUploadResource.TestRequestBody json = new MultipartUploadResource.TestRequestBody();
+ CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+ MULTIPART_UPLOAD_RESOURCE.getJsonHandler().getMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ json,
+ Collections.emptyList()
+ );
+
+ responseFuture.get();
+ Assert.assertEquals(json, MULTIPART_UPLOAD_RESOURCE.getJsonHandler().lastReceivedRequest);
+ }
+
+ @Test
+ public void testFileMultipart() throws Exception {
+ Collection<FileUpload> files = MULTIPART_UPLOAD_RESOURCE.getFilesToUpload().stream()
+ .map(file -> new FileUpload(file.toPath(), "application/octet-stream")).collect(Collectors.toList());
+
+ CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getHostName(),
+ MULTIPART_UPLOAD_RESOURCE.getServerSocketAddress().getPort(),
+ MULTIPART_UPLOAD_RESOURCE.getFileHandler().getMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ files
+ );
+
+ responseFuture.get();
+ }
+}