Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/05 10:51:17 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #11018: [FLINK-15905][runtime] Fix race condition between allocation and release of OpaqueMemoryResource

URL: https://github.com/apache/flink/pull/11018#discussion_r375184200
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java
 ##########
 @@ -80,27 +81,37 @@
 		}
 	}
 
+	/**
+	 * Releases a lease (identified by the lease holder object) for the given type.
+	 * If no further leases exist, the resource is disposed.
+	 */
+	void release(String type, Object leaseHolder) throws Exception {
+		release(type, leaseHolder, (value) -> {});
+	}
+
 	/**
 	 * Releases a lease (identified by the lease holder object) for the given type.
 	 * If no further leases exist, the resource is disposed.
 	 *
-	 * @return True, if this was the last lease holder and the resource was disposed.
+	 * <p>This method takes an additional hook that is called when the resource is disposed.
 	 */
-	boolean release(String type, Object leaseHolder) throws Exception {
+	void release(String type, Object leaseHolder, Consumer<Long> releaser) throws Exception {
 		lock.lock();
 		try {
-			final LeasedResource resource = reservedResources.get(type);
+			final LeasedResource<?> resource = reservedResources.get(type);
 			if (resource == null) {
-				return false;
+				return;
 			}
 
 			if (resource.removeLeaseHolder(leaseHolder)) {
-				reservedResources.remove(type);
-				resource.dispose();
-				return true;
+				try {
+					reservedResources.remove(type);
+					resource.dispose();
+				}
+				finally {
+					releaser.accept(resource.size());
+				}
 
 Review comment:
   I guess my question is more related to the overall memory release logic than to this change.
   
   What is the intended behaviour of Flink if `resource.dispose` fails? According to its contract, `dispose` can throw an `Exception`, and I assume that in this case it is not guaranteed that `resource` has released all of its resources (native memory).
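To make the concern concrete, here is a minimal, self-contained sketch of the `try`/`finally` shape from the diff. `FailingResource`, `budgetReturned` and the `"rocksdb"` key are hypothetical stand-ins, not Flink classes. Because the releaser runs in `finally`, the reserved budget is handed back even when `dispose()` throws, while the exception still propagates to the caller and the native memory may still be held.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class ReleaseOrderSketch {

    // Hypothetical stand-in for a leased native resource whose dispose() may fail.
    static final class FailingResource {
        final long size;

        FailingResource(long size) {
            this.size = size;
        }

        void dispose() throws Exception {
            // Simulates a native free that fails, leaving memory possibly still allocated.
            throw new Exception("native dispose failed");
        }
    }

    static final Map<String, FailingResource> reservedResources = new HashMap<>();
    static long budgetReturned = 0;

    // Mirrors the try/finally shape from the diff: the releaser runs even if dispose() throws.
    static void release(String type, Consumer<Long> releaser) throws Exception {
        FailingResource resource = reservedResources.get(type);
        if (resource == null) {
            return;
        }
        try {
            reservedResources.remove(type);
            resource.dispose();
        } finally {
            releaser.accept(resource.size);
        }
    }

    public static void main(String[] args) {
        reservedResources.put("rocksdb", new FailingResource(64L));
        try {
            release("rocksdb", size -> budgetReturned += size);
        } catch (Exception e) {
            // The failure propagates to the caller...
            System.out.println("caught: " + e.getMessage());
        }
        // ...but the budget was already handed back, although the native memory may still be held.
        System.out.println("budget returned: " + budgetReturned);
    }
}
```

Run as written, `main` prints the caught message followed by a returned budget of 64, even though disposal failed: the accounting and the actual native memory have diverged.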
   
   If one follows the chain of close calls, the release of the `SharedResources` could be triggered by `RocksDBKeyedStateBackend#dispose`, where we swallow the exception via `IOUtils.closeQuietly`. Wouldn't this leave Flink in a problematic state where we have less memory than we think we have? I think it is dangerous to swallow these exceptions; they should be handled properly. In the case of a memory leak, I guess we should fail fatally.
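One possible direction, sketched under assumptions: instead of swallowing the failure the way `IOUtils.closeQuietly` does, escalate it to a fatal-error callback. The `FatalErrorHandler`, `NativeResource`, `closeQuietly` and `closeOrFail` names below are hypothetical illustrations, not the actual Flink code path; a real fix would still have to decide where in the chain of close calls the escalation belongs.

```java
import java.util.ArrayList;
import java.util.List;

public class FatalOnLeakSketch {

    // Hypothetical callback; in a real system this might terminate the process.
    interface FatalErrorHandler {
        void onFatalError(Throwable t);
    }

    // Hypothetical resource whose close() may fail to free native memory.
    interface NativeResource extends AutoCloseable {}

    // Roughly what closeQuietly-style cleanup does: the failure is lost.
    static void closeQuietly(AutoCloseable c) {
        try {
            c.close();
        } catch (Exception ignored) {
            // Swallowed: the caller cannot tell that memory may have leaked.
        }
    }

    // Alternative: a failed dispose is treated as a potential leak and escalated.
    static void closeOrFail(AutoCloseable c, FatalErrorHandler handler) {
        try {
            c.close();
        } catch (Exception e) {
            handler.onFatalError(
                    new IllegalStateException("Possible native memory leak: dispose failed", e));
        }
    }

    public static void main(String[] args) {
        NativeResource broken = () -> { throw new Exception("free() failed"); };
        List<Throwable> fatalErrors = new ArrayList<>();

        closeQuietly(broken);                  // nothing observable happens
        closeOrFail(broken, fatalErrors::add); // the failure reaches the handler

        System.out.println("fatal errors reported: " + fatalErrors.size());
    }
}
```

The design choice here is simply to make the failure observable: whether the handler then kills the process or only marks the memory as unusable is a policy decision left open.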
