You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 08:23:09 UTC

[flink] 02/02: [FLINK-10115][rest] Ignore content-length limit for FileUploads

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b01aff35ec133b028c390341ca22e21bf7e13786
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 12:37:13 2018 +0200

    [FLINK-10115][rest] Ignore content-length limit for FileUploads
---
 .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 11 +++++++++--
 .../apache/flink/runtime/rest/MultipartUploadResource.java    |  2 ++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 11ff00a..7c46af0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -140,11 +142,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				if (httpContent instanceof LastHttpContent) {
 					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
 						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
 					} else {
-						ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+						currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
+						ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
 					}
 					reset();
 				}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5d..c350393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public class MultipartUploadResource extends ExternalResource {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
+		// set this to a lower value on purpose to test that files larger than the content limit are still accepted
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
 		configuredUploadDir = temporaryFolder.newFolder().toPath();
 		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());