Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/08 09:44:54 UTC

[GitHub] [flink] zhuzhurk commented on a change in pull request #19003: [FLINK-26517][runtime] Normalize the decided parallelism to power of 2 when using adaptive batch scheduler

zhuzhurk commented on a change in pull request #19003:
URL: https://github.com/apache/flink/pull/19003#discussion_r821468782



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -531,7 +531,9 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The lower bound of allowed parallelism to set adaptively if %s has been set to %s",
+                                            "The lower bound of allowed parallelism to set adaptively if %s has been set to %s. "
+                                                    + "Currently, this option should be configured as a power of 2, "
+                                                    + "otherwise it will also be rounded up to a power of 2 by framework.",

Review comment:
       Maybe: "by framework" -> "automatically".
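
The new description says that a minimum parallelism which is not a power of 2 is rounded up to one. The following is a minimal, self-contained sketch of that rounding using plain `int` arithmetic. The class `PowerOfTwo` and the method `roundUpToPowerOfTwo` are hypothetical illustrations. They are not the helper the PR actually calls.

```java
final class PowerOfTwo {

    private PowerOfTwo() {}

    /**
     * Rounds a positive value up to the nearest power of 2; a value that already is a power of 2
     * is returned unchanged. Hypothetical helper for illustration only.
     */
    static int roundUpToPowerOfTwo(int value) {
        // Values above 2^30 cannot be rounded up without overflowing a signed int.
        if (value <= 0 || value > (1 << 30)) {
            throw new IllegalArgumentException("Value must be in [1, 2^30], but was " + value);
        }
        // highestOneBit(value - 1) is the largest power of 2 strictly below value;
        // shifting it left by one gives the smallest power of 2 that is >= value.
        // value == 1 is special-cased because highestOneBit(0) returns 0.
        return value == 1 ? 1 : Integer.highestOneBit(value - 1) << 1;
    }

    public static void main(String[] args) {
        for (int minParallelism : new int[] {1, 3, 4, 5, 100}) {
            System.out.println(minParallelism + " -> " + roundUpToPowerOfTwo(minParallelism));
        }
    }
}
```

Running `main` prints `1 -> 1`, `3 -> 4`, `4 -> 4`, `5 -> 8` and `100 -> 128`, so a configured minimum of 100 would effectively behave like 128.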

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -556,14 +560,16 @@
         Documentation.Sections.EXPERT_SCHEDULING,
         Documentation.Sections.ALL_JOB_MANAGER
     })
-    public static final ConfigOption<MemorySize> ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK =
-            key("jobmanager.adaptive-batch-scheduler.data-volume-per-task")
+    public static final ConfigOption<MemorySize> ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK =
+            key("jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task")
                     .memoryType()
                     .defaultValue(MemorySize.ofMebiBytes(1024))
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The size of data volume to expect each task instance to process if %s has been set to %s",
+                                            "The average size of data volume to expect each task instance to process if %s has been set to %s. "

Review comment:
       Maybe also note that the amount of data actually processed by some tasks may be much larger than this average, e.g. if there is data skew, or if the total data volume is so large that the parallelism has already reached the upper bound?
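
To illustrate the upper-bound case, the sketch below reuses the formula from `calculateParallelism()` in the diff, `ceil(nonBroadcastBytes / (dataVolumePerTask - broadcastBytes))`, and then clamps the result to `[min, max]`. Power-of-2 normalization is deliberately left out to keep the arithmetic simple. The class and method names are hypothetical. The sketch assumes `broadcastBytes` is smaller than the per-task volume, which the real code ensures by capping broadcast bytes as shown in the diff.

```java
final class AvgDataVolumeExample {

    private static final long MIB = 1024L * 1024L;

    private AvgDataVolumeExample() {}

    // Same formula as in the diff, followed by clamping to [minParallelism, maxParallelism].
    // Assumes broadcastBytes < avgDataVolumePerTask so the divisor stays positive.
    static int decideParallelism(
            long nonBroadcastBytes,
            long broadcastBytes,
            long avgDataVolumePerTask,
            int minParallelism,
            int maxParallelism) {
        int initial =
                (int) Math.ceil((double) nonBroadcastBytes / (avgDataVolumePerTask - broadcastBytes));
        return Math.max(minParallelism, Math.min(maxParallelism, initial));
    }

    // Every task reads all broadcast data plus an even share of the non-broadcast data.
    // With data skew, some tasks receive more than this even share.
    static long avgBytesPerTask(long nonBroadcastBytes, long broadcastBytes, int parallelism) {
        return broadcastBytes + nonBroadcastBytes / parallelism;
    }

    public static void main(String[] args) {
        long nonBroadcast = 10240 * MIB; // 10 GiB of non-broadcast input
        long avgPerTask = 1024 * MIB; // 1 GiB, the option's default value

        int unbounded = decideParallelism(nonBroadcast, 0, avgPerTask, 1, 128);
        int capped = decideParallelism(nonBroadcast, 0, avgPerTask, 1, 4);

        System.out.println(
                unbounded + " tasks, " + avgBytesPerTask(nonBroadcast, 0, unbounded) / MIB + " MiB each");
        System.out.println(
                capped + " tasks, " + avgBytesPerTask(nonBroadcast, 0, capped) / MIB + " MiB each");
    }
}
```

With a maximum of 128 the decider picks 10 tasks at 1024 MiB each. With a maximum of 4 it is capped at 4 tasks, and each task processes 2560 MiB on average, well above the configured 1 GiB.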

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##########
@@ -109,54 +112,85 @@ private int calculateParallelism(List<BlockingResultInfo> consumedResults) {
                             + " Use {} as the size of broadcast data to decide the parallelism.",
                     new MemorySize(broadcastBytes),
                     new MemorySize(expectedMaxBroadcastBytes),
-                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(),
                     CAP_RATIO_OF_BROADCAST,
                     new MemorySize(expectedMaxBroadcastBytes));
 
             broadcastBytes = expectedMaxBroadcastBytes;
         }
 
-        int parallelism =
+        int initialParallelism =
                 (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+        int parallelism = normalizeParallelism(initialParallelism);
 
         LOG.debug(
                 "The size of broadcast data is {}, the size of non-broadcast data is {}, "
-                        + "the initially decided parallelism is {}.",
+                        + "the initially decided parallelism is {}, after normalize is {}",
                 new MemorySize(broadcastBytes),
                 new MemorySize(nonBroadcastBytes),
+                initialParallelism,
                 parallelism);
 
         if (parallelism < minParallelism) {
             LOG.info(
-                    "The initially decided parallelism {} is smaller than the minimum parallelism {} "
-                            + "(which is configured by '{}'). Use {} as the finally decided parallelism.",
+                    "The initially normalized parallelism {} is smaller than the normalized minimum parallelism {}. "
+                            + "Use {} as the finally decided parallelism.",
                     parallelism,
                     minParallelism,
-                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
                     minParallelism);
             parallelism = minParallelism;
         } else if (parallelism > maxParallelism) {
             LOG.info(
-                    "The initially decided parallelism {} is larger than the maximum parallelism {} "
-                            + "(which is configured by '{}'). Use {} as the finally decided parallelism.",
+                    "The initially normalized parallelism {} is larger than the normalized maximum parallelism {}. "
+                            + "Use {} as the finally decided parallelism.",
                     parallelism,
                     maxParallelism,
-                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
                     maxParallelism);
             parallelism = maxParallelism;
         }
 
         return parallelism;
     }
 
-    public static DefaultVertexParallelismDecider from(Configuration configuration) {
+    @VisibleForTesting
+    int getMaxParallelism() {
+        return maxParallelism;
+    }
+
+    @VisibleForTesting
+    int getMinParallelism() {
+        return minParallelism;
+    }
+
+    static DefaultVertexParallelismDecider from(Configuration configuration) {
+        int maxParallelism = getNormalizedMaxParallelism(configuration);
+        int minParallelism = getNormalizedMinParallelism(configuration);
+        checkState(maxParallelism >= minParallelism);

Review comment:
       Let's add more information to the error message so that users know how to set the configuration options properly to fix it.
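
The following sketch shows one possible shape for such a message. The check names both options and reports the configured and normalized values, so the user can see which setting to change. `ParallelismBoundsCheck` and `checkBounds` are hypothetical. The option keys in `main` are written out as assumptions. In the real code they would come from `JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key()` and `JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key()`, which the diff above already references.

```java
final class ParallelismBoundsCheck {

    private ParallelismBoundsCheck() {}

    // Hypothetical replacement for the bare checkState(maxParallelism >= minParallelism):
    // the message names both options and shows raw and normalized values.
    static void checkBounds(
            String minKey,
            int configuredMin,
            int normalizedMin,
            String maxKey,
            int configuredMax,
            int normalizedMax) {
        if (normalizedMax < normalizedMin) {
            throw new IllegalStateException(
                    String.format(
                            "Invalid adaptive batch scheduler configuration: the normalized maximum "
                                    + "parallelism %d (from '%s' = %d) is smaller than the normalized "
                                    + "minimum parallelism %d (from '%s' = %d). Please configure both "
                                    + "options as powers of 2 with '%s' <= '%s'.",
                            normalizedMax,
                            maxKey,
                            configuredMax,
                            normalizedMin,
                            minKey,
                            configuredMin,
                            minKey,
                            maxKey));
        }
    }

    public static void main(String[] args) {
        try {
            // Assumed option keys; both values are already powers of 2, so normalization
            // leaves them unchanged and the check fails only because min > max.
            checkBounds(
                    "jobmanager.adaptive-batch-scheduler.min-parallelism", 128, 128,
                    "jobmanager.adaptive-batch-scheduler.max-parallelism", 64, 64);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Inside Flink, the same template could probably be passed directly to the `checkState(condition, errorMessageTemplate, args...)` overload of `org.apache.flink.util.Preconditions` rather than hand-rolling the `if`.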

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
##########
@@ -108,7 +108,9 @@ private void executeJob(Boolean isFineGrained) throws Exception {
         configuration.setInteger(
                 JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM,
                 DEFAULT_MAX_PARALLELISM);
-        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));

Review comment:
       I would propose keeping this config to avoid instability of the test case.




-- 
This is an automated message from the Apache Git Service.