You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 11:39:27 UTC

[flink] branch release-1.6 updated (4cf197e -> 994b185)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4cf197e  [FLINK-10142][network] reduce locking around credit notification
     new 70c9b7c  [hotfix][rest] Extend logging
     new 994b185  [FLINK-10115][rest] Ignore content-length limit for FileUploads

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/runtime/rest/AbstractHandler.java   |  1 +
 .../org/apache/flink/runtime/rest/FileUploadHandler.java | 16 ++++++++++++++--
 .../flink/runtime/rest/MultipartUploadResource.java      |  2 ++
 3 files changed, 17 insertions(+), 2 deletions(-)


[flink] 01/02: [hotfix][rest] Extend logging

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70c9b7cf886643c268d382e5f25b8d45f0c384cf
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 12:36:28 2018 +0200

    [hotfix][rest] Extend logging
---
 .../src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java | 1 +
 .../main/java/org/apache/flink/runtime/rest/FileUploadHandler.java   | 5 +++++
 2 files changed, 6 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 0d8605a..3d1ec9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -145,6 +145,7 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 					hre);
 			}
 
+			log.trace("Starting request processing.");
 			CompletableFuture<Void> requestProcessingFuture = respondToRequest(
 				ctx,
 				httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index d628750..11ff00a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -94,6 +94,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
 				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
 					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						LOG.trace("Initializing multipart file upload.");
 						checkState(currentHttpPostRequestDecoder == null);
 						checkState(currentHttpRequest == null);
 						checkState(currentUploadDir == null);
@@ -107,6 +108,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 					ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
 				}
 			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				LOG.trace("Received http content.");
 				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
 				RestServerEndpoint.createUploadDir(uploadDir, LOG);
 
@@ -121,9 +123,11 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
 						fileUpload.renameTo(dest.toFile());
+						LOG.trace("Upload of file {} complete.", fileUpload.getFilename());
 					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
 						final Attribute request = (Attribute) data;
 						// this could also be implemented by using the first found Attribute as the payload
+						LOG.trace("Upload of attribute {} complete.", request.getName());
 						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
 							currentJsonPayload = request.get();
 						} else {
@@ -134,6 +138,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				}
 
 				if (httpContent instanceof LastHttpContent) {
+					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
 					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {


[flink] 02/02: [FLINK-10115][rest] Ignore content-length limit for FileUploads

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 994b18518fb1534f6a62491e5149770b8667f68e
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 12:37:13 2018 +0200

    [FLINK-10115][rest] Ignore content-length limit for FileUploads
---
 .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 11 +++++++++--
 .../apache/flink/runtime/rest/MultipartUploadResource.java    |  2 ++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 11ff00a..7c46af0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -140,11 +142,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 				if (httpContent instanceof LastHttpContent) {
 					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
 						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
 					} else {
-						ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+						currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
+						ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
 					}
 					reset();
 				}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5d..c350393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public class MultipartUploadResource extends ExternalResource {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
+		// set this to a lower value on purpose to test that files larger than the content limit are still accepted
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
 		configuredUploadDir = temporaryFolder.newFolder().toPath();
 		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());