You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/01/08 10:37:14 UTC

[flink] 02/02: [FLINK-20860][core] Update valid names for TaskManagerOptions#MANAGED_MEMORY_CONSUMER_WEIGHTS.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed354d9c93b38d843269c29b291271ba8400c7d9
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jan 7 14:30:14 2021 +0800

    [FLINK-20860][core] Update valid names for TaskManagerOptions#MANAGED_MEMORY_CONSUMER_WEIGHTS.
    
    This closes #14576
---
 .../_includes/generated/common_memory_section.html |   4 +-
 .../task_manager_memory_configuration.html         |   4 +-
 .../flink/configuration/TaskManagerOptions.java    |  27 +++--
 .../util/config/memory/ManagedMemoryUtils.java     | 109 ++++++++++++---------
 .../util/config/memory/ManagedMemoryUtilsTest.java |  86 +++++++++++++---
 5 files changed, 154 insertions(+), 76 deletions(-)

diff --git a/docs/_includes/generated/common_memory_section.html b/docs/_includes/generated/common_memory_section.html
index c2e0160..f4406d5 100644
--- a/docs/_includes/generated/common_memory_section.html
+++ b/docs/_includes/generated/common_memory_section.html
@@ -106,9 +106,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
-            <td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
+            <td style="word-wrap: break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
             <td>Map</td>
