Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 08:46:30 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

xintongsong commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r894258550


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java:
##########
@@ -36,6 +36,12 @@ public enum StreamExchangeMode {
      */
     BATCH,
 
+    /**
+     * The producer and consumer can run at the same time, or consumer can start after the producer
+     * finished.

Review Comment:
   ```
   The consumer can start consuming data anytime as long as the producer has started producing.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,20 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible. The amount of
+     * data spilled to disk should be minimized.

Review Comment:
   I wouldn't be so sure about this. There could be different spilling strategies.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -984,6 +989,10 @@ private void connect(Integer headOfChain, StreamEdge edge) {
                         "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
         }
 
+        if (resultPartitionType == ResultPartitionType.HYBRID) {
+            hasHybridResultPartition = true;

Review Comment:
   We can move this into the `case HYBRID:` branch.
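
   A minimal sketch of what that could look like, assuming the partition type is chosen in a `switch` over the exchange mode. The class `ConnectSketch` and both enums are simplified, hypothetical stand-ins, not the actual `StreamingJobGraphGenerator` code:

   ```java
   public class ConnectSketch {
       // Hypothetical, simplified stand-ins for the real Flink enums.
       enum ExchangeMode { PIPELINED, BATCH, HYBRID }
       enum PartitionType { PIPELINED_BOUNDED, BLOCKING, HYBRID }

       private boolean hasHybridResultPartition = false;

       PartitionType resolve(ExchangeMode mode) {
           PartitionType type;
           switch (mode) {
               case PIPELINED:
                   type = PartitionType.PIPELINED_BOUNDED;
                   break;
               case BATCH:
                   type = PartitionType.BLOCKING;
                   break;
               case HYBRID:
                   type = PartitionType.HYBRID;
                   // Set the flag where the type is decided, so no separate
                   // `if` on the resolved type is needed after the switch.
                   hasHybridResultPartition = true;
                   break;
               default:
                   throw new UnsupportedOperationException(
                           "Data exchange mode " + mode + " is not supported yet.");
           }
           return type;
       }

       boolean hasHybridResultPartition() {
           return hasHybridResultPartition;
       }
   }
   ```

   Keeping the flag next to the assignment means the two cannot drift apart if another branch is added later.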



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java:
##########
@@ -356,6 +373,111 @@ public void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
         assertEquals(3, testingSchedulerOperation.getScheduledVertices().size());
     }
 
+    @Test
+    public void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 1);
+        final JobVertex v2 = createJobVertex("v2", 1);
+
+        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
+
+        // all regions will be scheduled
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
+        schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED);
+
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+    }
+
+    /** Inner blocking edge will not affect it's region be scheduled. */
+    @Test
+    public void testSchedulingRegionWithInnerBlockingEdge() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 1);
+        final JobVertex v2 = createJobVertex("v2", 1);
+        final JobVertex v3 = createJobVertex("v3", 1);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        v3.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        startScheduling(schedulingTopology);
+
+        assertEquals(1, testingSchedulerOperation.getScheduledVertices().size());
+        List<ExecutionVertexID> executionVertexIds =
+                testingSchedulerOperation.getScheduledVertices().get(0);
+        assertEquals(3, executionVertexIds.size());
+    }
+
+    /**
+     * If a region have blocking and non-blocking input edge at the same time, it will be scheduled
+     * after it's all blocking edge finished, non-blocking edge don't block scheduling.
+     */
+    @Test
+    public void testDownStreamRegionWillNotBeBlockedByNonBlockingEdge() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 2);
+        final JobVertex v2 = createJobVertex("v2", 2);
+        final JobVertex v3 = createJobVertex("v3", 1);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        // note that if it v1 -> v3 changed to a pipelined edge, the downstream will not form an
+        // independent region, which will form a blocking edge in the region and will not affect the
+        // scheduling, already covered by testSchedulingRegionWithInnerBlockingEdge.
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        final PipelinedRegionSchedulingStrategy schedulingStrategy =
+                startScheduling(schedulingTopology);
+
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v21 = executionGraph.getJobVertex(v2.getID()).getTaskVertices()[0];
+        v21.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(v21.getID(), ExecutionState.FINISHED);
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v22 = executionGraph.getJobVertex(v2.getID()).getTaskVertices()[1];
+        v22.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(v22.getID(), ExecutionState.FINISHED);
+        assertEquals(3, testingSchedulerOperation.getScheduledVertices().size());
+    }
+

Review Comment:
   How do these 2 cases relate to this PR?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   We can use the annotation `@Test(expected = IllegalStateException.class)`. In this way, we don't need the `try-catch` and `fail()`.



##########
flink-core/src/main/java/org/apache/flink/api/common/BatchShuffleMode.java:
##########
@@ -60,7 +61,22 @@ public enum BatchShuffleMode implements DescribedEnum {
     ALL_EXCHANGES_BLOCKING(
             text(
                     "Upstream and downstream tasks run subsequently. This reduces the resource usage "
-                            + "as downstream tasks are started after upstream tasks finished."));
+                            + "as downstream tasks are started after upstream tasks finished.")),
+
+    /**
+     * After the upstream task run, when the downstream task run is not constrained by the data
+     * output.
+     *
+     * <p>This will make full use of existing resources.
+     */
+    // TODO remove the annotation and rename this enum constant when hybrid shuffle effort is
+    // finished.
+    @Documentation.ExcludeFromDocumentation
+    EXPERIMENTAL_ALL_EXCHANGES_HYBRID(

Review Comment:
   I'd use `WIP_ALL_EXCHANGE_HYBRID` to indicate that the feature is not yet complete. This should also be explained in the JavaDoc and the description.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   I'd suggest not verifying the complete error message.
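
   A dependency-free sketch of the idea: check only the exception type, so that rewording the message does not break the test. The helper `expectThrows` and the method `createJobGraph(boolean)` are hypothetical stand-ins for the test framework and the generator call:

   ```java
   public class ExceptionCheckSketch {
       /** Runs the action and returns the thrown exception if it has the expected type. */
       static <T extends Throwable> T expectThrows(Class<T> expected, Runnable action) {
           try {
               action.run();
           } catch (Throwable t) {
               if (expected.isInstance(t)) {
                   return expected.cast(t);
               }
               throw new AssertionError(
                       "Unexpected exception type: " + t.getClass().getName(), t);
           }
           throw new AssertionError(
                   "Expected " + expected.getSimpleName() + " but nothing was thrown");
       }

       /** Hypothetical stand-in for the call under test. */
       static void createJobGraph(boolean customSlotSharingGroup) {
           if (customSlotSharingGroup) {
               throw new IllegalStateException(
                       "hybrid shuffle mode currently does not support setting slot sharing group");
           }
       }

       public static void main(String[] args) {
           IllegalStateException e =
                   expectThrows(IllegalStateException.class, () -> createJobGraph(true));
           // Only the type is verified; the message text is deliberately not compared.
           System.out.println("caught " + e.getClass().getSimpleName());
       }
   }
   ```

   If some message check is still wanted, matching a short, stable keyword is less brittle than matching the whole sentence.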



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1161,7 +1161,7 @@ && getHeadOperator(upStreamVertex, streamGraph).isLegacySource()) {
                 break;
             default:
                 throw new RuntimeException(
-                        "Unknown chaining strategy: " + upStreamOperator.getChainingStrategy());
+                        "Unknown chaining strategy: " + downStreamOperator.getChainingStrategy());

Review Comment:
   Hotfix commits should always go before main changes in a PR.



##########
flink-core/src/main/java/org/apache/flink/api/common/BatchShuffleMode.java:
##########
@@ -60,7 +61,22 @@ public enum BatchShuffleMode implements DescribedEnum {
     ALL_EXCHANGES_BLOCKING(
             text(
                     "Upstream and downstream tasks run subsequently. This reduces the resource usage "
-                            + "as downstream tasks are started after upstream tasks finished."));
+                            + "as downstream tasks are started after upstream tasks finished.")),
+
+    /**
+     * After the upstream task run, when the downstream task run is not constrained by the data
+     * output.
+     *
+     * <p>This will make full use of existing resources.
+     */

Review Comment:
   The description needs to be improved. I'd suggest the following:
   ```
   Downstream can start running anytime, as long as the upstream has started.
   
   This adapts the resource usage to whatever is available.
   ```


