Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/29 12:20:04 UTC

[GitHub] [flink] reswqa opened a new pull request, #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

reswqa opened a new pull request, #21419:
URL: https://github.com/apache/flink/pull/21419

   ## What is the purpose of the change
   
   *Hybrid full result partitions are now re-consumable, and [FLINK-29767](https://issues.apache.org/jira/browse/FLINK-29767) enabled `VertexwiseSchedulingStrategy` to schedule hybrid-type edges. It is now time to make hybrid full result partitions work with speculative execution.
   With speculative execution, a downstream task can be scheduled only after all of its upstream tasks have finished. A follow-up ticket will introduce a mechanism that lets the downstream task be scheduled once only some of the upstream tasks have finished, which should improve performance.*
   
   
   ## Brief change log
     - *Speculative execution vertex supports marking hybrid full result partitions as finished.*
     - *Introduce AllFinishedInputConsumableDecider.*
     - *Speculative scheduler supports hybrid full result partition.*
   
   
   ## Verifying this change
   
   This change is covered by newly added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1043991638


##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -605,6 +605,17 @@ public enum SchedulerType {
                                             code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
+            key("jobmanager.adaptive-batch-scheduler.only-consume-finished-partition")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Controls whether the scheduler only allows downstream task consume finished partition.");

Review Comment:
   I think this is probably a configuration of hybrid shuffle, not adaptive batch scheduler.
   - This config option can also be supported by other schedulers. We have just decided to support only the adaptive batch scheduler for now.
   - Pipelined shuffle always consumes unfinished partitions, and blocking shuffle always consumes finished partitions. This config option only makes sense for hybrid shuffle.
   
   I'd suggest the config key `taskmanager.network.hybrid-shuffle.only-consume-finished-partition` and move it to `NettyShuffleEnvironmentOptions`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java:
##########
@@ -120,4 +120,6 @@ void notifySchedulerNgAboutInternalTaskFailure(
     /** Get the shuffle descriptors of the cluster partitions ordered by partition number. */
     List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
             IntermediateDataSetID intermediateResultPartition);
+
+    boolean isOnlyConsumeFinishedPartition();

Review Comment:
   This should not be a property of the execution graph. I think it's rather a property of the result partition. We can read the configuration in `StreamingJobGraphGenerator`, and pass it all the way to the `IntermediateResultPartition`.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -605,6 +605,17 @@ public enum SchedulerType {
                                             code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
+            key("jobmanager.adaptive-batch-scheduler.only-consume-finished-partition")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   I'd suggest `noDefaultValue()`. If not explicitly specified by users, it should behave differently depending on the conditions, e.g., `true` if speculative execution is enabled, `false` otherwise.
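
A minimal sketch of the fallback described above, in plain Java without any Flink dependency. The class `HybridConsumePolicy` and the method `resolveOnlyConsumeFinishedPartition` are hypothetical names introduced only for illustration. The sketch assumes the option is read as an `Optional<Boolean>` that is empty when the user did not set it.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Flink. */
final class HybridConsumePolicy {

    private HybridConsumePolicy() {}

    /**
     * Resolves the effective value of the option.
     *
     * @param userSetting the explicitly configured value, empty when the option is unset
     * @param speculativeExecutionEnabled whether speculative execution is enabled for the job
     */
    static boolean resolveOnlyConsumeFinishedPartition(
            Optional<Boolean> userSetting, boolean speculativeExecutionEnabled) {
        // An explicit user choice wins; otherwise the default follows the execution mode.
        return userSetting.orElse(speculativeExecutionEnabled);
    }

    public static void main(String[] args) {
        // Unset option: the default depends on whether speculative execution is on.
        System.out.println(resolveOnlyConsumeFinishedPartition(Optional.empty(), true)); // true
        System.out.println(resolveOnlyConsumeFinishedPartition(Optional.empty(), false)); // false
        // Explicit setting: overrides the mode-dependent default.
        System.out.println(resolveOnlyConsumeFinishedPartition(Optional.of(true), false)); // true
    }
}
```

Whether an explicit `false` combined with speculative execution should be accepted or rejected is a separate decision that this sketch does not make.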




[GitHub] [flink] flinkbot commented on pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21419:
URL: https://github.com/apache/flink/pull/21419#issuecomment-1330549923

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "43f4095e23f3396da77a7b287d18424fd971abfe",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "43f4095e23f3396da77a7b287d18424fd971abfe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 43f4095e23f3396da77a7b287d18424fd971abfe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] xintongsong commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1053028752


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -78,6 +79,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
     private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
 
+    private final boolean onlyConsumeFinishedPartition;

Review Comment:
   Let's name this `hybridOnlyConsumeFinishedPartition`, to emphasize that this only affects hybrid result partitions.
   
   Same for the variables / fields in other classes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java:
##########
@@ -216,6 +224,13 @@ public SchedulerNG createInstance(
         }
     }
 
+    public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory(
+            boolean onlyConsumeFinishedPartition) {
+        return onlyConsumeFinishedPartition
+                ? AllFinishedInputConsumableDecider.Factory.INSTANCE
+                : DefaultInputConsumableDecider.Factory.INSTANCE;

Review Comment:
   This is hard to understand, because the semantics of `onlyConsumeFinishedPartition` and `AllFinishedInputConsumableDecider` are not equivalent. I understand that this is an intermediate state, but we'd better explain it in a comment.
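
One possible reading of the mismatch, sketched in plain Java. The class and method names below are hypothetical stand-ins, not Flink classes, and the "looser" rule is only an assumption about what the flag name by itself would permit. Each list element says whether one upstream partition has finished.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-ins for illustration; these are not Flink classes. */
final class ConsumableSemanticsSketch {

    private ConsumableSemanticsSketch() {}

    /** Stricter rule: the consumer is schedulable only once every upstream partition is finished. */
    static boolean allInputsFinished(List<Boolean> upstreamFinished) {
        return upstreamFinished.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Looser rule (assumed): the consumer may start as soon as at least one upstream partition
     * is finished, as long as it only reads from finished partitions.
     */
    static boolean someInputFinished(List<Boolean> upstreamFinished) {
        return upstreamFinished.stream().anyMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        List<Boolean> partiallyFinished = Arrays.asList(true, false, true);
        System.out.println(allInputsFinished(partiallyFinished)); // false
        System.out.println(someInputFinished(partiallyFinished)); // true
    }
}
```

The two rules disagree whenever only part of the upstream has finished, which is the case a code comment at the factory method would need to call out.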




[GitHub] [flink] reswqa commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040387708


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -46,7 +46,7 @@ public class IntermediateResultPartition {
     /** Number of subpartitions. Initialized lazily and will not change once set. */
     private int numberOfSubpartitions = UNKNOWN;
 
-    /** Whether this partition has produced some data. */
+    /** Whether this partition has produced all data. */

Review Comment:
   Name should also change.




[GitHub] [flink] reswqa commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040779118


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -181,18 +188,33 @@ public HsDataView registerNewConsumer(
 
     /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
     public void close() {
-        Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
-        handleDecision(Optional.of(decision));
-        spiller.close();
-        poolSizeChecker.shutdown();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.close();
+            poolSizeChecker.shutdown();
+        }
     }
 
     /**
      * Release this {@link HsMemoryDataManager}, it means all memory taken by this class will
      * recycle.
      */
     public void release() {
-        spiller.release();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.release();
+        }
+    }

Review Comment:
   Agreed, we should release all network buffers in the `close` method.




[GitHub] [flink] xintongsong commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040349304


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -181,18 +188,33 @@ public HsDataView registerNewConsumer(
 
     /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
     public void close() {
-        Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
-        handleDecision(Optional.of(decision));
-        spiller.close();
-        poolSizeChecker.shutdown();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.close();
+            poolSizeChecker.shutdown();
+        }
     }
 
     /**
      * Release this {@link HsMemoryDataManager}, it means all memory taken by this class will
      * recycle.
      */
     public void release() {
-        spiller.release();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.release();
+        }
+    }

Review Comment:
   Not sure about introducing a dedicated lock for this.
   
   I checked the contracts in `ResultPartitionWriter`: `close` means releasing all allocated resources and `release` means releasing all the data. For the memory data manager, these are probably the same thing. Once closed, data can only be consumed from the file data manager.
   
   So if `close` is always called, maybe we can do everything in `close` and nothing in `release`. I assume it doesn't really matter if the buffers are not released immediately when `release` is called first? In this way, we won't need the lock between these two.
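
A rough sketch of that idea in plain Java. `MemoryDataManagerSketch` is a hypothetical, heavily simplified stand-in and not Flink's `HsMemoryDataManager`. It assumes that `close` is always called eventually and that `close` is invoked from a single thread, so no dedicated lock is needed between `close` and `release`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simplified manager for illustration; not Flink's HsMemoryDataManager. */
final class MemoryDataManagerSketch {

    private final Deque<byte[]> buffers = new ArrayDeque<>();
    private boolean closed;

    /** Appends a buffer; rejected once the manager has been closed. */
    void append(byte[] buffer) {
        if (closed) {
            throw new IllegalStateException("Manager is already closed.");
        }
        buffers.add(buffer);
    }

    /** Does all the cleanup: drops every in-memory buffer. Safe to call more than once. */
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        buffers.clear();
    }

    /** Intentionally a no-op: close() is assumed to be called later and to free the buffers. */
    void release() {}

    int bufferedCount() {
        return buffers.size();
    }

    boolean isClosed() {
        return closed;
    }
}
```

The trade-off is the one raised in the comment: if `release` arrives first, the buffers stay allocated until `close` runs.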



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java:
##########
@@ -234,13 +239,15 @@ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
-                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition(),
+                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition()
+                                || dataSet.getResultType() == ResultPartitionType.HYBRID_FULL,
                         String.format(
                                 "At the moment, adaptive batch scheduler requires batch workloads "
-                                        + "to be executed with types of all edges being BLOCKING. "
-                                        + "To do that, you need to configure '%s' to '%s'.",
+                                        + "to be executed with types of all edges being BLOCKING or HYBRID_FULL. "
+                                        + "To do that, you need to configure '%s' to '%s' or '%s'.",
                                 ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
-                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+                                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL));

Review Comment:
   Are we deciding the shuffle mode based on the scheduler anywhere?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########
@@ -295,6 +296,14 @@ void cachePartitionInfo(PartitionInfo partitionInfo) {
                 "Method is not supported in SpeculativeExecutionVertex.");
     }
 
+    @Override
+    protected boolean needMarkPartitionFinished(ResultPartitionType resultPartitionType) {
+        // for speculative execution, only blocking or hybrid full result partition need mark
+        // finished.
+        return resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
+                || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+    }

Review Comment:
   I think this is not only for the speculative execution vertex. By default, in speculative execution we want the downstream to only consume the finished upstream partitions, and in non-speculative execution we want the downstream to also consume the unfinished partitions. However, users should also be able to choose other behaviors: consuming unfinished partitions even in speculative mode, or consuming only finished partitions in non-speculative mode.
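
A sketch of how the decision could depend on a user-facing flag rather than on the vertex class. Everything here is hypothetical: the simplified `PartitionType` enum is not Flink's `ResultPartitionType`, and tying the hybrid case to the flag is an assumption about the intended behavior, not the merged implementation.

```java
/** Hypothetical sketch for illustration; not Flink's ExecutionVertex logic. */
final class MarkFinishedSketch {

    /** Simplified stand-in for the real partition types. */
    enum PartitionType {
        BLOCKING,
        HYBRID_FULL,
        PIPELINED
    }

    private MarkFinishedSketch() {}

    /**
     * Decides whether a partition must be marked finished before consumers may be scheduled.
     *
     * @param type the partition type
     * @param hybridOnlyConsumeFinishedPartition the resolved flag; only affects hybrid partitions
     */
    static boolean needMarkPartitionFinished(
            PartitionType type, boolean hybridOnlyConsumeFinishedPartition) {
        switch (type) {
            case BLOCKING:
                // Blocking partitions are always consumed after they finish.
                return true;
            case HYBRID_FULL:
                // Assumed: hybrid partitions follow the flag, independent of the vertex class.
                return hybridOnlyConsumeFinishedPartition;
            default:
                // Pipelined partitions are consumed while they are being produced.
                return false;
        }
    }
}
```

With this shape, speculative and non-speculative vertices would share one code path and differ only in the flag value passed in.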




[GitHub] [flink] reswqa commented on a diff in pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040712945


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########
@@ -295,6 +296,14 @@ void cachePartitionInfo(PartitionInfo partitionInfo) {
                 "Method is not supported in SpeculativeExecutionVertex.");
     }
 
+    @Override
+    protected boolean needMarkPartitionFinished(ResultPartitionType resultPartitionType) {
+        // for speculative execution, only blocking or hybrid full result partition need mark
+        // finished.
+        return resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
+                || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+    }

Review Comment:
   Consuming unfinished partitions in speculative mode may be difficult to support at present, but consuming only finished partitions in non-speculative mode sounds good.





[GitHub] [flink] xintongsong closed pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #21419: [FLINK-30233] Hybrid full result partition should also work with speculative execution
URL: https://github.com/apache/flink/pull/21419