-            <td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are DATAPROC (for RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON (for Python processes).</td>
+            <td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are OPERATOR (for built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for Python processes).</td>
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.fraction</h5></td>
diff --git a/docs/_includes/generated/task_manager_memory_configuration.html b/docs/_includes/generated/task_manager_memory_configuration.html
index d23b267..62fa930 100644
--- a/docs/_includes/generated/task_manager_memory_configuration.html
+++ b/docs/_includes/generated/task_manager_memory_configuration.html
@@ -52,9 +52,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
-            <td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
+            <td style="word-wrap: break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
             <td>Map</td>
-            <td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are DATAPROC (for RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON (for Python processes).</td>
+            <td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are OPERATOR (for built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for Python processes).</td>
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.fraction</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 018413f..ee1bef3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -37,7 +37,14 @@ import static org.apache.flink.configuration.description.TextElement.text;
 @ConfigGroups(groups = @ConfigGroup(name = "TaskManagerMemory", keyPrefix = "taskmanager.memory"))
 public class TaskManagerOptions {
 
-    public static final String MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = "DATAPROC";
+    /**
+     * @deprecated use {@link #MANAGED_MEMORY_CONSUMER_NAME_OPERATOR} and {@link
+     *     #MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND} instead
+     */
+    @Deprecated public static final String MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = "DATAPROC";
+
+    public static final String MANAGED_MEMORY_CONSUMER_NAME_OPERATOR = "OPERATOR";
+    public static final String MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND = "STATE_BACKEND";
     public static final String MANAGED_MEMORY_CONSUMER_NAME_PYTHON = "PYTHON";
 
     // ------------------------------------------------------------------------
@@ -433,17 +440,21 @@ public class TaskManagerOptions {
                     .defaultValue(
                             new HashMap<String, String>() {
                                 {
-                                    put(MANAGED_MEMORY_CONSUMER_NAME_DATAPROC, "70");
+                                    put(MANAGED_MEMORY_CONSUMER_NAME_OPERATOR, "70");
+                                    put(MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND, "70");
                                     put(MANAGED_MEMORY_CONSUMER_NAME_PYTHON, "30");
                                 }
                             })
                     .withDescription(
-                            "Managed memory weights for different kinds of consumers. A slot’s managed memory is"
-                                    + " shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless"
-                                    + " of the number of consumers from each kind. Currently supported kinds of consumers are "
-                                    + MANAGED_MEMORY_CONSUMER_NAME_DATAPROC
-                                    + " (for RocksDB state backend in streaming and built-in"
-                                    + " algorithms in batch) and "
+                            "Managed memory weights for different kinds of consumers. A slot’s"
+                                    + " managed memory is shared by all kinds of consumers it"
+                                    + " contains, proportionally to the kinds’ weights and"
+                                    + " regardless of the number of consumers from each kind."
+                                    + " Currently supported kinds of consumers are "
+                                    + MANAGED_MEMORY_CONSUMER_NAME_OPERATOR
+                                    + " (for built-in algorithms), "
+                                    + MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND
+                                    + " (for RocksDB state backend) and "
                                     + MANAGED_MEMORY_CONSUMER_NAME_PYTHON
                                     + " (for Python processes).");
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
index 8fe0e03..3b54a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
@@ -19,22 +19,25 @@
 package org.apache.flink.runtime.util.config.memory;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.state.StateBackendLoader;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /** Utils for configuration and calculations related to managed memory and its various use cases. */
 public enum ManagedMemoryUtils {
@@ -44,11 +47,20 @@ public enum ManagedMemoryUtils {
 
     private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;
 
-    /** Valid names of managed memory consumers. */
-    private static final String[] MANAGED_MEMORY_CONSUMER_VALID_NAMES = {
-        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
-        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON
-    };
+    /** Names of managed memory use cases, in the fallback order. */
+    @SuppressWarnings("deprecation")
+    private static final Map<ManagedMemoryUseCase, List<String>> USE_CASE_CONSUMER_NAMES =
+            ImmutableMap.of(
+                    ManagedMemoryUseCase.OPERATOR,
+                    ImmutableList.of(
+                            TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+                            TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
+                    ManagedMemoryUseCase.STATE_BACKEND,
+                    ImmutableList.of(
+                            TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND,
+                            TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
+                    ManagedMemoryUseCase.PYTHON,
+                    ImmutableList.of(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON));
 
     public static double convertToFractionOfSlot(
             ManagedMemoryUseCase useCase,
@@ -86,52 +98,53 @@ public enum ManagedMemoryUtils {
     @VisibleForTesting
     static Map<ManagedMemoryUseCase, Integer> getManagedMemoryUseCaseWeightsFromConfig(
             Configuration config) {
-        final Map<String, String> consumerWeights =
+        final Map<String, String> configuredWeights =
                 config.get(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS);
+        final Map<ManagedMemoryUseCase, Integer> effectiveWeights = new HashMap<>();
+
+        for (Map.Entry<ManagedMemoryUseCase, List<String>> entry :
+                USE_CASE_CONSUMER_NAMES.entrySet()) {
+            final ManagedMemoryUseCase useCase = entry.getKey();
+            final Iterator<String> nameIter = entry.getValue().iterator();
+
+            boolean findWeight = false;
+            while (!findWeight && nameIter.hasNext()) {
+                final String name = nameIter.next();
+                final String weightStr = configuredWeights.get(name);
+                if (weightStr != null) {
+                    final int weight = Integer.parseInt(weightStr);
+                    findWeight = true;
+
+                    if (weight < 0) {
+                        throw new IllegalConfigurationException(
+                                String.format(
+                                        "Managed memory weight should not be negative. Configured "
+                                                + "weight for %s is %d.",
+                                        useCase, weight));
+                    }
+
+                    if (weight == 0) {
+                        LOG.debug(
+                                "Managed memory consumer weight for {} is configured to 0. Jobs "
+                                        + "containing this type of managed memory consumers may "
+                                        + "fail due to not being able to allocate managed memory.",
+                                useCase);
+                    }
+
+                    effectiveWeights.put(useCase, weight);
+                }
+            }
 
-        for (String consumer : MANAGED_MEMORY_CONSUMER_VALID_NAMES) {
-            if (!consumerWeights.containsKey(consumer)) {
+            if (!findWeight) {
                 LOG.debug(
-                        "Managed memory consumer weight for {} is not configured. Jobs containing this type of "
-                                + "managed memory consumers may fail due to not being able to allocate managed memory.",
-                        consumer);
+                        "Managed memory consumer weight for {} is not configured. Jobs containing "
+                                + "this type of managed memory consumers may fail due to not being "
+                                + "able to allocate managed memory.",
+                        useCase);
             }
         }
 
-        return consumerWeights.entrySet().stream()
-                .flatMap(
-                        (entry) -> {
-                            final String consumer = entry.getKey();
-                            final int weight = Integer.parseInt(entry.getValue());
-
-                            if (weight < 0) {
-                                throw new IllegalConfigurationException(
-                                        String.format(
-                                                "Managed memory weight should not be negative. Configured weight for %s is %d.",
-                                                consumer, weight));
-                            }
-
-                            if (weight == 0) {
-                                LOG.debug(
-                                        "Managed memory consumer weight for {} is configured to 0. Jobs containing this type of "
-                                                + "managed memory consumers may fail due to not being able to allocate managed memory.",
-                                        consumer);
-                            }
-
-                            switch (consumer) {
-                                case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC:
-                                    return Stream.of(
-                                            Tuple2.of(ManagedMemoryUseCase.OPERATOR, weight),
-                                            Tuple2.of(ManagedMemoryUseCase.STATE_BACKEND, weight));
-                                case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON:
-                                    return Stream.of(
-                                            Tuple2.of(ManagedMemoryUseCase.PYTHON, weight));
-                                default:
-                                    throw new IllegalConfigurationException(
-                                            "Unknown managed memory consumer: " + consumer);
-                            }
-                        })
-                .collect(Collectors.toMap((tuple2) -> tuple2.f0, (tuple2) -> tuple2.f1));
+        return effectiveWeights;
     }
 
     public static double getFractionRoundedDown(final long dividend, final long divisor) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
index 650a1c7..31ab8fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
@@ -45,6 +45,9 @@ public class ManagedMemoryUtilsTest extends TestLogger {
 
     private static final int DATA_PROC_WEIGHT = 111;
     private static final int PYTHON_WEIGHT = 222;
+    private static final int OPERATOR_WEIGHT = 333;
+    private static final int STATE_BACKEND_WEIGHT = 444;
+    private static final int TOTAL_WEIGHT = PYTHON_WEIGHT + OPERATOR_WEIGHT + STATE_BACKEND_WEIGHT;
 
     private static final UnmodifiableConfiguration CONFIG_WITH_ALL_USE_CASES =
             new UnmodifiableConfiguration(
@@ -62,6 +65,35 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                                                     TaskManagerOptions
                                                             .MANAGED_MEMORY_CONSUMER_NAME_PYTHON,
                                                     String.valueOf(PYTHON_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            .MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+                                                    String.valueOf(OPERATOR_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            .MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND,
+                                                    String.valueOf(STATE_BACKEND_WEIGHT));
+                                        }
+                                    });
+                        }
+                    });
+
+    private static final UnmodifiableConfiguration CONFIG_WITH_LEGACY_USE_CASES =
+            new UnmodifiableConfiguration(
+                    new Configuration() {
+                        {
+                            set(
+                                    TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
+                                    new HashMap<String, String>() {
+                                        {
+                                            put(
+                                                    TaskManagerOptions
+                                                            .MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
+                                                    String.valueOf(DATA_PROC_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            .MANAGED_MEMORY_CONSUMER_NAME_PYTHON,
+                                                    String.valueOf(PYTHON_WEIGHT));
                                         }
                                     });
                         }
@@ -72,8 +104,8 @@ public class ManagedMemoryUtilsTest extends TestLogger {
         final Map<ManagedMemoryUseCase, Integer> expectedWeights =
                 new HashMap<ManagedMemoryUseCase, Integer>() {
                     {
-                        put(ManagedMemoryUseCase.STATE_BACKEND, DATA_PROC_WEIGHT);
-                        put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.OPERATOR, OPERATOR_WEIGHT);
+                        put(ManagedMemoryUseCase.STATE_BACKEND, STATE_BACKEND_WEIGHT);
                         put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT);
                     }
                 };
@@ -85,18 +117,22 @@ public class ManagedMemoryUtilsTest extends TestLogger {
         assertThat(configuredWeights, is(expectedWeights));
     }
 
-    @Test(expected = IllegalConfigurationException.class)
-    public void testGetWeightsFromConfigFailUnknownUseCase() {
-        final Configuration config =
-                new Configuration() {
+    @Test
+    public void testGetWeightsFromConfigLegacy() {
+        final Map<ManagedMemoryUseCase, Integer> expectedWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
                     {
-                        set(
-                                TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
-                                Collections.singletonMap("UNKNOWN_KEY", "123"));
+                        put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.STATE_BACKEND, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT);
                     }
                 };
 
-        ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(config);
+        final Map<ManagedMemoryUseCase, Integer> configuredWeights =
+                ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(
+                        CONFIG_WITH_LEGACY_USE_CASES);
+
+        assertThat(configuredWeights, is(expectedWeights));
     }
 
     @Test(expected = IllegalConfigurationException.class)
@@ -107,7 +143,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         set(
                                 TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
                                 Collections.singletonMap(
-                                        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
+                                        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
                                         "-123"));
                     }
                 };
@@ -127,14 +163,15 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         new HashSet<ManagedMemoryUseCase>() {
                             {
                                 add(ManagedMemoryUseCase.OPERATOR);
+                                add(ManagedMemoryUseCase.STATE_BACKEND);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
                         CONFIG_WITH_ALL_USE_CASES,
-                        Optional.empty(),
+                        Optional.of(true),
                         ClassLoader.getSystemClassLoader());
 
-        assertEquals(fractionOfUseCase / 3, fractionOfSlot, DELTA);
+        assertEquals(fractionOfUseCase * OPERATOR_WEIGHT / TOTAL_WEIGHT, fractionOfSlot, DELTA);
     }
 
     @Test
@@ -158,11 +195,12 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         new HashSet<ManagedMemoryUseCase>() {
                             {
                                 add(ManagedMemoryUseCase.OPERATOR);
+                                add(ManagedMemoryUseCase.STATE_BACKEND);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
                         config,
-                        Optional.empty(),
+                        Optional.of(true),
                         ClassLoader.getSystemClassLoader());
 
         assertEquals(0.0, fractionOfSlot, DELTA);
@@ -171,27 +209,42 @@ public class ManagedMemoryUtilsTest extends TestLogger {
     @Test
     public void testConvertToFractionOfSlotStateBackendUseManagedMemory() {
         testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
-                true, 1.0 / 3, 1.0 * 2 / 3);
+                true,
+                1.0 * OPERATOR_WEIGHT / TOTAL_WEIGHT,
+                1.0 * STATE_BACKEND_WEIGHT / TOTAL_WEIGHT,
+                1.0 * PYTHON_WEIGHT / TOTAL_WEIGHT);
     }
 
     @Test
     public void testConvertToFractionOfSlotStateBackendNotUserManagedMemory() {
-        testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(false, 0.0, 1.0);
+        final int totalWeight = OPERATOR_WEIGHT + PYTHON_WEIGHT;
+        testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
+                false, 1.0 * OPERATOR_WEIGHT / totalWeight, 0.0, 1.0 * PYTHON_WEIGHT / totalWeight);
     }
 
     private void testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
             boolean stateBackendUsesManagedMemory,
+            double expectedOperatorFractionOfSlot,
             double expectedStateFractionOfSlot,
             double expectedPythonFractionOfSlot) {
 
         final Set<ManagedMemoryUseCase> allUseCases =
                 new HashSet<ManagedMemoryUseCase>() {
                     {
+                        add(ManagedMemoryUseCase.OPERATOR);
                         add(ManagedMemoryUseCase.STATE_BACKEND);
                         add(ManagedMemoryUseCase.PYTHON);
                     }
                 };
 
+        final double opFractionOfSlot =
+                ManagedMemoryUtils.convertToFractionOfSlot(
+                        ManagedMemoryUseCase.OPERATOR,
+                        1.0,
+                        allUseCases,
+                        CONFIG_WITH_ALL_USE_CASES,
+                        Optional.of(stateBackendUsesManagedMemory),
+                        ClassLoader.getSystemClassLoader());
         final double stateFractionOfSlot =
                 ManagedMemoryUtils.convertToFractionOfSlot(
                         ManagedMemoryUseCase.STATE_BACKEND,
@@ -209,6 +262,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         Optional.of(stateBackendUsesManagedMemory),
                         ClassLoader.getSystemClassLoader());
 
+        assertEquals(expectedOperatorFractionOfSlot, opFractionOfSlot, DELTA);
         assertEquals(expectedStateFractionOfSlot, stateFractionOfSlot, DELTA);
         assertEquals(expectedPythonFractionOfSlot, pythonFractionOfSlot, DELTA);
     }