You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/17 10:19:14 UTC

[flink] branch master updated: [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba4b1829558 [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.
ba4b1829558 is described below

commit ba4b182955867fedfa9891bf0bf430e92eeab41a
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Nov 10 17:03:40 2022 +0800

    [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.
    
    HYBRID_FULL result partitions have been re-consumable since FLINK-28889.
    
    This closes #21284
---
 .../failover/flip1/RestartPipelinedRegionFailoverStrategy.java | 10 ++++------
 .../runtime/io/network/partition/ResultPartitionType.java      | 10 ++++------
 .../streaming/api/transformations/StreamExchangeMode.java      |  4 ++--
 3 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index fba91f495e1..08ab5b2ba66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -293,8 +293,8 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
                     && resultPartitionAvailabilityChecker.isAvailable(resultPartitionID)
                     // If the result partition is available in the partition tracker and has not
                     // failed, it is available if it is re-consumable, and it may also be
-                    // available for PIPELINED_APPROXIMATE and HYBRID_FULL type.
-                    && isResultPartitionCanBeConsumedRepeatedly(resultPartitionID);
+                    // available for PIPELINED_APPROXIMATE type.
+                    && isResultPartitionIsReConsumableOrPipelinedApproximate(resultPartitionID);
         }
 
         public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
@@ -306,14 +306,12 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
             failedPartitions.remove(resultPartitionID);
         }
 
-        private boolean isResultPartitionCanBeConsumedRepeatedly(
+        private boolean isResultPartitionIsReConsumableOrPipelinedApproximate(
                 IntermediateResultPartitionID resultPartitionID) {
             ResultPartitionType resultPartitionType =
                     resultPartitionTypeRetriever.apply(resultPartitionID);
             return resultPartitionType.isReconsumable()
-                    || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE
-                    // TODO support re-consumable for HYBRID_FULL resultPartitionType.
-                    || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+                    || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 1cbdcf5bc19..29489f654dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -91,16 +91,14 @@ public enum ResultPartitionType {
      *
      * <p>Hybrid partitions can be consumed any time, whether fully produced or not.
      *
-     * <p>HYBRID_FULL partitions can be consumed repeatedly, but it does not support concurrent
-     * consumption. So re-consumable is false, but double calculation can be avoided during
+     * <p>HYBRID_FULL partitions are re-consumable, so double calculation can be avoided during
      * failover.
      */
-    // TODO support re-consumable for HYBRID_FULL resultPartitionType.
-    HYBRID_FULL(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
+    HYBRID_FULL(true, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
 
     /**
-     * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it cannot be
-     * consumed repeatedly.
+     * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but they are
+     * not re-consumable.
      */
     HYBRID_SELECTIVE(
             false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index c78683031fb..f7270c05b44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
@@ -39,14 +39,14 @@ public enum StreamExchangeMode {
     /**
      * The consumer can start consuming data anytime as long as the producer has started producing.
      *
-     * <p>This exchange mode can be consumed repeatedly.
+     * <p>This exchange mode is re-consumable.
      */
     HYBRID_FULL,
 
     /**
      * The consumer can start consuming data anytime as long as the producer has started producing.
      *
-     * <p>This exchange mode can not be consumed repeatedly.
+     * <p>This exchange mode is not re-consumable.
      */
     HYBRID_SELECTIVE,