Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 06:31:26 UTC

[GitHub] [flink] reswqa opened a new pull request, #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

reswqa opened a new pull request, #19927:
URL: https://github.com/apache/flink/pull/19927

   ## What is the purpose of the change
   
   *Introduce and support HYBRID resultPartitionType*
   
   
   ## Brief change log
   
     - *Introduce HYBRID resultPartitionType.*
     - *Make streamGraph and jobGraph support HYBRID type edge.*
     - *Test that the pipelinedRegionSchedulingStrategy can be adapted to hybrid type edges.*
   
   
   ## Verifying this change
   
   This change added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? Docs and JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r895328670


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,19 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible.
+     */
+    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);

Review Comment:
   Yes, the interfaces were intended to expose a characteristic rather than to be bound to a specific implementation. The problem was on the caller side. Instead of relying on the intended characteristic, some callers were relying on assumptions such as "a result partition type that returns `true` for `isPipelined` must be `PIPELINED` or `PIPELINED_BOUNDED`".
   
   E.g., in `AdaptiveScheduler#assertPreconditions`, the intention here is to make sure only `PIPELINED` and `PIPELINED_BOUNDED` are involved. You may take a look at where `isBlockingOrBlockingPersistentResultPartition` and `isPipelinedOrPipelinedBoundedResultPartition` are used for more details.





[GitHub] [flink] reswqa commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1152195872

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r894307742


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -984,6 +989,10 @@ private void connect(Integer headOfChain, StreamEdge edge) {
                         "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
         }
 
+        if (resultPartitionType == ResultPartitionType.HYBRID) {
+            hasHybridResultPartition = true;

Review Comment:
   Ok, fair enough.





[GitHub] [flink] flinkbot commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1152012927

   ## CI report:
   
   * d189543d90e447c43e705f8d5923c6a003a15e14 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] xintongsong closed pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType
URL: https://github.com/apache/flink/pull/19927




[GitHub] [flink] reswqa commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1152862184

   @flinkbot run azure




[GitHub] [flink] reswqa commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1152198469

   @xintongsong This PR has been updated according to the comments. Please take a look when you have time.




[GitHub] [flink] zentol commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r894353716


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   This should use AssertJ's `assertThatThrownBy()`.
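
For illustration, the try/`fail()`/catch pattern in the test above can be collapsed into a single assertion. With AssertJ this would typically read `assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph)).isInstanceOf(IllegalStateException.class).hasMessageContaining("slot sharing group")`. The sketch below mimics that shape with plain JDK code so it runs without AssertJ on the classpath. `ThrownAssertions`, `expectThrows`, and `createJobGraphStub` are hypothetical stand-ins, not Flink or AssertJ APIs.

```java
import java.util.concurrent.Callable;

/** Hypothetical JDK-only stand-in for an AssertJ-style "assert that thrown by" helper. */
final class ThrownAssertions {

    private ThrownAssertions() {}

    /**
     * Runs the action and returns the exception it throws. Fails with an AssertionError
     * if nothing is thrown or if the exception is not of the expected type.
     */
    static <T extends Throwable> T expectThrows(Class<T> expectedType, Callable<?> action) {
        try {
            action.call();
        } catch (Throwable t) {
            if (expectedType.isInstance(t)) {
                return expectedType.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError(
                "Expected " + expectedType.getSimpleName() + " but nothing was thrown");
    }

    /** Stand-in for the job graph creation call; it always rejects the configuration. */
    static Object createJobGraphStub() {
        throw new IllegalStateException(
                "hybrid shuffle mode currently does not support setting slot sharing group");
    }
}
```

A caller would write `IllegalStateException e = ThrownAssertions.expectThrows(IllegalStateException.class, ThrownAssertions::createJobGraphStub);` and then check `e.getMessage().contains("slot sharing group")`. Matching only a fragment of the message keeps the test from breaking when the wording changes.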





[GitHub] [flink] xintongsong commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1153421196

   I'll handle the fixup commit during merging.






[GitHub] [flink] Aitozi commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r895273619


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,19 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible.
+     */
+    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);

Review Comment:
   Will this type be included in `isBlockingOrBlockingPersistentResultPartition` or `isPipelinedOrPipelinedBoundedResultPartition`?
   
   After the last change, I find the purpose of these two new methods unclear. They seem to hard-code a check for the `Pipelined` and `Blocking` types, right?





[GitHub] [flink] xintongsong commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r894258550


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java:
##########
@@ -36,6 +36,12 @@ public enum StreamExchangeMode {
      */
     BATCH,
 
+    /**
+     * The producer and consumer can run at the same time, or consumer can start after the producer
+     * finished.

Review Comment:
   ```
   The consumer can start consuming data anytime as long as the producer has started producing.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,20 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible. The amount of
+     * data spilled to disk should be minimized.

Review Comment:
   I wouldn't be so sure about this. There could be different spilling strategies.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -984,6 +989,10 @@ private void connect(Integer headOfChain, StreamEdge edge) {
                         "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
         }
 
+        if (resultPartitionType == ResultPartitionType.HYBRID) {
+            hasHybridResultPartition = true;

Review Comment:
   We can move this into the `case HYBRID:` branch.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java:
##########
@@ -356,6 +373,111 @@ public void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
         assertEquals(3, testingSchedulerOperation.getScheduledVertices().size());
     }
 
+    @Test
+    public void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 1);
+        final JobVertex v2 = createJobVertex("v2", 1);
+
+        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
+
+        // all regions will be scheduled
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v11 = executionGraph.getJobVertex(v1.getID()).getTaskVertices()[0];
+        schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED);
+
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+    }
+
+    /** Inner blocking edge will not affect it's region be scheduled. */
+    @Test
+    public void testSchedulingRegionWithInnerBlockingEdge() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 1);
+        final JobVertex v2 = createJobVertex("v2", 1);
+        final JobVertex v3 = createJobVertex("v3", 1);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        v3.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        startScheduling(schedulingTopology);
+
+        assertEquals(1, testingSchedulerOperation.getScheduledVertices().size());
+        List<ExecutionVertexID> executionVertexIds =
+                testingSchedulerOperation.getScheduledVertices().get(0);
+        assertEquals(3, executionVertexIds.size());
+    }
+
+    /**
+     * If a region have blocking and non-blocking input edge at the same time, it will be scheduled
+     * after it's all blocking edge finished, non-blocking edge don't block scheduling.
+     */
+    @Test
+    public void testDownStreamRegionWillNotBeBlockedByNonBlockingEdge() throws Exception {
+        final JobVertex v1 = createJobVertex("v1", 2);
+        final JobVertex v2 = createJobVertex("v2", 2);
+        final JobVertex v3 = createJobVertex("v3", 1);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        // note that if it v1 -> v3 changed to a pipelined edge, the downstream will not form an
+        // independent region, which will form a blocking edge in the region and will not affect the
+        // scheduling, already covered by testSchedulingRegionWithInnerBlockingEdge.
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID);
+
+        final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();
+
+        final PipelinedRegionSchedulingStrategy schedulingStrategy =
+                startScheduling(schedulingTopology);
+
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v21 = executionGraph.getJobVertex(v2.getID()).getTaskVertices()[0];
+        v21.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(v21.getID(), ExecutionState.FINISHED);
+        assertEquals(2, testingSchedulerOperation.getScheduledVertices().size());
+
+        final ExecutionVertex v22 = executionGraph.getJobVertex(v2.getID()).getTaskVertices()[1];
+        v22.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(v22.getID(), ExecutionState.FINISHED);
+        assertEquals(3, testingSchedulerOperation.getScheduledVertices().size());
+    }
+

Review Comment:
   How do these 2 cases relate to this PR?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   We can use the annotation `@Test(expected = IllegalStateException.class)`. In this way, we don't need the `try-catch` and `fail()`.



##########
flink-core/src/main/java/org/apache/flink/api/common/BatchShuffleMode.java:
##########
@@ -60,7 +61,22 @@ public enum BatchShuffleMode implements DescribedEnum {
     ALL_EXCHANGES_BLOCKING(
             text(
                     "Upstream and downstream tasks run subsequently. This reduces the resource usage "
-                            + "as downstream tasks are started after upstream tasks finished."));
+                            + "as downstream tasks are started after upstream tasks finished.")),
+
+    /**
+     * After the upstream task run, when the downstream task run is not constrained by the data
+     * output.
+     *
+     * <p>This will make full use of existing resources.
+     */
+    // TODO remove the annotation and rename this enum constant when hybrid shuffle effort is
+    // finished.
+    @Documentation.ExcludeFromDocumentation
+    EXPERIMENTAL_ALL_EXCHANGES_HYBRID(

Review Comment:
   I'd use `WIP_ALL_EXCHANGE_HYBRID` to indicate that the feature is not completed. It should also be explained in JavaDoc and description.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   I'd suggest not verifying the complete error message.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1161,7 +1161,7 @@ && getHeadOperator(upStreamVertex, streamGraph).isLegacySource()) {
                 break;
             default:
                 throw new RuntimeException(
-                        "Unknown chaining strategy: " + upStreamOperator.getChainingStrategy());
+                        "Unknown chaining strategy: " + downStreamOperator.getChainingStrategy());

Review Comment:
   Hotfix commits should always go before main changes in a PR.



##########
flink-core/src/main/java/org/apache/flink/api/common/BatchShuffleMode.java:
##########
@@ -60,7 +61,22 @@ public enum BatchShuffleMode implements DescribedEnum {
     ALL_EXCHANGES_BLOCKING(
             text(
                     "Upstream and downstream tasks run subsequently. This reduces the resource usage "
-                            + "as downstream tasks are started after upstream tasks finished."));
+                            + "as downstream tasks are started after upstream tasks finished.")),
+
+    /**
+     * After the upstream task run, when the downstream task run is not constrained by the data
+     * output.
+     *
+     * <p>This will make full use of existing resources.
+     */

Review Comment:
   The description needs to be improved. I'd suggest the following:
   ```
   Downstream can start running anytime, as long as the upstream has started.
   
   This adapts the resource usage to whatever is available.
   ```





[GitHub] [flink] reswqa commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r894305350


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -984,6 +989,10 @@ private void connect(Integer headOfChain, StreamEdge edge) {
                         "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
         }
 
+        if (resultPartitionType == ResultPartitionType.HYBRID) {
+            hasHybridResultPartition = true;

Review Comment:
   If the `case UNDEFINED:` branch is reached, it may also set `resultPartitionType` to `HYBRID`.
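
A minimal sketch of this point, using hypothetical simplified enums (`ExchangeMode`, `EdgePartitionType`) and a hypothetical `EdgeConnector` class rather than Flink's real `StreamingJobGraphGenerator` code. It assumes an `UNDEFINED` mode falls back to a configured default type. Under that assumption, a flag set only inside `case HYBRID:` would miss an edge whose mode is `UNDEFINED` but resolves to `HYBRID`, whereas a check placed after the `switch` covers both paths.

```java
/** Hypothetical, simplified stand-ins for the exchange mode and partition type enums. */
enum ExchangeMode { PIPELINED, BATCH, HYBRID, UNDEFINED }

enum EdgePartitionType { PIPELINED_BOUNDED, BLOCKING, HYBRID }

final class EdgeConnector {
    /** Set when any connected edge ends up with a HYBRID partition type. */
    boolean hasHybridResultPartition = false;

    /** Assumption: the type used when the exchange mode is UNDEFINED. */
    private final EdgePartitionType defaultType;

    EdgeConnector(EdgePartitionType defaultType) {
        this.defaultType = defaultType;
    }

    EdgePartitionType connect(ExchangeMode mode) {
        EdgePartitionType resultPartitionType;
        switch (mode) {
            case PIPELINED:
                resultPartitionType = EdgePartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                resultPartitionType = EdgePartitionType.BLOCKING;
                break;
            case HYBRID:
                resultPartitionType = EdgePartitionType.HYBRID;
                break;
            case UNDEFINED:
                // The default may itself be HYBRID, so this branch can also
                // produce a hybrid edge.
                resultPartitionType = defaultType;
                break;
            default:
                throw new UnsupportedOperationException(
                        "Data exchange mode " + mode + " is not supported yet.");
        }
        // Checking after the switch catches both the explicit HYBRID case and
        // an UNDEFINED mode that resolved to HYBRID.
        if (resultPartitionType == EdgePartitionType.HYBRID) {
            hasHybridResultPartition = true;
        }
        return resultPartitionType;
    }
}
```

For example, `new EdgeConnector(EdgePartitionType.HYBRID).connect(ExchangeMode.UNDEFINED)` sets the flag even though the `case HYBRID:` branch is never taken.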





[GitHub] [flink] xintongsong commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r895290172


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,19 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible.
+     */
+    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);

Review Comment:
   This will not be included in `isBlockingOrBlockingPersistentResultPartition` or `isPipelinedOrPipelinedBoundedResultPartition`.
   
   As you said, these two methods check for specific types, that is, for specific implementations of the interface. That is clearly not a good design, because callers should not assume they are working with a specific implementation of an interface. However, this was not newly introduced. FLINK-27902 only explicitly separates the use cases that rely on specific implementations from those that properly rely on the interfaces. Fixing them probably requires a more careful redesign of the problematic use cases, and we do not want to block this feature on that.
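
   The distinction can be sketched as follows, using a simplified, hypothetical enum rather than Flink's actual `ResultPartitionType` (the constant set, the field, and both helper methods are assumptions made for illustration). A check that enumerates specific constants silently excludes a newly added type, while a check on a declared characteristic covers it automatically.

```java
final class PartitionTypeCheckSketch {
    /** Hypothetical, simplified partition types with one declared characteristic. */
    enum SketchPartitionType {
        BLOCKING(false),
        PIPELINED(true),
        PIPELINED_BOUNDED(true),
        HYBRID(true); // newly added type

        private final boolean canBePipelinedConsumed;

        SketchPartitionType(boolean canBePipelinedConsumed) {
            this.canBePipelinedConsumed = canBePipelinedConsumed;
        }

        boolean canBePipelinedConsumed() {
            return canBePipelinedConsumed;
        }
    }

    // Identity-based check: tied to specific constants, so HYBRID is not included.
    static boolean isPipelinedOrPipelinedBounded(SketchPartitionType type) {
        return type == SketchPartitionType.PIPELINED
                || type == SketchPartitionType.PIPELINED_BOUNDED;
    }

    // Characteristic-based check: any type declaring the property is included.
    static boolean supportsPipelinedConsumption(SketchPartitionType type) {
        return type.canBePipelinedConsumed();
    }
}
```

   In this sketch, callers of the identity-based method would need to be revisited for every new type, which is the kind of redesign deferred here.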





[GitHub] [flink] Aitozi commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r895309095


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,19 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible.
+     */
+    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);

Review Comment:
   Thanks @xintongsong for your explanation. IMO, the previous version mainly exposed `isBlocking`, `isPipelined`, and `isReconnectable`, which I think are characteristics and not bound to a specific implementation of the type. Please correct me if I'm wrong.
   But I think we can redesign this part after this feature is finished.





[GitHub] [flink] Aitozi commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r895381447


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -81,7 +81,19 @@ public enum ResultPartitionType {
      * in that {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task
      * fails.
      */
-    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM);
+    PIPELINED_APPROXIMATE(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.UPSTREAM),
+
+    /**
+     * Hybrid partitions with a bounded (local) buffer pool to support downstream task to
+     * simultaneous reading and writing shuffle data.
+     *
+     * <p>Hybrid result has the following two characteristics:
+     *
+     * <p>Intermediate data can be consumed any time, whether fully produced or not.
+     *
+     * <p>Intermediate data can be consumed directly from memory as much as possible.
+     */
+    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);

Review Comment:
   Got it, thanks.





[GitHub] [flink] reswqa commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r896405257


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1201,6 +1243,33 @@ private void verifyFractions(
                 delta);
     }
 
+    @Test
+    public void testSetNonDefaultSlotSharingInHybridMode() {
+        Configuration configuration = new Configuration();
+        // set all edge to HYBRID result partition type.
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+        final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration);
+        // specify slot sharing group for map1
+        streamGraph.getStreamNodes().stream()
+                .filter(n -> "map1".equals(n.getOperatorName()))
+                .findFirst()
+                .get()
+                .setSlotSharingGroup("testSlotSharingGroup");
+
+        try {
+            StreamingJobGraphGenerator.createJobGraph(streamGraph);
+            fail("hybrid shuffle mode with non default slot sharing group should failed.");
+        } catch (IllegalStateException e) {
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    "hybrid shuffle mode currently does not support setting slot sharing group"));

Review Comment:
   > This should use AssertJ's `assertThatThrownBy()`
   
   @zentol Nice suggestion, I have fixed it.





[GitHub] [flink] reswqa commented on pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #19927:
URL: https://github.com/apache/flink/pull/19927#issuecomment-1153416290

   @xintongsong Thanks for your review and the good doc changes. Should I rebase the fixup commit myself, or will that happen when you merge?
   

