You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/19 09:44:56 UTC

[flink] branch master updated: [FLINK-29034] HYBRID_FULL result partition type is not yet reConsumable

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f0a6c0cbd83 [FLINK-29034] HYBRID_FULL result partition type is not yet reConsumable
f0a6c0cbd83 is described below

commit f0a6c0cbd8313de8146c9c2610bb3db98bacaea0
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Aug 18 18:28:05 2022 +0800

    [FLINK-29034] HYBRID_FULL result partition type is not yet reConsumable
    
    This closes #20624
---
 .../failover/flip1/RestartPipelinedRegionFailoverStrategy.java | 10 ++++++----
 .../runtime/io/network/partition/ResultPartitionType.java      | 10 ++++++----
 .../streaming/api/transformations/StreamExchangeMode.java      |  4 ++--
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index 08ab5b2ba66..fba91f495e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -293,8 +293,8 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
                     && resultPartitionAvailabilityChecker.isAvailable(resultPartitionID)
                     // If the result partition is available in the partition tracker and does not
                     // fail, it will be available if it can be re-consumption, and it may also be
-                    // available for PIPELINED_APPROXIMATE type.
-                    && isResultPartitionIsReConsumableOrPipelinedApproximate(resultPartitionID);
+                    // available for PIPELINED_APPROXIMATE and HYBRID_FULL type.
+                    && isResultPartitionCanBeConsumedRepeatedly(resultPartitionID);
         }
 
         public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
@@ -306,12 +306,14 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
             failedPartitions.remove(resultPartitionID);
         }
 
-        private boolean isResultPartitionIsReConsumableOrPipelinedApproximate(
+        private boolean isResultPartitionCanBeConsumedRepeatedly(
                 IntermediateResultPartitionID resultPartitionID) {
             ResultPartitionType resultPartitionType =
                     resultPartitionTypeRetriever.apply(resultPartitionID);
             return resultPartitionType.isReconsumable()
-                    || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
+                    || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE
+                    // TODO support re-consumable for HYBRID_FULL resultPartitionType.
+                    || resultPartitionType == ResultPartitionType.HYBRID_FULL;
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 29489f654dd..1cbdcf5bc19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -91,14 +91,16 @@ public enum ResultPartitionType {
      *
      * <p>Hybrid partitions can be consumed any time, whether fully produced or not.
      *
-     * <p>HYBRID_FULL partitions is re-consumable, so double calculation can be avoided during
+     * <p>HYBRID_FULL partitions can be consumed repeatedly, but it does not support concurrent
+     * consumption. So re-consumable is false, but double calculation can be avoided during
      * failover.
      */
-    HYBRID_FULL(true, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
+    // TODO support re-consumable for HYBRID_FULL resultPartitionType.
+    HYBRID_FULL(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
 
     /**
-     * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it is not
-     * re-consumable.
+     * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it cannot be
+     * consumed repeatedly.
      */
     HYBRID_SELECTIVE(
             false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index f7270c05b44..c78683031fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
@@ -39,14 +39,14 @@ public enum StreamExchangeMode {
     /**
      * The consumer can start consuming data anytime as long as the producer has started producing.
      *
-     * <p>This exchange mode is re-consumable.
+     * <p>This exchange mode can be consumed repeatedly.
      */
     HYBRID_FULL,
 
     /**
      * The consumer can start consuming data anytime as long as the producer has started producing.
      *
-     * <p>This exchange mode is not re-consumable.
+     * <p>This exchange mode can not be consumed repeatedly.
      */
     HYBRID_SELECTIVE,