Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/10 14:03:01 UTC

[flink] 04/04: [FLINK-18084][docs] Document the Application Mode

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dfd5178ecd6e3a22cb4a4052635fdc94009af4c
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Jun 8 15:54:52 2020 +0200

    [FLINK-18084][docs] Document the Application Mode
    
    This closes #12549.
    
    (cherry picked from commit 7aa5f3310ed781477e84de7e3a20da089ee3b4b5)
---
 docs/ops/deployment/index.md                | 78 +++++++++++++++++++++++++++++
 docs/ops/deployment/index.zh.md             | 78 +++++++++++++++++++++++++++++
 docs/ops/deployment/native_kubernetes.md    |  2 +-
 docs/ops/deployment/native_kubernetes.zh.md |  2 +-
 docs/ops/deployment/yarn_setup.md           | 43 +++++++++++++++-
 docs/ops/deployment/yarn_setup.zh.md        | 39 ++++++++++++++-
 6 files changed, 238 insertions(+), 4 deletions(-)

diff --git a/docs/ops/deployment/index.md b/docs/ops/deployment/index.md
index 94ef1ce..9824541 100644
--- a/docs/ops/deployment/index.md
+++ b/docs/ops/deployment/index.md
@@ -29,6 +29,84 @@ When deciding how and where to run Flink, there's a wide range of options availa
 * This will be replaced by the TOC
 {:toc}
 
+## Deployment Modes
+
+Flink can execute applications in one of three ways:
+ - in Session Mode,
+ - in Per-Job Mode, or
+ - in Application Mode.
+
+ The above modes differ in:
+ - the cluster lifecycle and resource isolation guarantees
+ - whether the application's `main()` method is executed on the client or on the cluster.
+
+#### Session Mode
+
+*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any 
+submitted application. Applications executed in the same (session) cluster use, and consequently compete
+for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up
+a full cluster for every submitted job. However, if one of the jobs misbehaves or brings down a Task Manager,
+then all jobs running on that Task Manager will be affected by the failure. Apart from the negative
+impact on the job that caused the failure, this implies a potentially massive recovery process in which all
+the restarting jobs access the filesystem concurrently and may make it unavailable to other services.
+Additionally, having a single cluster running multiple jobs implies more load for the Flink Master, which
+is responsible for the book-keeping of all the jobs in the cluster.
+
+#### Per-Job Mode
+
+Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available cluster manager
+framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to 
+that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc.) are
+cleaned up. This provides better resource isolation, as a misbehaving job can only bring down its own
+Task Managers. In addition, it spreads the load of book-keeping across multiple Flink Masters, as there is
+one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode in many
+production setups.
+
+#### Application Mode
+    
+In all the above modes, the application's `main()` method is executed on the client side. This process 
+includes downloading the application's dependencies locally, executing the `main()` to extract a representation
+of the application that Flink's runtime can understand (i.e. the `JobGraph`), and shipping the dependencies and
+the `JobGraph(s)` to the cluster. This makes the Client a heavy resource consumer as it may need substantial
+network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the
+`main()`. This problem can be more pronounced when the Client is shared across users.
+
+Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time,
+the `main()` method of the application is executed on the Flink Master. Creating a cluster per application can be 
+seen as creating a session cluster shared only among the jobs of a particular application, and torn down when
+the application finishes. With this architecture, the *Application Mode* provides the same resource isolation
+and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing 
+the `main()` on the Flink Master saves not only the CPU cycles required on the client, but also the bandwidth required
+for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load of
+downloading the dependencies of the applications in the cluster, as there is one Flink Master per application.
+
+<div class="alert alert-info" markdown="span">
+  <strong>Note:</strong> In the Application Mode, the `main()` is executed on the cluster and not on the client, 
+  as in the other modes. This may have implications for your code as, for example, any paths you register in 
+  your environment using `registerCachedFile()` must be accessible by the Flink Master of your application.
+</div>
+
+Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of
+multiple jobs. The order of job execution is not affected by the deployment mode but by the call used
+to launch the job. Using `execute()`, which is blocking, establishes an order: the execution of the
+"next" job is postponed until "this" job finishes. Using `executeAsync()`, which is
+non-blocking, will lead to the "next" job starting before "this" job finishes.
+
+<div class="alert alert-info" markdown="span">
+  <strong>Attention:</strong> The Application Mode allows for multi-`execute()` applications but 
+  High-Availability is not supported in these cases. High-Availability in Application Mode is only
+  supported for single-`execute()` applications.
+</div>
+
+#### Summary
+
+In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster
+and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster
+for every submitted job, but this comes with better isolation guarantees as the resources are not shared 
+across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the 
+*Application Mode* creates a session cluster per application and executes the application's `main()` 
+method on the cluster.
+
 ## Deployment Targets
 
 Apache Flink ships with first class support for a number of common deployment targets.
diff --git a/docs/ops/deployment/index.zh.md b/docs/ops/deployment/index.zh.md
index 4499abb..53b4558 100644
--- a/docs/ops/deployment/index.zh.md
+++ b/docs/ops/deployment/index.zh.md
@@ -29,6 +29,84 @@ When deciding how and where to run Flink, there's a wide range of options availa
 * This will be replaced by the TOC
 {:toc}
 
+## Deployment Modes
+
+Flink can execute applications in one of three ways:
+ - in Session Mode,
+ - in Per-Job Mode, or
+ - in Application Mode.
+
+ The above modes differ in:
+ - the cluster lifecycle and resource isolation guarantees
+ - whether the application's `main()` method is executed on the client or on the cluster.
+
+#### Session Mode
+
+*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any 
+submitted application. Applications executed in the same (session) cluster use, and consequently compete
+for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up
+a full cluster for every submitted job. However, if one of the jobs misbehaves or brings down a Task Manager,
+then all jobs running on that Task Manager will be affected by the failure. Apart from the negative
+impact on the job that caused the failure, this implies a potentially massive recovery process in which all
+the restarting jobs access the filesystem concurrently and may make it unavailable to other services.
+Additionally, having a single cluster running multiple jobs implies more load for the Flink Master, which
+is responsible for the book-keeping of all the jobs in the cluster.
+
+#### Per-Job Mode
+
+Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available cluster manager
+framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to 
+that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc.) are
+cleaned up. This provides better resource isolation, as a misbehaving job can only bring down its own
+Task Managers. In addition, it spreads the load of book-keeping across multiple Flink Masters, as there is
+one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode in many
+production setups.
+
+#### Application Mode
+    
+In all the above modes, the application's `main()` method is executed on the client side. This process 
+includes downloading the application's dependencies locally, executing the `main()` to extract a representation
+of the application that Flink's runtime can understand (i.e. the `JobGraph`), and shipping the dependencies and
+the `JobGraph(s)` to the cluster. This makes the Client a heavy resource consumer as it may need substantial
+network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the
+`main()`. This problem can be more pronounced when the Client is shared across users.
+
+Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time,
+the `main()` method of the application is executed on the Flink Master. Creating a cluster per application can be 
+seen as creating a session cluster shared only among the jobs of a particular application, and torn down when
+the application finishes. With this architecture, the *Application Mode* provides the same resource isolation
+and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing 
+the `main()` on the Flink Master saves not only the CPU cycles required on the client, but also the bandwidth required
+for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load of
+downloading the dependencies of the applications in the cluster, as there is one Flink Master per application.
+
+<div class="alert alert-info" markdown="span">
+  <strong>Note:</strong> In the Application Mode, the `main()` is executed on the cluster and not on the client, 
+  as in the other modes. This may have implications for your code as, for example, any paths you register in 
+  your environment using `registerCachedFile()` must be accessible by the Flink Master of your application.
+</div>
+
+Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of
+multiple jobs. The order of job execution is not affected by the deployment mode but by the call used
+to launch the job. Using `execute()`, which is blocking, establishes an order: the execution of the
+"next" job is postponed until "this" job finishes. Using `executeAsync()`, which is
+non-blocking, will lead to the "next" job starting before "this" job finishes.
+
+<div class="alert alert-info" markdown="span">
+  <strong>Attention:</strong> The Application Mode allows for multi-`execute()` applications but 
+  High-Availability is not supported in these cases. High-Availability in Application Mode is only
+  supported for single-`execute()` applications.
+</div>
+
+#### Summary
+
+In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster
+and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster
+for every submitted job, but this comes with better isolation guarantees as the resources are not shared 
+across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the 
+*Application Mode* creates a session cluster per application and executes the application's `main()` 
+method on the cluster.
+
 ## Deployment Targets
 
 Apache Flink ships with first class support for a number of common deployment targets.
diff --git a/docs/ops/deployment/native_kubernetes.md b/docs/ops/deployment/native_kubernetes.md
index 10bafa3..31a12f4 100644
--- a/docs/ops/deployment/native_kubernetes.md
+++ b/docs/ops/deployment/native_kubernetes.md
@@ -97,7 +97,7 @@ $ ./bin/kubernetes-session.sh \
 Use the following command to submit a Flink Job to the Kubernetes cluster.
 
 {% highlight bash %}
-$ ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
+$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
 {% endhighlight %}
 
 ### Accessing Job Manager UI
diff --git a/docs/ops/deployment/native_kubernetes.zh.md b/docs/ops/deployment/native_kubernetes.zh.md
index e9e0506..8bbadff 100644
--- a/docs/ops/deployment/native_kubernetes.zh.md
+++ b/docs/ops/deployment/native_kubernetes.zh.md
@@ -97,7 +97,7 @@ $ ./bin/kubernetes-session.sh \
 Use the following command to submit a Flink Job to the Kubernetes cluster.
 
 {% highlight bash %}
-$ ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
+$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
 {% endhighlight %}
 
 ### Accessing Job Manager UI
diff --git a/docs/ops/deployment/yarn_setup.md b/docs/ops/deployment/yarn_setup.md
index 4dcd290..20f3f46 100644
--- a/docs/ops/deployment/yarn_setup.md
+++ b/docs/ops/deployment/yarn_setup.md
@@ -222,7 +222,6 @@ You can check the number of TaskManagers in the JobManager web interface. The ad
 
 If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
 
-
 ## Run a single Flink job on YARN
 
 The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.
@@ -251,6 +250,48 @@ The user-jars position in the class path can be controlled by setting the parame
 - `FIRST`: Adds the jar to the beginning of the system class path.
 - `LAST`: Adds the jar to the end of the system class path.
 
+## Run an application in Application Mode
+
+To launch an application in [Application Mode]({% link ops/deployment/index.md %}#deployment-modes), you can type:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar
+{% endhighlight %}
+
+<div class="alert alert-info" markdown="span">
+  <strong>Attention:</strong> Apart from `-t`, all other configuration parameters, such as the path
+  to the savepoint to be used to bootstrap the application's state, the application parallelism or the 
+  required job manager/task manager memory sizes, can be specified by their configuration option, 
+  prefixed by `-D`.
+</div>
+  
+As an example, the command to specify the memory sizes of the JM and the TM looks like:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application \
+-Djobmanager.memory.process.size=2048m \
+-Dtaskmanager.memory.process.size=4096m \
+./examples/batch/WordCount.jar
+
+{% endhighlight %}
+
+For the available configuration options, have a look [here]({% link ops/config.md %}). To unlock
+the full potential of the application mode, consider using it with the `yarn.provided.lib.dirs` configuration option
+and pre-uploading your application jar to a location accessible by all nodes in your cluster. In this case, the
+command could look like:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application \
+-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
+hdfs://myhdfs/jars/my-application.jar
+{% endhighlight %}
+
+The above makes the job submission extra lightweight, as the needed Flink jars and the application jar
+are picked up from the specified remote locations rather than being shipped to the cluster by the
+client.
+
+Stopping, cancelling or querying the status of a running application can be done in any of the existing ways. 
+
 ## Recovery behavior of Flink on YARN
 
 Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.
diff --git a/docs/ops/deployment/yarn_setup.zh.md b/docs/ops/deployment/yarn_setup.zh.md
index 7ff496b..1f74a1c 100644
--- a/docs/ops/deployment/yarn_setup.zh.md
+++ b/docs/ops/deployment/yarn_setup.zh.md
@@ -222,7 +222,6 @@ You can check the number of TaskManagers in the JobManager web interface. The ad
 
 If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
 
-
 ## Run a single Flink job on YARN
 
 The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.
@@ -251,6 +250,44 @@ The user-jars position in the class path can be controlled by setting the parame
 - `FIRST`: Adds the jar to the beginning of the system class path.
 - `LAST`: Adds the jar to the end of the system class path.
 
+## Run an application in Application Mode
+
+To launch an application in [Application Mode]({% link ops/deployment/index.zh.md %}#deployment-modes), you can type:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar
+{% endhighlight %}
+
+<div class="alert alert-info" markdown="span">
+  <strong>Attention:</strong> Apart from `-t`, all other configuration parameters, such as the path
+  to the savepoint to be used to bootstrap the application's state, the application parallelism or the 
+  required job manager/task manager memory sizes, can be specified by their configuration option, 
+  prefixed by `-D`.
+</div>
+  
+As an example, the command to specify the memory sizes of the JM and the TM looks like:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=4096m  ./examples/batch/WordCount.jar
+{% endhighlight %}
+
+For the available configuration options, have a look [here]({% link ops/config.zh.md %}). To unlock
+the full potential of the application mode, consider using it with the `yarn.provided.lib.dirs` configuration option
+and pre-uploading your application jar to a location accessible by all nodes in your cluster. In this case, the
+command could look like:
+
+{% highlight bash %}
+./bin/flink run-application -t yarn-application \
+-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
+hdfs://myhdfs/jars/my-application.jar
+{% endhighlight %}
+
+The above makes the job submission extra lightweight, as the needed Flink jars and the application jar
+are picked up from the specified remote locations rather than being shipped to the cluster by the
+client.
+
+Stopping, cancelling or querying the status of a running application can be done in any of the existing ways. 
+
 ## Recovery behavior of Flink on YARN
 
 Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.
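
The `-D` option passing described in the added "Run an application in Application Mode" section can be sketched as a small dry-run helper. This is only an illustration under stated assumptions: the function name `build_run_application_cmd` is hypothetical and not part of Flink, and the helper only prints a command (using the `jobmanager.memory.process.size` and `taskmanager.memory.process.size` options shown in the documentation above) rather than submitting anything to a cluster.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink): prints a run-application command
# for YARN with the given JobManager/TaskManager memory sizes and jar path.
# It only echoes the command; nothing is submitted to a cluster.
build_run_application_cmd() {
  jm_mem="$1"   # JobManager process memory, e.g. 2048m
  tm_mem="$2"   # TaskManager process memory, e.g. 4096m
  jar="$3"      # path to the application jar
  echo "./bin/flink run-application -t yarn-application" \
       "-Djobmanager.memory.process.size=${jm_mem}" \
       "-Dtaskmanager.memory.process.size=${tm_mem}" \
       "${jar}"
}

# Dry run: prints the command for the WordCount example.
build_run_application_cmd 2048m 4096m ./examples/batch/WordCount.jar
```

Printing the command first makes it easy to review the assembled `-D` options before running it for real from the Flink distribution directory.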