You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/05 01:58:13 UTC

[flink] 02/02: [FLINK-27909] Add document for hybrid shuffle mode.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 004d31ae208d206702d7772e0319450d7d978900
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Aug 2 14:36:21 2022 +0800

    [FLINK-27909] Add document for hybrid shuffle mode.
    
    This closes #20433
---
 .../{blocking_shuffle.md => batch_shuffle.md}      |  87 +++++++++++++++---
 .../{blocking_shuffle.md => batch_shuffle.md}      | 100 +++++++++++++++++----
 2 files changed, 161 insertions(+), 26 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/blocking_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md
similarity index 61%
rename from docs/content.zh/docs/ops/batch/blocking_shuffle.md
rename to docs/content.zh/docs/ops/batch/batch_shuffle.md
index 4348e6befe1..3db398a0ad3 100644
--- a/docs/content.zh/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -1,10 +1,12 @@
 ---
-title: "Blocking Shuffle"
+title: "Batch Shuffle"
 weight: 4
 type: docs
 aliases:
-- /zh/ops/batch/blocking_shuffle.html
-- /zh/ops/batch/blocking_shuffle
+- zh/docs/ops/batch/batch_shuffle.html
+- zh/docs/ops/batch/batch_shuffle
+- zh/docs/ops/batch/blocking_shuffle
+- zh/docs/ops/batch/blocking_shuffle.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -25,17 +27,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Blocking Shuffle
+# Batch Shuffle
 
 ## 总览
 
-Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Table / SQL]({{< ref "/docs/dev/table/overview" >}}) 都支持通过批处理执行模式处理有界输入。此模式是通过 blocking shuffle 进行网络传输。与流式应用使用管道 shuffle 阻塞交换的数据并存储,然后下游任务通过网络获取这些值的方式不同。这种交换减少了执行作业所需的资源,因为它不需要同时运行上游和下游任务。
+Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Table / SQL]({{< ref "/docs/dev/table/overview" >}}) 都支持通过批处理执行模式处理有界输入。 在批处理模式下,Flink 提供了两种网络交换模式: `Blocking Shuffle` 和 `Hybrid Shuffle`.
 
