Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/07 09:25:26 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #19885: [FLINK-27902][runtime] Refactor ResultPartitionType to decouple scheduling and partition release logic

xintongsong commented on code in PR #19885:
URL: https://github.com/apache/flink/pull/19885#discussion_r890886104


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {

Review Comment:
   I'd suggest naming this `ConsumingConstraint`.
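   For illustration, here is a minimal sketch of how the rename might read, assuming the three constants from the diff stay as they are. `ResultPartitionTypeSketch` is a hypothetical, trimmed-down stand-in for `ResultPartitionType`. The constraint assigned to each constant is an assumption. The two query methods only borrow the `canBePipelinedConsumed()` / `mustBePipelinedConsumed()` names referenced in the diff's Javadoc.

   ```java
   /** Hypothetical, trimmed-down stand-in for ResultPartitionType (illustration only). */
   enum ResultPartitionTypeSketch {
       // Constraint assignments below are assumptions for illustration.
       BLOCKING(ConsumingConstraint.BLOCKING),
       PIPELINED(ConsumingConstraint.MUST_BE_PIPELINED);

       /** Indicates when the downstream may consume the upstream's output. */
       private enum ConsumingConstraint {
           /** Upstream must finish before downstream consumes. */
           BLOCKING,
           /** Downstream may consume while upstream is running. */
           CAN_BE_PIPELINED,
           /** Downstream must consume while upstream is running. */
           MUST_BE_PIPELINED
       }

       private final ConsumingConstraint consumingConstraint;

       ResultPartitionTypeSketch(ConsumingConstraint consumingConstraint) {
           this.consumingConstraint = consumingConstraint;
       }

       /** True if downstream may consume while upstream is still running (optional or required). */
       public boolean canBePipelinedConsumed() {
           return consumingConstraint == ConsumingConstraint.CAN_BE_PIPELINED
                   || consumingConstraint == ConsumingConstraint.MUST_BE_PIPELINED;
       }

       /** True only if downstream is required to consume while upstream is running. */
       public boolean mustBePipelinedConsumed() {
           return consumingConstraint == ConsumingConstraint.MUST_BE_PIPELINED;
       }
   }
   ```

   Calling it a constraint makes it clear that `CAN_BE_PIPELINED` permits concurrent consumption and `MUST_BE_PIPELINED` requires it. For that reason, `canBePipelinedConsumed()` holds for both.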



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -177,6 +177,32 @@ public boolean isReleaseByScheduler() {
         return releaseType == ReleaseType.RELEASE_BY_SCHEDULER;
     }
 
+    /**
+     * {@link #isBlockingOrBlockingPersistentResultPartition()} is used to judge whether it is the
+     * specified {@link #BLOCKING} or {@link #BLOCKING_PERSISTENT} resultPartitionType.
+     *
+     * <p>this method suitable for judgment conditions related to the specific implementation of
+     * {@link ResultPartitionType}.
+     *
+     * <p>this method not related to data consumption and partition release. As for the logic
+     * related to partition release, use {@link #isReleaseByScheduler()} instead, and as consume
+     * type, use {@link #mustBePipelinedConsumed()} or {@link #canBePipelinedConsumed()} instead.
+     */
+    public boolean isBlockingOrBlockingPersistentResultPartition() {
+        return this == BLOCKING || this == BLOCKING_PERSISTENT;
+    }
+
+    /**
+     * {@link #isPipelinedOrPipelinedBoundedResultPartition()} is used to judge whether it is the
+     * specified {@link #PIPELINED} or {@link #PIPELINED_BOUNDED} resultPartitionType.
+     *
+     * <p>This method suitable for judgment conditions related to the specific implementation of
+     * {@link ResultPartitionType}.
+     *
+     * <p>This method not related to data consumption and partition release. As for the logic
+     * related to partition release, use {@link #isReleaseByScheduler()} instead, and as consume
+     * type, use {@link #mustBePipelinedConsumed()} or {@link #canBePipelinedConsumed()} instead.
+     */

Review Comment:
   This belongs to the wrong commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {
+        /** Upstream must be finished before downstream consume. */
+        BLOCKING,
+        /** Downstream can consume while upstream is running. */
+        CAN_BE_PIPELINED,
+        /** Downstream must consume while upstream is running. */
+        MUST_BE_PIPELINED
+    }
+
+    /** ReleaseType indicates who is responsible for releasing the result partition. */
+    private enum ReleaseType {
+        RELEASE_BY_UPSTREAM,
+        RELEASE_BY_SCHEDULER
+    }

Review Comment:
   We may also want to add a comment noting that unifying how result partitions are released is future work. Let's create a JIRA ticket (if there isn't one yet) and refer to it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {
+        /** Upstream must be finished before downstream consume. */
+        BLOCKING,
+        /** Downstream can consume while upstream is running. */
+        CAN_BE_PIPELINED,
+        /** Downstream must consume while upstream is running. */
+        MUST_BE_PIPELINED
+    }
+
+    /** ReleaseType indicates who is responsible for releasing the result partition. */
+    private enum ReleaseType {
+        RELEASE_BY_UPSTREAM,
+        RELEASE_BY_SCHEDULER
+    }

Review Comment:
   I'd suggest the following:
   ```
   private enum ReleaseBy {
       UPSTREAM,
       SCHEDULER
   }
   ```
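   Here is a rough sketch of how the constants and the existing `isReleaseByScheduler()` check might read with `ReleaseBy`, including the future-work note suggested above. `PartitionTypeWithReleaseBy` is a hypothetical stand-in, and `FLINK-XXXXX` is a placeholder because no ticket has been identified yet. The assignments follow the tracker comment removed in this PR, which says blocking partitions require explicit release calls.

   ```java
   /** Hypothetical, trimmed-down stand-in for ResultPartitionType (illustration only). */
   enum PartitionTypeWithReleaseBy {
       // Blocking partitions need explicit release calls from the scheduler side;
       // pipelined ones are released by the upstream side.
       BLOCKING(ReleaseBy.SCHEDULER),
       PIPELINED(ReleaseBy.UPSTREAM);

       /**
        * Indicates who is responsible for releasing the result partition.
        *
        * <p>TODO: unify how result partitions are released; see FLINK-XXXXX
        * (placeholder, ticket not yet identified).
        */
       private enum ReleaseBy {
           UPSTREAM,
           SCHEDULER
       }

       private final ReleaseBy releaseBy;

       PartitionTypeWithReleaseBy(ReleaseBy releaseBy) {
           this.releaseBy = releaseBy;
       }

       /** Reads naturally with the shorter constant names: releaseBy == SCHEDULER. */
       public boolean isReleaseByScheduler() {
           return releaseBy == ReleaseBy.SCHEDULER;
       }
   }
   ```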



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##########
@@ -70,10 +70,8 @@ public void startTrackingPartition(
         Preconditions.checkNotNull(producingTaskExecutorId);
         Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
 
-        // blocking and PIPELINED_APPROXIMATE partitions require explicit partition release calls
-        // reconnectable will be removed after FLINK-19895, see also {@link
-        // ResultPartitionType#isReconnectable}.
-        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReconnectable()) {
+        // non-releaseByScheduler partitions don't require explicit partition release calls.
+        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReleaseByScheduler()) {

Review Comment:
   There's an assumption that `!isReleasedByScheduler` means released by upstream. This is not safe: `ReleaseType` is an enum that happens to have two values now but could gain more in the future. I'd suggest introducing another method, `isReleasedByUpstream()`.
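   To make the risk concrete, here is a small sketch. It assumes the tracker skips partitions that do not need explicit release calls. All names are hypothetical, and `CONSUMER` is an invented third value that exists only to show how the negated check misclassifies a new constant.

   ```java
   /** Hypothetical release owner; CONSUMER is an invented value for illustration only. */
   enum ReleaseBy {
       UPSTREAM,
       SCHEDULER,
       /** Hypothetical future value, added only to show why negation is fragile. */
       CONSUMER;

       boolean isReleasedByScheduler() {
           return this == SCHEDULER;
       }

       /** Explicit check, as suggested in the review, instead of !isReleasedByScheduler(). */
       boolean isReleasedByUpstream() {
           return this == UPSTREAM;
       }
   }

   final class TrackingDecision {
       private TrackingDecision() {}

       /** Fragile: treats every non-scheduler value as "released by upstream". */
       static boolean skipTrackingByNegation(ReleaseBy releaseBy) {
           return !releaseBy.isReleasedByScheduler();
       }

       /** Explicit: skips tracking only when the upstream is known to release the partition. */
       static boolean skipTrackingExplicitly(ReleaseBy releaseBy) {
           return releaseBy.isReleasedByUpstream();
       }
   }
   ```

   With `CONSUMER`, the negated check silently skips tracking. The explicit check keeps the partition tracked until its release path is decided.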



-- 
This is an automated message from the Apache Git Service.