Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/01 04:41:24 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

zhuzhurk commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r886299714


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate dataset */
+public class JobIntermediateDatasetReuseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+    @Test
+    public void testClusterPartitionReuse() throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+            final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
+            CompletableFuture<JobResult> jobResultFuture =
+                    miniCluster.requestJobResult(firstJobGraph.getJobID());
+            JobResult jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            final JobGraph secondJobGraph = createSecondJobGraph(1, intermediateDataSetID);
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+        }
+    }
+
+    @Test
+    public void testClusterPartitionReuseMultipleParallelism() throws Exception {

Review Comment:
   Looks to me like the only differences between these tests are the parallelisms and the result verification.
   Can we extract the shared logic into a method and let each test just invoke it with a different parallelism and verification implementation?
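   A rough sketch of what the extracted helper could look like (names and the verification hook are just illustrative; it assumes createFirstJobGraph/createSecondJobGraph take the parallelism as their first argument, as in the current test, plus a `java.util.function.Consumer` import):
   
   ```java
   private void runIntermediateDataSetReuseTest(
           int parallelism, Consumer<JobResult> secondJobResultVerifier) throws Exception {
       final TestingMiniClusterConfiguration miniClusterConfiguration =
               TestingMiniClusterConfiguration.newBuilder().build();
   
       try (TestingMiniCluster miniCluster =
               TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
           miniCluster.start();
   
           final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
   
           // First job produces and caches the intermediate result.
           final JobGraph firstJobGraph = createFirstJobGraph(parallelism, intermediateDataSetID);
           miniCluster.submitJob(firstJobGraph).get();
           assertTrue(miniCluster.requestJobResult(firstJobGraph.getJobID()).get().isSuccess());
   
           // Second job consumes the cached result; each test supplies its own verification.
           final JobGraph secondJobGraph = createSecondJobGraph(parallelism, intermediateDataSetID);
           miniCluster.submitJob(secondJobGraph).get();
           secondJobResultVerifier.accept(
                   miniCluster.requestJobResult(secondJobGraph.getJobID()).get());
       }
   }
   ```
   
   Each test would then be roughly a one-liner, e.g. `runIntermediateDataSetReuseTest(4, jobResult -> assertTrue(jobResult.isSuccess()));`.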



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -568,6 +583,14 @@ public void setResultOptimizerProperties(String resultOptimizerProperties) {
         this.resultOptimizerProperties = resultOptimizerProperties;
     }
 
+    public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) {
+        intermediateDataSetIdsToConsume.add(intermediateDataSetId);
+    }
+
+    public List<IntermediateDataSetID> getIntermediateDataSetIdToConsume() {

Review Comment:
   getIntermediateDataSetIdToConsume -> getIntermediateDataSetIdsToConsume



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##########
@@ -120,6 +125,26 @@ public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions()
         return partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
+        this.resourceManagerGateway = resourceManagerGateway;
+    }
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        Preconditions.checkNotNull(
+                resourceManagerGateway, "JobMaster is not connected to ResourceManager");
+        try {
+            return this.resourceManagerGateway

Review Comment:
   Can we add a cache for it? Otherwise this can result in thousands of RPCs when deploying a large-scale job vertex.
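   A minimal sketch of what I have in mind (field and helper names are made up; the cache would also need to be invalidated appropriately, e.g. when reconnecting to the ResourceManager):
   
   ```java
   // Hypothetical cache, keyed by the consumed data set id.
   private final Map<IntermediateDataSetID, List<ShuffleDescriptor>>
           clusterPartitionShuffleDescriptorCache = new HashMap<>();
   
   @Override
   public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
           IntermediateDataSetID intermediateDataSetID) {
       Preconditions.checkNotNull(
               resourceManagerGateway, "JobMaster is not connected to ResourceManager");
       // Only the first lookup per data set goes through an RPC; subsequent deployments reuse it.
       return clusterPartitionShuffleDescriptorCache.computeIfAbsent(
               intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager);
   }
   ```
   
   where `requestShuffleDescriptorsFromResourceManager` would wrap the existing gateway call.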



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -244,7 +266,50 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
                 internalExecutionGraphAccessor.getPartitionLocationConstraint(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                clusterPartitionShuffleDescriptors);
+    }
+
+    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+                executionVertex.getExecutionGraphAccessor();
+        final List<IntermediateDataSetID> consumedClusterDataSetIds =
+                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdToConsume();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
+                new HashMap<>();
+
+        for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
+            Collection<? extends ShuffleDescriptor> shuffleDescriptors =
+                    internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
+                            consumedClusterDataSetId);
+
+            Preconditions.checkState(
+                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
+                    "The parallelism (%s) of the cache consuming job vertex is "
+                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
+                    executionVertex.getTotalNumberOfParallelSubtasks(),
+                    shuffleDescriptors.size());
+
+            shuffleDescriptors =

Review Comment:
   I would add the assumption that the ShuffleMaster returns the shuffleDescriptors in order. Then we can reduce the complexity to O(1) by getting the single required shuffle descriptor via the subtask index. Otherwise it is O(N) per execution vertex, which turns into O(N^2) when deploying N execution vertices.
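   Roughly like this (a sketch only; `orderedShuffleDescriptors` is hypothetical and assumes the descriptors come back as an ordered List, or are converted once per data set rather than once per execution vertex):
   
   ```java
   // With descriptors ordered by partition index, consumer subtask i only needs
   // the descriptor at index i -- an O(1) lookup instead of scanning all N of them.
   final ShuffleDescriptor descriptorForSubtask =
           orderedShuffleDescriptors.get(executionVertex.getParallelSubtaskIndex());
   ```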
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java:
##########
@@ -42,6 +44,7 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert
 
     private final Map<IntermediateResultPartitionID, TestingSchedulingResultPartition>
             resultPartitionsById;
+    private final List<IntermediateDataSetID> cachedIntermediateDataSetID;

Review Comment:
   cachedIntermediateDataSetID -> consumedCachedIntermediateDataSetId
   
   As explained before, we need to clarify whether it is a produced partition or a consumed partition to avoid confusion.
   Similar changes should also be applied elsewhere.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java:
##########
@@ -131,6 +147,7 @@ public static class Builder {
         private final Map<IntermediateResultPartitionID, TestingSchedulingResultPartition>
                 resultPartitionsById = new HashMap<>();
         private ExecutionState executionState = ExecutionState.CREATED;
+        private final List<IntermediateDataSetID> cachedIntermediateDataset = new ArrayList<>();

Review Comment:
   Dataset -> DataSet 
   
   To be aligned with the naming used elsewhere.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -38,11 +39,16 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit
 
     private final IntermediateDataSetID intermediateDataSetID;
 
-    private ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) {
+    private final ResultPartitionType resultPartitionType;
+
+    private ConsumedPartitionGroup(
+            List<IntermediateResultPartitionID> resultPartitions,
+            ResultPartitionType resultPartitionType) {
         checkArgument(
                 resultPartitions.size() > 0,
                 "The size of result partitions in the ConsumedPartitionGroup should be larger than 0.");
         this.intermediateDataSetID = resultPartitions.get(0).getIntermediateDataSetID();
+        this.resultPartitionType = resultPartitionType;

Review Comment:
   better to `checkNotNull`
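   
   e.g. (assuming `checkNotNull` is statically imported from `Preconditions`, like `checkArgument` already is):
   
   ```java
   this.resultPartitionType = checkNotNull(resultPartitionType);
   ```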



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java:
##########
@@ -177,6 +177,15 @@ public TestingSchedulingExecutionVertex newExecutionVertex(
         return newVertex;
     }
 
+    public TestingSchedulingExecutionVertex newExecutionVertex(

Review Comment:
   maybe `newExecutionVertexToConsumeCachedIntermediateDataSet`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -137,6 +148,16 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
                                     consumedIntermediateResult, consumedPartitionGroup)));
         }
 
+        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+                consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            inputGates.add(
+                    new InputGateDeploymentDescriptor(
+                            entry.getKey(),
+                            ResultPartitionType.BLOCKING_PERSISTENT,
+                            0,

Review Comment:
   I'd suggest explicitly adding this assumption as a comment, because it is not easy to understand unless one has read the assumptions/limitations of the FLIP: theoretically, at runtime one partition can be consumed by multiple consumers.
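   For example, right above the subpartition index argument (wording is only a suggestion):
   
   ```java
   // Assumption (see the FLIP's limitations): a cached cluster partition currently has
   // exactly one consumer, so every consuming task reads subpartition index 0. At runtime
   // a partition could theoretically be consumed by multiple consumers, but that case is
   // not supported for cached intermediate results yet.
   0,
   ```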