-总的来说,Flink 提供了两种不同类型的 blocking shuffles:`Hash shuffle` 和 `Sort shuffle`。
+- `Blocking Shuffle` 是批处理的默认数据交换模式。它会持久化所有的中间数据,只有当数据产出完全后才能被消费。
+- `Hybrid Shuffle` 是下一代的批处理数据交换模式. 他会更加智能地持久化数据, 并且允许在数据生产的同时进行消费. 该特性目前仍处于实验阶段并且存在一些已知的 [限制](#limitations).
+
+## Blocking Shuffle
 
-在下面章节会详细说明它们。
+与流式应用使用管道 shuffle 交换数据的方式不同,blocking 交换持久化数据到存储中,然后下游任务通过网络获取这些值。这种交换减少了执行作业所需的资源,因为它不需要同时运行上游和下游任务。
+
+总的来说,Flink 提供了两种不同类型的 blocking shuffles:`Hash shuffle` 和 `Sort shuffle`。
 
-## Hash Shuffle
+### Hash Shuffle
 
 对于 1.14 以及更低的版本,`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
 
@@ -64,7 +71,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 1. 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
 2. 在机械硬盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题。
 
-## Sort Shuffle
+### Sort Shuffle
 
 `Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash Shuffle`,`Sort Shuffle` 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 `Sort Shuffle` 可以获得比 `Hash Shuffle` 更好的性能。另外,`Sort Shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 `Sort Shuffle` 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK- [...]
 
@@ -76,7 +83,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 目前 `Sort Shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
 {{< /hint >}}
 
-## 如何选择 Blocking Shuffle
+### 如何选择 Blocking Shuffle
 
 总的来说,
 
@@ -85,21 +92,65 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 
 要在 `Sort Shuffle` 和 `Hash Shuffle` 间切换,你需要配置这个参数:[taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)。这个参数根据消费者Task的并发选择当前Task使用`Hash Shuffle` 或 `Sort Shuffle`,如果并发小于配置值则使用 `Hash Shuffle`,否则使用 `Sort Shuffle`。对于 1.15 以下版本,它的默认值是 `Integer.MAX_VALUE`,这意味着 `Hash Shuffle` 是默认实现。从 1.15 起,它的默认值是 1,这意味着 `Sort Shuffle` 是默认实现。
 
+## Hybrid Shuffle
+
+{{< hint warning >}}
+This feature is still experimental and has some known [limitations](#limitations).
+{{< /hint >}}
+
+Hybrid shuffle is the next generation of batch data exchanges. It combines the advantages of blocking shuffle and pipelined shuffle (in streaming mode).
+- Like blocking shuffle, it does not require upstream and downstream tasks to run simultaneously, which allows executing a job with little resources.
+- Like pipelined shuffle, it does not require downstream tasks to be executed after upstream tasks finish, which reduces the overall execution time of the job when given sufficient resources.
+- It adapts to custom preferences between persisting less data and restarting less tasks on failures, by providing different spilling strategies.
+
+### Spilling Strategy
+
+Hybrid shuffle provides two spilling strategies:
+
+- **Selective Spilling Strategy** persists data only if they are not consumed by downstream tasks timely. This reduces the amount of data to persist, at the price that in case of failures upstream tasks need to be restarted to reproduce the complete intermediate results.
+- **Full Spilling Strategy** persists all data, no matter they are consumed by downstream tasks or not. In case of failures, the persisted complete intermediate result can be re-consumed, without having to restart upstream tasks.
+
+### Usage
+
+To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+
+### Limitations
+
+Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.
+
+- **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
+- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+
 ## 性能调优
 
 下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
 
+{{< tabs "Performance Tuning" >}}
+
+{{< tab "Blocking Shuffle" >}}
 1. 如果你使用机械硬盘作为存储设备,请总是使用 `Sort Shuffle`,因为这可以极大的提升稳定性和性能。从 1.15 开始,`Sort Shuffle` 已经成为默认实现,对于 1.14 以及更低版本,你需要通过将 [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism) 配置为 1 以手动开启 `Sort Shuffle`。
 2. 对于 `Sort Shuffle` 和 `Hash Shuffle` 两种实现,你都可以考虑开启 [数据压缩]({{< ref "docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled) 除非数据本身无法压缩。从 1.15 开启,数据压缩是默认开启的,对于 1.14 以及更低版本你需要手动开启。
 3. 当使用 `Sort Shuffle` 时,减少 [独占网络缓冲区]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 并增加 [流动网络缓冲区]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-floating-buffers-per-gate) 有利于性能提升。对于 1.14 以及更高版本,建议将 [taskmanager.network.memory.buffers-per-channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 设为 0 并且将 [taskmanager.network.memory.floating-buffers-per-gate]({{< ref "docs/deployment/config" >}}#tas [...]
 4. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将 [网络内存比例]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少 0.2。为了使调整生效,你可能需要同时调整 [网络内存大小下界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) 以及 [网络内存大小上界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个 [内存配置文档]({{< ref "docs/deployment/memory/mem_setup_tm" >}})。
 5. 增大数据写出内存。像上面提到的那样,对于大规模作业,如果有充足的空闲内存,建议增大 [数据写出内存]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers) 大小到至少 (2 * 并发数)。注意:在你增大这个配置后,为避免出现 "Insufficient number of network buffers" 错误,你可能还需要增大总的网络内存大小。
 6. 增大数据读取内存。像上面提到的那样,对于大规模作业,建议增大 [数据读取内存]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此你必须增加相同的内存大小到 [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
+{{< /tab >}}
+
+{{< tab "Hybrid Shuffle" >}}
+1. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将 [网络内存比例]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少 0.2。为了使调整生效,你可能需要同时调整 [网络内存大小下界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) 以及 [网络内存大小上界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个 [内存配置文档]({{< ref "docs/deployment/memory/mem_setup_tm" >}})。
+2. 增大数据写出内存。对于大规模作业, 建议增大总内存大小,用于数据写入的内存越大, 下游越有机会直接从内存读取数据. 你需要保证每个 `Result Partition` 至少能够分配到 `numSubpartition + 1` 个buffer, 否则可能会遇到 "Insufficient number of network buffers" 错误。
+3. 增大数据读取内存。对于大规模作业,建议增大 [数据读取内存]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此你必须增加相同的内存大小到 [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ## Trouble Shooting
 
 尽管十分罕见,下面列举了一些你可能会碰到的异常情况以及对应的处理策略:
 
+{{< tabs "Trouble Shooting" >}}
+{{< tab "Blocking Shuffle" >}}
+
 | 异常情况 | 处理策略 |
 | :--------- | :------------------ |
 | Insufficient number of network buffers | 这意味着网络内存大小不足以支撑作业运行,你需要增加总的网络内存大小。注意:从 1.15 开始,`Sort Shuffle` 已经成为默认实现,对于一些场景,`Sort Shuffle` 可能比 `Hash Shuffle` 需要更多的网络内存,因此当你的批作业升级到 1.15 以后可能会遇到这个网络内存不足的问题。这种情况下,你只需要增大总的网络内存大小即可。|
@@ -112,3 +163,19 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 | Out of memory error | 如果你使用的是 `Hash Shuffle`,请切换到 `Sort Shuffle`。如果你已经在使用 `Sort Shuffle` 并且遵循了上面章节的建议,你可以考虑增大相应的内存大小。对于堆上内存,你可以增大 [taskmanager.memory.task.heap.size]({{< ref "docs/deployment/config" >}}#ttaskmanager-memory-task-heap-size),对于直接内存,你可以增大 [taskmanager.memory.task.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-task-off-heap-size)。|
 | Container killed by external resource manger | 多种原因可能会导致容器被杀,比如,杀掉一个低优先级容器以释放资源启动高优先级容器,或者容器占用了过多的资源,比如内存、磁盘空间等。像上面章节所提到的那样,`Hash Shuffle` 可能会使用过多的内存而被 YARN 杀掉。所以,如果你使用的是 `Hash Shuffle`,请切换到 `Sort Shuffle`。如果你已经在使用 `Sort Shuffle`,你可能需要同时检查 Flink 日志以及资源管理框架的日志以找出容器被杀的根因,并且做出相应的修复。|
 
+{{< /tab >}}
+
+{{< tab "Hybrid Shuffle" >}}
+
+| 异常情况                                      | 处理策略                                                                                                                                                                                                                                                                                                        |
+|:------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Insufficient number of network buffers    | 这意味着网络内存量不足以支撑作业运行,你需要增加总的内存大小。                                                                                                                                                                                                                                                                                            |                                                                                                                                   [...]
+| Connection reset by peer                  | 这通常意味着网络不太稳定或者压力较大。其他一些原因,如SSL握手超时等也可能会导致这一问题。 增大 [网络连接 backlog]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-server-backlog) 可能会有所帮助。                                                                                                                                                                   |
+| Network connection timeout                | 这通常意味着网络不太稳定或者压力较大。增大 [网络连接超时时间]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-client-connectTimeoutSec) 或者开启 [网络连接重试]({{< ref "docs/deployment/config" >}}#taskmanager-network-retries) 可能会有所帮助。                                                                                                         |
+| Socket read/write timeout                 | 这通常意味着网络传输速度较慢或者压力较大。增大 [网络收发缓冲区]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-sendReceiveBufferSize) 大小可能会有所帮助。如果作业运行在 Kubernetes 环境,使用 [host network]({{< ref "docs/deployment/config" >}}#kubernetes-hostnetwork-enabled) 可能会有所帮助。                                                                    |
+| Read buffer request timeout               | 这意味着对数据读取缓冲区的激烈竞争。要解决这一问题,你可以增大 [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 和 [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size)。 |
+| No space left on device                   | 这通常意味着磁盘存储空间或者 inodes 被耗尽。你可以考虑扩展磁盘存储空间或者做一些数据清理。                                                                                                                                                                                                                                                                          |
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/docs/ops/batch/blocking_shuffle.md b/docs/content/docs/ops/batch/batch_shuffle.md
similarity index 54%
rename from docs/content/docs/ops/batch/blocking_shuffle.md
rename to docs/content/docs/ops/batch/batch_shuffle.md
index b3b4c9bf823..fc98d6ad3cc 100644
--- a/docs/content/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content/docs/ops/batch/batch_shuffle.md
@@ -1,10 +1,12 @@
 ---
-title: "Blocking Shuffle"
+title: "Batch Shuffle"
 weight: 4
 type: docs
 aliases:
-- /ops/batch/blocking_shuffle.html
-- /ops/batch/blocking_shuffle
+- /docs/ops/batch/batch_shuffle.html
+- /docs/ops/batch/batch_shuffle
+- /docs/ops/batch/blocking_shuffle
+- /docs/ops/batch/blocking_shuffle.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -25,25 +27,30 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Blocking Shuffle
+# Batch Shuffle
 
 ## Overview
 
-Flink supports a batch execution mode in both [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) and [Table / SQL]({{< ref "/docs/dev/table/overview" >}}) for jobs executing across bounded input. In this mode, network exchanges occur via a blocking shuffle. Unlike the pipeline shuffle used for streaming applications, blocking exchanges persists data to some storage. Downstream tasks then fetch these values via the network. Such an exchange reduces the resources required t [...]
+Flink supports a batch execution mode in both [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) and [Table / SQL]({{< ref "/docs/dev/table/overview" >}}) for jobs executing across bounded input. In batch execution mode, Flink offers two modes for network exchanges: `Blocking Shuffle` and `Hybrid Shuffle`.
 
-As a whole, Flink provides two different types of blocking shuffles; `Hash shuffle` and `Sort shuffle`.
+- `Blocking Shuffle` is the default data exchange mode for batch executions. It persists all intermediate data, and can be consumed only after fully produced.
+- `Hybrid Shuffle` is the next generation data exchange mode for batch executions. It persists data more smartly, and allows consuming while being produced. This feature is still experimental and has some known [limitations](#limitations).
 
-They will be detailed in the following sections.
+## Blocking Shuffle
 
-## Hash Shuffle
+Unlike the pipeline shuffle used for streaming applications, blocking exchanges persists data to some storage. Downstream tasks then fetch these values via the network. Such an exchange reduces the resources required to execute the job as it does not need the upstream and downstream tasks to run simultaneously.
+
+As a whole, Flink provides two different types of blocking shuffles: `Hash shuffle` and `Sort shuffle`.
+
+### Hash Shuffle
 
 The default blocking shuffle implementation for 1.14 and lower, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
 
 `Hash Shuffle` provides different mechanisms for writing and reading files:
 
- - `file`: Writes files with the normal File IO, reads and transmits files with Netty `FileRegion`. `FileRegion` relies on `sendfile` system call to reduce the number of data copies and memory consumption.
- - `mmap`: Writes and reads files with `mmap` system call.
- - `auto`: Writes files with the normal File IO, for file reading, it falls back to normal `file` option on 32 bit machine and use `mmap` on 64 bit machine. This is to avoid file size limitation of java `mmap` implementation on 32 bit machine.
+- `file`: Writes files with the normal File IO, reads and transmits files with Netty `FileRegion`. `FileRegion` relies on `sendfile` system call to reduce the number of data copies and memory consumption.
+- `mmap`: Writes and reads files with `mmap` system call.
+- `auto`: Writes files with the normal File IO, for file reading, it falls back to normal `file` option on 32 bit machine and use `mmap` on 64 bit machine. This is to avoid file size limitation of java `mmap` implementation on 32 bit machine.
 
 The different mechanism could be chosen via [TaskManager configurations]({{< ref "docs/deployment/config#taskmanager-network-blocking-shuffle-type" >}}).
 
@@ -62,9 +69,9 @@ The memory usage of `mmap` is not accounted for by configured memory limits, but
 `Hash Shuffle` works well for small scale jobs with SSD, but it also have some disadvantages:
 
 1. If the job scale is large, it might create too many files, and it requires a large write buffer to write these files at the same time.
-2. On HDD, when multiple downstream tasks fetch their data simultaneously, it might incur the issue of random IO.  
+2. On HDD, when multiple downstream tasks fetch their data simultaneously, it might incur the issue of random IO.
 
-## Sort Shuffle 
+### Sort Shuffle
 
 `Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Fu [...]
 
@@ -76,7 +83,7 @@ Here are some config options that might need adjustment when using sort blocking
 Currently `Sort Shuffle` only sort records by partition index instead of the records themselves, that is to say, the `sort` is only used as a data clustering algorithm.
 {{< /hint >}}
 
-## Choices of Blocking Shuffle
+### Choices of Blocking Shuffle
 
 As a summary,
 
@@ -85,21 +92,65 @@ As a summary,
 
 To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism). It controls which shuffle implementation to use based on the parallelism of downstream tasks, if the parallelism is lower than the configured value, `Hash Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions lower than 1.15, its default va [...]
 
+## Hybrid Shuffle
+
+{{< hint warning >}}
+This feature is still experimental and has some known [limitations](#limitations).
+{{< /hint >}}
+
+Hybrid shuffle is the next generation of batch data exchanges. It combines the advantages of blocking shuffle and pipelined shuffle (in streaming mode).
+- Like blocking shuffle, it does not require upstream and downstream tasks to run simultaneously, which allows executing a job with little resources.
+- Like pipelined shuffle, it does not require downstream tasks to be executed after upstream tasks finish, which reduces the overall execution time of the job when given sufficient resources.
+- It adapts to custom preferences between persisting less data and restarting less tasks on failures, by providing different spilling strategies.
+
+### Spilling Strategy
+
+Hybrid shuffle provides two spilling strategies:
+
+- **Selective Spilling Strategy** persists data only if they are not consumed by downstream tasks timely. This reduces the amount of data to persist, at the price that in case of failures upstream tasks need to be restarted to reproduce the complete intermediate results.
+- **Full Spilling Strategy** persists all data, no matter they are consumed by downstream tasks or not. In case of failures, the persisted complete intermediate result can be re-consumed, without having to restart upstream tasks.
+
+### Usage
+
+To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+
+### Limitations
+
+Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.
+
+- **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
+- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+
 ## Performance Tuning
 
-The following guidelins may help you to achieve better performance especially for large scale batch jobs:
+The following guidelines may help you to achieve better performance especially for large scale batch jobs:
 
+{{< tabs "Performance Tuning" >}}
+
+{{< tab "Blocking Shuffle" >}}
 1. Always use `Sort Shuffle` on HDD because `Sort Shuffle` can largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is already the default blocking shuffle implementation, for 1.14 and lower version, you need to enable it manually by setting [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism) to 1.
-2. For both blocking shuffle implementations, you may consider [enabling data compression]({{< ref "docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled) to improve the performance unless the data is hard to compress. Since 1.15, data compression is already enabled by default, for 1.14 and lower version, you need to enable it manually.
+2. For both blocking shuffle implementations, you may consider [enabling data compression]({{< ref "docs/deployment/config">}}#taskmanager-network-batch-shuffle-compression-enabled) to improve the performance unless the data is hard to compress. Since 1.15, data compression is already enabled by default, for 1.14 and lower version, you need to enable it manually.
 3. When `Sort Shuffle` is used, decreasing the number of [exclusive buffers per channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) and increasing the number of [floating buffers per gate]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14 and higher version, it is suggested to set [taskmanager.network.memory.buffers-per-channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-me [...]
 4. Increase the total size of network memory. Currently, the default network memory size is pretty modest. For large scale jobs, it's suggested to increase the total [network memory fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better performance. At the same time, you may also need to adjust the [lower bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [upper bound]({{< ref "docs/deployment/con [...]
 5. Increase the memory size for shuffle data write. As mentioned in the above section, for large scale jobs, it's suggested to increase the number of [write buffers per result partition]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism) if you have enough memory. Note that you may also need to increase the total size of network memory to avoid the "Insufficient number of network buffers" error after you increase this config value.
 6. Increase the memory size for shuffle data read. As mentioned in the above section, for large scale jobs, it's suggested to increase the size of the [shared read memory]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value (for example, 256M or 512M). Because this memory is cut from the framework off-heap memory, you must increase [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-mem [...]
+{{< /tab >}}
+
+{{< tab "Hybrid Shuffle" >}}
+1. Increase the total size of network memory. Currently, the default network memory size is pretty modest. For large scale jobs, it's suggested to increase the total [network memory fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better performance. At the same time, you may also need to adjust the [lower bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [upper bound]({{< ref "docs/deployment/con [...]
+2. Increase the memory size for shuffle data write. For large scale jobs, it's suggested to increase the total size of network memory, the larger the memory that can be used in the shuffle write phase, the more opportunities downstream to read data directly from memory. You need to ensure that each `Result Partition` can be allocated to at least `numSubpartition + 1` buffers, otherwise the "Insufficient number of network buffers" will be encountered.
+3. Increase the memory size for shuffle data read. For large scale jobs, it's suggested to increase the size of the [shared read memory]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value (for example, 256M or 512M). Because this memory is cut from the framework off-heap memory, you must increase [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) by the [...]
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ## Trouble Shooting
 
 Here are some exceptions you may encounter (rarely) and the corresponding solutions that may help:
 
+{{< tabs "Trouble Shooting" >}}
+{{< tab "Blocking Shuffle" >}}
+
 | Exceptions | Potential Solutions |
 | :--------- | :------------------ |
 | Insufficient number of network buffers | This means the amount of network memory is not enough to run the target job and you need to increase the total network memory size. Note that since 1.15, `Sort Shuffle` has become the default blocking shuffle implementation and for some cases, it may need more network memory than before, which means there is a small possibility that your batch jobs may suffer from this issue after upgrading to 1.15. If this is the case, you just need to increase [...]
@@ -111,3 +162,20 @@ Here are some exceptions you may encounter (rarely) and the corresponding soluti
 | No space left on device | This usually means that the disk space or the inodes have been exhausted. Please consider extending the storage space or do some cleanup. |
 | Out of memory error | If you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If you are already using `Sort Shuffle` and following the above guidelines, please consider increasing the corresponding memory size. For heap memory, you can increase [taskmanager.memory.task.heap.size]({{< ref "docs/deployment/config" >}}#ttaskmanager-memory-task-heap-size) and for direct memory, you can increase [taskmanager.memory.task.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmana [...]
 | Container killed by external resource manger | There are several reasons which can lead to the killing of a container, for example, kill a low priority container to make room for high priority container or the container uses too many resources like memory and disk space. As mentioned in the above section, `Hash Shuffle` may use too much memory and gets killed by YARN. So if you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If `Sort Shuffle` is already used, you may need to [...]
+
+{{< /tab >}}
+
+{{< tab "Hybrid Shuffle" >}}
+
+| Exceptions                                  | Potential Solutions                                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
+|:--------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| Insufficient number of network buffers      | This means the amount of network memory is not enough to run the target job and you need to increase the total network memory size.                                                                                                                                                                                                                                                                                                                          [...]
+| Connection reset by peer                    | This usually means that the network is unstable or or under heavy burden. Other issues like SSL handshake timeout may also cause this problem. Increasing the [network backlog]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-server-backlog) may help.                                                                                                                                                                                     [...]
+| Network connection timeout                  | This usually means that the network is unstable or under heavy burden and increasing the [network connection timeout]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-client-connectTimeoutSec) or enable [connection retry]({{< ref "docs/deployment/config" >}}#taskmanager-network-retries) may help.                                                                                                                                      [...]
+| Socket read/write timeout                   | This may indicate that the network is slow or under heavy burden and increasing the [network send/receive buffer size]({{< ref "docs/deployment/config" >}}#taskmanager-network-netty-sendReceiveBufferSize) may help. If the job is running in Kubernetes environment, using [host network]({{< ref "docs/deployment/config" >}}#kubernetes-hostnetwork-enabled) may also help.                                                                             [...]
+| Read buffer request timeout                 | This means a fierce contention of the shuffle read memory. To solve the issue, you can increase [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) together with [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size).                                                       [...]
+| No space left on device                     | This usually means that the disk space or the inodes have been exhausted. Please consider extending the storage space or do some cleanup.                                                                                                                                                                                                                                                                                                                    [...]
+
+{{< /tab >}}
+
+{{< /tabs >}}