You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/12/14 02:12:22 UTC

[flink] branch release-1.15 updated: [hotfix][python] Use off-heap memory if managed memory fraction is 0

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 224faf9d5a0 [hotfix][python] Use off-heap memory if managed memory fraction is 0
224faf9d5a0 is described below

commit 224faf9d5a0807c127a34777dd3992e06ebe620d
Author: Dian Fu <di...@apache.org>
AuthorDate: Tue Dec 13 14:14:33 2022 +0800

    [hotfix][python] Use off-heap memory if managed memory fraction is 0
---
 .../python/beam/BeamPythonFunctionRunner.java       | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index ad04d9b5299..45c60f9bc40 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -233,14 +233,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
         Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);
 
-        if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
-            Preconditions.checkArgument(
-                    managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
-                    String.format(
-                            "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. "
-                                    + "It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\".",
-                            managedMemoryFraction));
-
+        if (memoryManager != null
+                && config.get(USE_MANAGED_MEMORY)
+                && managedMemoryFraction > 0
+                && managedMemoryFraction <= 1.0) {
             final LongFunctionWithException<PythonSharedResources, Exception> initializer =
                     (size) ->
                             new PythonSharedResources(
@@ -259,6 +255,15 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                     sharedResources.getResourceHandle().getEnvironment();
             stageBundleFactory = createStageBundleFactory(jobBundleFactory, environment);
         } else {
+            if (memoryManager != null
+                    && config.get(USE_MANAGED_MEMORY)
+                    && (managedMemoryFraction <= 0 || managedMemoryFraction > 1.0)) {
+                LOG.warn(
+                        String.format(
+                                "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead. "
+                                        + "Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.",
+                                managedMemoryFraction));
+            }
             // there is no way to access the MemoryManager for the batch job of old planner,
             // fallback to the way that spawning a Python process for each Python operator
             jobBundleFactory = createJobBundleFactory(pipelineOptions);