You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/17 10:19:14 UTC
[flink] branch master updated: [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba4b1829558 [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.
ba4b1829558 is described below
commit ba4b182955867fedfa9891bf0bf430e92eeab41a
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Nov 10 17:03:40 2022 +0800
[FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable.
HYBRID_FULL result partition become re-consumable since FLINK-28889.
This closes #21284
---
.../failover/flip1/RestartPipelinedRegionFailoverStrategy.java | 10 ++++------
.../runtime/io/network/partition/ResultPartitionType.java | 10 ++++------
.../streaming/api/transformations/StreamExchangeMode.java | 4 ++--
3 files changed, 10 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index fba91f495e1..08ab5b2ba66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -293,8 +293,8 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
&& resultPartitionAvailabilityChecker.isAvailable(resultPartitionID)
// If the result partition is available in the partition tracker and does not
// fail, it will be available if it can be re-consumption, and it may also be
- // available for PIPELINED_APPROXIMATE and HYBRID_FULL type.
- && isResultPartitionCanBeConsumedRepeatedly(resultPartitionID);
+ // available for PIPELINED_APPROXIMATE type.
+ && isResultPartitionIsReConsumableOrPipelinedApproximate(resultPartitionID);
}
public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
@@ -306,14 +306,12 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
failedPartitions.remove(resultPartitionID);
}
- private boolean isResultPartitionCanBeConsumedRepeatedly(
+ private boolean isResultPartitionIsReConsumableOrPipelinedApproximate(
IntermediateResultPartitionID resultPartitionID) {
ResultPartitionType resultPartitionType =
resultPartitionTypeRetriever.apply(resultPartitionID);
return resultPartitionType.isReconsumable()
- || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE
- // TODO support re-consumable for HYBRID_FULL resultPartitionType.
- || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+ || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 1cbdcf5bc19..29489f654dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -91,16 +91,14 @@ public enum ResultPartitionType {
*
* <p>Hybrid partitions can be consumed any time, whether fully produced or not.
*
- * <p>HYBRID_FULL partitions can be consumed repeatedly, but it does not support concurrent
- * consumption. So re-consumable is false, but double calculation can be avoided during
+ * <p>HYBRID_FULL partitions is re-consumable, so double calculation can be avoided during
* failover.
*/
- // TODO support re-consumable for HYBRID_FULL resultPartitionType.
- HYBRID_FULL(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
+ HYBRID_FULL(true, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER),
/**
- * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it cannot be
- * consumed repeatedly.
+ * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it is not
+ * re-consumable.
*/
HYBRID_SELECTIVE(
false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index c78683031fb..f7270c05b44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
@@ -39,14 +39,14 @@ public enum StreamExchangeMode {
/**
* The consumer can start consuming data anytime as long as the producer has started producing.
*
- * <p>This exchange mode can be consumed repeatedly.
+ * <p>This exchange mode is re-consumable.
*/
HYBRID_FULL,
/**
* The consumer can start consuming data anytime as long as the producer has started producing.
*
- * <p>This exchange mode can not be consumed repeatedly.
+ * <p>This exchange mode is not re-consumable.
*/
HYBRID_SELECTIVE,