Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/13 12:24:01 UTC

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

gaoyunhaii commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r872337757


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -299,6 +302,21 @@ private void handleTaskFailure(
         final FailureHandlingResult failureHandlingResult =
                 executionFailureHandler.getFailureHandlingResult(
                         executionVertexId, error, timestamp);
+
+        // Notify shuffle master that the cached intermediate dataset is corrupted.
+        if (failureHandlingResult.getError() instanceof CacheCorruptedException) {

Review Comment:
   I think it might not be sufficient to only consider `DefaultScheduler`. We might also have `AdaptiveBatchScheduler` for batch jobs and `AdaptiveScheduler` for streaming jobs (if we also consider the case of starting a new streaming job with a cached result partition).
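
   Just to sketch what I mean (purely illustrative; the helper name is made up and the concrete `ShuffleMaster` notification call is whatever this PR introduces), the corruption check could live in one shared hook that `DefaultScheduler`, `AdaptiveBatchScheduler` and `AdaptiveScheduler` all call from their failure-handling paths:

   ```java
   // Illustrative only: a shared hook each scheduler's failure handling could call, so the
   // cache-corruption notification is not tied to DefaultScheduler alone. The method name is
   // hypothetical and the actual ShuffleMaster notification call is left as a comment.
   private void maybeNotifyCacheCorruption(FailureHandlingResult failureHandlingResult) {
       final Throwable error = failureHandlingResult.getError();
       if (error instanceof CacheCorruptedException) {
           // notify the shuffle master that the cached intermediate dataset is corrupted
       }
   }
   ```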



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -142,6 +142,12 @@ public class JobVertex implements java.io.Serializable {
      */
     private String resultOptimizerProperties;
 
+    /**
+     * Optional, the intermediateDataSetId of the cached intermediate dataset that the job vertex
+     * consumes.
+     */
+    @Nullable private final IntermediateDataSetID intermediateDataSetID;

Review Comment:
   Each JobVertex may consume multiple `IntermediateDataSet`s, so I don't think it is sufficient to bookkeep a single id here?
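
   For example (just a sketch; the field and accessor names below are illustrative, not from this PR), the vertex could bookkeep every cached intermediate data set it consumes:

   ```java
   // Illustrative sketch: track all cached intermediate data sets consumed by this vertex,
   // rather than a single nullable id.
   private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();

   public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) {
       intermediateDataSetIdsToConsume.add(Preconditions.checkNotNull(intermediateDataSetId));
   }

   public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
       return Collections.unmodifiableList(intermediateDataSetIdsToConsume);
   }
   ```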



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -45,6 +56,9 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final Map<IntermediateDataSetID, Collection<NettyShuffleDescriptor>>

Review Comment:
   Previously, for TaskExecutor-managed partitions, the promotion process was implemented directly in `JobMasterPartitionTracker`, so the `NettyShuffleMaster` should not need to manage the cluster partitions itself?

   Also, I'm wondering whether it would be possible to unify the two processes?
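
   To make the second question concrete (purely a sketch; no such interface exists today), the bookkeeping could sit behind one lookup that both paths share, so `NettyShuffleMaster` would not need its own `Map<IntermediateDataSetID, Collection<NettyShuffleDescriptor>>`:

   ```java
   // Hypothetical interface, only to illustrate the direction of unifying the two processes:
   // the component that already tracks partitions answers cluster-partition lookups, and the
   // shuffle master stays stateless with respect to cached cluster partitions.
   public interface ClusterPartitionDescriptorLookup {

       /** Returns the shuffle descriptors of the cached cluster partitions of the given data set. */
       Collection<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
               IntermediateDataSetID intermediateDataSetId);
   }
   ```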



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##########
@@ -84,6 +86,33 @@ CompletableFuture<T> registerPartitionWithProducer(
             PartitionDescriptor partitionDescriptor,
             ProducerDescriptor producerDescriptor);
 
+    /**
+     * Returns all the shuffle descriptors for the partitions in the intermediate data set with the
+     * given id.
+     *
+     * @param intermediateDataSetID The id of the intermediate data set.
+     * @return all the shuffle descriptors for the partitions in the intermediate data set, or
+     *     null if the intermediate data set does not exist.
+     */
+    default Collection<T> getClusterPartitionShuffleDescriptors(

Review Comment:
   Might also cc @wsry for a double check of the proposed methods here. 
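
   For reference, this is how I read the contract of the proposed method (the caller-side variable names below are made up; a `null` return means the data set has no cached cluster partitions):

   ```java
   // Hypothetical caller-side usage of the proposed method, only to illustrate the contract.
   Collection<NettyShuffleDescriptor> cachedDescriptors =
           shuffleMaster.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
   if (cachedDescriptors == null) {
       // no cached result: schedule the producer vertices to (re)generate the data set
   } else {
       // reuse the cached cluster partitions described by the returned shuffle descriptors
   }
   ```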



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org