You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/28 05:42:10 UTC
[5/5] flink git commit: [FLINK-9677][rest] Cleanup encoder after request has been processed
[FLINK-9677][rest] Cleanup encoder after request has been processed
This closes #6217.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce04965b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce04965b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce04965b
Branch: refs/heads/master
Commit: ce04965b647c734c9db9f596d1ef6a55c8d28c2b
Parents: aa2029c
Author: zentol <ch...@apache.org>
Authored: Wed Jun 27 09:34:34 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jun 27 22:01:17 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/rest/FileUploadHandler.java | 7 ++++++-
.../main/java/org/apache/flink/runtime/rest/RestClient.java | 8 ++++----
.../apache/flink/runtime/rest/MultipartUploadResource.java | 5 ++++-
3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce04965b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 0c910d2..aa21ddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -174,7 +174,12 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
private void reset() {
// destroy() can fail because some data is stored multiple times in the decoder causing an IllegalReferenceCountException
// see https://github.com/netty/netty/issues/7814
- currentHttpPostRequestDecoder.getBodyHttpDatas().clear();
+ try {
+ currentHttpPostRequestDecoder.getBodyHttpDatas().clear();
+ } catch (HttpPostRequestDecoder.NotEnoughDataDecoderException ned) {
+ // this method always fails if not all chunks were offered to the decoder yet
+ LOG.debug("Error while resetting {}.", getClass(), ned);
+ }
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce04965b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e1a784..a119536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -351,14 +351,14 @@ public class RestClient {
@Override
public void writeTo(Channel channel) {
- channel.writeAndFlush(httpRequest);
+ ChannelFuture future = channel.writeAndFlush(httpRequest);
// this should never be false as we explicitly set the encoder to use multipart messages
if (bodyRequestEncoder.isChunked()) {
- channel.writeAndFlush(bodyRequestEncoder);
+ future = channel.writeAndFlush(bodyRequestEncoder);
}
- // release data and remove temporary files if they were created
- bodyRequestEncoder.cleanFiles();
+ // release data and remove temporary files if they were created, once the writing is complete
+ future.addListener((ignored) -> bodyRequestEncoder.cleanFiles());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce04965b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c03b85d..1311b80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -52,6 +52,7 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -113,7 +114,9 @@ public class MultipartUploadResource extends ExternalResource {
CompletableFuture.completedFuture(mockRestfulGateway);
file1 = temporaryFolder.newFile();
- Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ try (RandomAccessFile rw = new RandomAccessFile(file1, "rw")) {
+ rw.setLength(1024 * 1024 * 64);
+ }
file2 = temporaryFolder.newFile();
Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));