You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/25 14:22:06 UTC
[flink] branch release-1.9 updated: [FLINK-14139][rest] Fix
potential memory leak problem of rest server.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c2956f5 [FLINK-14139][rest] Fix potential memory leak problem of rest server.
c2956f5 is described below
commit c2956f512f6c4c0f93e87a04a090ceaaf9fd64da
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 19 20:48:31 2019 +0800
[FLINK-14139][rest] Fix potential memory leak problem of rest server.
This closes #9750.
---
.../flink/runtime/rest/FileUploadHandler.java | 11 +++++++
.../flink/runtime/rest/FileUploadHandlerTest.java | 34 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 3cd9732..6f60830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
@@ -84,7 +85,17 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
public FileUploadHandler(final Path uploadDir) {
super(true);
+
+ // the clean up of temp files when jvm exits is handled by org.apache.flink.util.ShutdownHookUtil; thus,
+ // it's no need to register those files (post chunks and upload file chunks) to java.io.DeleteOnExitHook
+ // which may lead to memory leak.
+ DiskAttribute.deleteOnExitTemporaryFile = false;
+ DiskFileUpload.deleteOnExitTemporaryFile = false;
+
DiskFileUpload.baseDirectory = uploadDir.normalize().toAbsolutePath().toString();
+ // share the same directory with file upload for post chunks storage.
+ DiskAttribute.baseDirectory = DiskFileUpload.baseDirectory;
+
this.uploadDir = requireNonNull(uploadDir);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index 771fd8a..80fa4b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -38,8 +38,12 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.util.LinkedHashSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
@@ -136,6 +140,8 @@ public class FileUploadHandlerTest extends TestLogger {
try (Response response = client.newCall(fileRequest).execute()) {
assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
}
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
}
@Test
@@ -162,6 +168,8 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
assertEquals(json, mixedHandler.lastReceivedRequest);
}
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
}
@Test
@@ -188,6 +196,8 @@ public class FileUploadHandlerTest extends TestLogger {
// FileUploads are outright forbidden
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
}
@Test
@@ -212,6 +222,8 @@ public class FileUploadHandlerTest extends TestLogger {
// JSON payload did not match expected format
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
}
@Test
@@ -223,6 +235,8 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
}
MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
}
/**
@@ -238,5 +252,25 @@ public class FileUploadHandlerTest extends TestLogger {
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
}
MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+ verifyNoFileIsRegisteredToDeleteOnExitHook();
+ }
+
+ /**
+ * DiskAttribute and DiskFileUpload class of netty store post chunks and file chunks as temp files on local disk.
+ * By default, netty will register these temp files to java.io.DeleteOnExitHook which may lead to memory leak.
+ * {@link FileUploadHandler} disables the shutdown hook registration so no file should be registered. Note that
+ * clean up of temp files is handed over to {@link org.apache.flink.runtime.entrypoint.ClusterEntrypoint}.
+ */
+ private void verifyNoFileIsRegisteredToDeleteOnExitHook() {
+ try {
+ Class<?> clazz = Class.forName("java.io.DeleteOnExitHook");
+ Field field = clazz.getDeclaredField("files");
+ field.setAccessible(true);
+ LinkedHashSet files = (LinkedHashSet) field.get(null);
+ assertTrue(files.isEmpty());
+ } catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
+ fail("This should never happen.");
+ }
}
}