You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/25 14:22:06 UTC

[flink] branch release-1.9 updated: [FLINK-14139][rest] Fix potential memory leak problem of rest server.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c2956f5  [FLINK-14139][rest] Fix potential memory leak problem of rest server.
c2956f5 is described below

commit c2956f512f6c4c0f93e87a04a090ceaaf9fd64da
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 19 20:48:31 2019 +0800

    [FLINK-14139][rest] Fix potential memory leak problem of rest server.
    
    This closes #9750.
---
 .../flink/runtime/rest/FileUploadHandler.java      | 11 +++++++
 .../flink/runtime/rest/FileUploadHandlerTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 3cd9732..6f60830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
@@ -84,7 +85,17 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 	public FileUploadHandler(final Path uploadDir) {
 		super(true);
+
+		// Temp files are cleaned up on JVM exit by org.apache.flink.util.ShutdownHookUtil, so there is
+		// no need to register them (post chunks and upload file chunks) with java.io.DeleteOnExitHook,
+		// which may lead to a memory leak.
+		DiskAttribute.deleteOnExitTemporaryFile = false;
+		DiskFileUpload.deleteOnExitTemporaryFile = false;
+
 		DiskFileUpload.baseDirectory = uploadDir.normalize().toAbsolutePath().toString();
+		// Store post chunks in the same directory as file uploads.
+		DiskAttribute.baseDirectory = DiskFileUpload.baseDirectory;
+
 		this.uploadDir = requireNonNull(uploadDir);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index 771fd8a..80fa4b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -38,8 +38,12 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.util.LinkedHashSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
@@ -136,6 +140,8 @@ public class FileUploadHandlerTest extends TestLogger {
 		try (Response response = client.newCall(fileRequest).execute()) {
 			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
 		}
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
 	}
 
 	@Test
@@ -162,6 +168,8 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
 			assertEquals(json, mixedHandler.lastReceivedRequest);
 		}
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
 	}
 
 	@Test
@@ -188,6 +196,8 @@ public class FileUploadHandlerTest extends TestLogger {
 			// FileUploads are outright forbidden
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
 	}
 
 	@Test
@@ -212,6 +222,8 @@ public class FileUploadHandlerTest extends TestLogger {
 			// JSON payload did not match expected format
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
 	}
 
 	@Test
@@ -223,6 +235,8 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
 		}
 		MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
 	}
 
 	/**
@@ -238,5 +252,25 @@ public class FileUploadHandlerTest extends TestLogger {
 			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
 		}
 		MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+		verifyNoFileIsRegisteredToDeleteOnExitHook();
+	}
+
+	/**
+	 * Netty's DiskAttribute and DiskFileUpload classes store post chunks and file chunks as temp files on local disk.
+	 * By default, Netty registers these temp files with java.io.DeleteOnExitHook, which may lead to a memory leak.
+	 * {@link FileUploadHandler} disables that registration, so no file should be registered. Note that
+	 * clean up of temp files is handed over to {@link org.apache.flink.runtime.entrypoint.ClusterEntrypoint}.
+	 */
+	private void verifyNoFileIsRegisteredToDeleteOnExitHook() {
+		try {
+			Class<?> clazz = Class.forName("java.io.DeleteOnExitHook");
+			Field field = clazz.getDeclaredField("files");
+			field.setAccessible(true);
+			LinkedHashSet<?> files = (LinkedHashSet<?>) field.get(null); // null receiver: read as a static field
+			assertTrue("Files registered to java.io.DeleteOnExitHook: " + files, files.isEmpty());
+		} catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
+			fail("Could not inspect java.io.DeleteOnExitHook via reflection: " + e);
+		}
 	}
 }