You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 08:23:07 UTC

[flink] branch master updated (1c44ba2 -> b01aff3)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1c44ba2  [hotfix][conf] Remove references to deprecated WebOptions
     new 3bad7aa  [hotfix][rest] Extend logging
     new b01aff3  [FLINK-10115][rest] Ignore content-length limit for FileUploads

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/runtime/rest/AbstractHandler.java   |  1 +
 .../org/apache/flink/runtime/rest/FileUploadHandler.java | 16 ++++++++++++++--
 .../flink/runtime/rest/MultipartUploadResource.java      |  2 ++
 3 files changed, 17 insertions(+), 2 deletions(-)


[flink] 01/02: [hotfix][rest] Extend logging

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bad7aa394f9fa5f810e16aae0cfdd1fe1ea1553
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 12:36:28 2018 +0200

    [hotfix][rest] Extend logging
---
 .../src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java | 1 +
 .../main/java/org/apache/flink/runtime/rest/FileUploadHandler.java   | 5 +++++
 2 files changed, 6 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 0d8605a..3d1ec9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -145,6 +145,7 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 					hre);
 			}
 
+			log.trace("Starting request processing.");
 			CompletableFuture<Void> requestProcessingFuture = respondToRequest(
 				ctx,
 				httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index d628750..11ff00a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -94,6 +94,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
 				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
 					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						LOG.trace("Initializing multipart file upload.");
 						checkState(currentHttpPostRequestDecoder == null);
 						checkState(currentHttpRequest == null);
 						checkState(currentUploadDir == null);
@@ -107,6 +108,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 					ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
 				}
 			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				LOG.trace("Received http content.");
 				// make sure that we still have an upload dir in case it got deleted in the meantime
 				RestServerEndpoint.createUploadDir(uploadDir, LOG);
 
@@ -121,9 +123,11 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
 						fileUpload.renameTo(dest.toFile());
+						LOG.trace("Upload of file {} complete.", fileUpload.getFilename());
 					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
 						final Attribute request = (Attribute) data;
 						// this could also be implemented by using the first found Attribute as the payload
+						LOG.trace("Upload of attribute {} complete.", request.getName());
 						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
 							currentJsonPayload = request.get();
 						} else {
@@ -134,6 +138,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				}
 
 				if (httpContent instanceof LastHttpContent) {
+					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
 					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {


[flink] 02/02: [FLINK-10115][rest] Ignore content-length limit for FileUploads

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b01aff35ec133b028c390341ca22e21bf7e13786
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 12:37:13 2018 +0200

    [FLINK-10115][rest] Ignore content-length limit for FileUploads
---
 .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 11 +++++++++--
 .../apache/flink/runtime/rest/MultipartUploadResource.java    |  2 ++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 11ff00a..7c46af0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -140,11 +142,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				if (httpContent instanceof LastHttpContent) {
 					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
 						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
 					} else {
-						ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+						currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
+						ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
 					}
 					reset();
 				}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5d..c350393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public class MultipartUploadResource extends ExternalResource {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
+		// set this to a lower value on purpose to test that files larger than the content limit are still accepted
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
 		configuredUploadDir = temporaryFolder.newFolder().toPath();
 		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());