You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/12/14 02:12:22 UTC
[flink] branch release-1.15 updated: [hotfix][python] Use off-heap memory if managed memory fraction is 0
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 224faf9d5a0 [hotfix][python] Use off-heap memory if managed memory fraction is 0
224faf9d5a0 is described below
commit 224faf9d5a0807c127a34777dd3992e06ebe620d
Author: Dian Fu <di...@apache.org>
AuthorDate: Tue Dec 13 14:14:33 2022 +0800
[hotfix][python] Use off-heap memory if managed memory fraction is 0
---
.../python/beam/BeamPythonFunctionRunner.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index ad04d9b5299..45c60f9bc40 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -233,14 +233,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);
- if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
- Preconditions.checkArgument(
- managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
- String.format(
- "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. "
- + "It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\".",
- managedMemoryFraction));
-
+ if (memoryManager != null
+ && config.get(USE_MANAGED_MEMORY)
+ && managedMemoryFraction > 0
+ && managedMemoryFraction <= 1.0) {
final LongFunctionWithException<PythonSharedResources, Exception> initializer =
(size) ->
new PythonSharedResources(
@@ -259,6 +255,15 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
sharedResources.getResourceHandle().getEnvironment();
stageBundleFactory = createStageBundleFactory(jobBundleFactory, environment);
} else {
+ if (memoryManager != null
+ && config.get(USE_MANAGED_MEMORY)
+ && (managedMemoryFraction <= 0 || managedMemoryFraction > 1.0)) {
+ LOG.warn(
+ String.format(
+ "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead."
+ + "Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.",
+ managedMemoryFraction));
+ }
// there is no way to access the MemoryManager for the batch job of old planner,
// fallback to the way that spawning a Python process for each Python operator
jobBundleFactory = createJobBundleFactory(pipelineOptions);