You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/06 08:21:22 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #10453: [FLINK-14026][python] Manage the resource of Python worker properly

hequn8128 commented on a change in pull request #10453: [FLINK-14026][python] Manage the resource of Python worker properly
URL: https://github.com/apache/flink/pull/10453#discussion_r354690513
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##########
 @@ -100,6 +108,30 @@ public void open() throws Exception {
 
 			Map<String, String> jobParams = getExecutionConfig().getGlobalJobParameters().toMap();
 
+			long requiredPythonWorkerMemory =
+				MemorySize.parse(
+					jobParams.getOrDefault(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.key(),
+						String.valueOf(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.defaultValue())))
+					.add(MemorySize.parse(jobParams.getOrDefault(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.key(),
+						String.valueOf(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.defaultValue()))))
+					.getBytes();
+			MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();
+			long availableManagedMemory = memoryManager.computeMemorySize(
+				getOperatorConfig().getManagedMemoryFraction());
+			if (requiredPythonWorkerMemory <= availableManagedMemory) {
+				memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, requiredPythonWorkerMemory);
+				LOG.info("Reserved memory {} for Python worker.", requiredPythonWorkerMemory);
+				this.reservedMemory = requiredPythonWorkerMemory;
+				// TODO enforce the memory limit of Python worker
+			} else {
+				LOG.warn("Required Python worker memory {} exceeds the available managed off-heap " +
+					"memory {}. Skipping reserving off-heap memory from the MemoryManager. This does " +
+					"not affect the functionality. However, it may affect the stability of a job as " +
+					"the memory used by the Python worker is not managed by Flink.",
+					requiredPythonWorkerMemory, availableManagedMemory);
+				this.reservedMemory = -1;
+			}
 
 Review comment:
   Extract a method for this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services