You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/02 10:30:55 UTC

flink git commit: [FLINK-9677][rest] Cleanup encoder after request has been processed

Repository: flink
Updated Branches:
  refs/heads/release-1.5 35bddc13f -> fea870b51


[FLINK-9677][rest] Cleanup encoder after request has been processed


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fea870b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fea870b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fea870b5

Branch: refs/heads/release-1.5
Commit: fea870b51a66f92ff8bc4aada6cc51bd3c172f43
Parents: 35bddc1
Author: zentol <ch...@apache.org>
Authored: Wed Jun 27 09:34:34 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 2 12:30:40 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/FileUploadHandler.java    | 12 +++++++++++-
 .../java/org/apache/flink/runtime/rest/RestClient.java  |  8 ++++----
 .../flink/runtime/rest/MultipartUploadResource.java     |  5 ++++-
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fea870b5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 6fdd350..aa87cc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -79,6 +79,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 	private HttpRequest currentHttpRequest;
 	private byte[] currentJsonPayload;
 	private Path currentUploadDir;
+	private boolean currentRequestFailed = false;
 
 	public FileUploadHandler(final Path uploadDir) {
 		super(false);
@@ -90,6 +91,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
 		try {
 			if (msg instanceof HttpRequest) {
+				currentRequestFailed = false;
 				final HttpRequest httpRequest = (HttpRequest) msg;
 				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
 				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
@@ -145,6 +147,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 					}
 					reset();
 				}
+			} else if (currentRequestFailed) {
+				LOG.trace("Swallowing content for failed request. {}", msg);
 			} else {
 				ctx.fireChannelRead(msg);
 			}
@@ -154,6 +158,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 	}
 
 	private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+		currentRequestFailed = true;
 		HttpRequest tmpRequest = currentHttpRequest;
 		deleteUploadedFiles();
 		reset();
@@ -180,7 +185,12 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 	private void reset() {
 		// destroy() can fail because some data is stored multiple times in the decoder causing an IllegalReferenceCountException
 		// see https://github.com/netty/netty/issues/7814
-		currentHttpPostRequestDecoder.getBodyHttpDatas().clear();
+		try {
+			currentHttpPostRequestDecoder.getBodyHttpDatas().clear();
+		} catch (HttpPostRequestDecoder.NotEnoughDataDecoderException ned) {
+			// this method always fails if not all chunks were offered to the decoder yet
+			LOG.debug("Error while resetting handler.", ned);
+		}
 		currentHttpPostRequestDecoder.destroy();
 		currentHttpPostRequestDecoder = null;
 		currentHttpRequest = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/fea870b5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index c9da501..b75ffbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -349,14 +349,14 @@ public class RestClient {
 
 		@Override
 		public void writeTo(Channel channel) {
-			channel.writeAndFlush(httpRequest);
+			ChannelFuture future = channel.writeAndFlush(httpRequest);
 			// this should never be false as we explicitly set the encoder to use multipart messages
 			if (bodyRequestEncoder.isChunked()) {
-				channel.writeAndFlush(bodyRequestEncoder);
+				future = channel.writeAndFlush(bodyRequestEncoder);
 			}
 
-			// release data and remove temporary files if they were created
-			bodyRequestEncoder.cleanFiles();
+			// release data and remove temporary files if they were created, once the writing is complete
+			future.addListener((ignored) -> bodyRequestEncoder.cleanFiles());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fea870b5/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c03b85d..1311b80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -52,6 +52,7 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -113,7 +114,9 @@ public class MultipartUploadResource extends ExternalResource {
 			CompletableFuture.completedFuture(mockRestfulGateway);
 
 		file1 = temporaryFolder.newFile();
-		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		try (RandomAccessFile rw = new RandomAccessFile(file1, "rw")) {
+			rw.setLength(1024 * 1024 * 64);
+		}
 		file2 = temporaryFolder.newFile();
 		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));