Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/10 09:57:34 UTC

[flink] branch master updated (44eef6f -> a442eb6)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 44eef6f  [FLINK-9637] [docs] Add public user documentation for state TTL feature
     new 3f40783  [FLINK-10099][test] Improve YarnResourceManagerTest
     new c85af4a  [FLINK-9240] Avoid deprecated Akka methods
     new 04ba9a8  [hotfix][docs] Add missing space after end of sentence.
     new a442eb6  [FLINK-9795][mesos, docs] Update Mesos documentation

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../high_availability_zookeeper_configuration.html |   2 +-
 .../generated/job_manager_configuration.html       |   2 +-
 docs/_includes/generated/mesos_configuration.html  |   8 +-
 .../mesos_task_manager_configuration.html          |   8 +-
 docs/ops/deployment/mesos.md                       |  83 +-----
 .../apache/flink/client/program/ClusterClient.java |   5 +-
 .../apache/flink/client/program/ClientTest.java    |   7 +-
 .../configuration/HighAvailabilityOptions.java     |   5 +-
 .../flink/configuration/JobManagerOptions.java     |   2 +-
 .../flink/mesos/configuration/MesosOptions.java    |  31 +-
 .../MesosApplicationMasterRunner.java              |  25 +-
 .../MesosTaskManagerParameters.java                |  22 +-
 .../runtime/akka/DefaultQuarantineHandler.java     |   9 +-
 .../runtime/minicluster/StandaloneMiniCluster.java |  10 +-
 .../flink/runtime/taskmanager/MemoryLogger.java    |   2 +-
 .../flink/runtime/util/ProcessShutDownThread.java  |   3 +-
 .../flink/runtime/jobmanager/JobManager.scala      |  17 +-
 .../runtime/minicluster/FlinkMiniCluster.scala     |  18 +-
 .../flink/runtime/taskmanager/TaskManager.scala    |  22 +-
 .../flink/runtime/akka/QuarantineMonitorTest.java  |  15 +-
 .../JobManagerHAJobGraphRecoveryITCase.java        |   4 +-
 .../jobmanager/JobManagerProcessReapingTest.java   |   2 +-
 .../flink/runtime/jobmanager/JobManagerTest.java   |  19 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java    |   2 +-
 .../runtime/metrics/TaskManagerMetricsTest.java    |   8 +-
 .../metrics/dump/MetricQueryServiceTest.java       |   2 +-
 .../StackTraceSampleCoordinatorTest.java           |   2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |   6 +-
 .../TaskManagerComponentsStartupShutdownTest.java  |  10 +-
 .../TaskManagerProcessReapingTestBase.java         |   2 +-
 .../taskmanager/TaskManagerRegistrationTest.java   |   2 +-
 .../impl/AkkaJobManagerRetrieverTest.java          |   9 +-
 .../jobmanager/JobManagerConnectionTest.scala      |   4 +-
 .../runtime/testingUtils/TestingCluster.scala      |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java      |   6 +-
 .../minicluster/LocalFlinkMiniClusterITCase.java   |   7 +-
 .../flink/yarn/YarnApplicationMasterRunner.java    |  25 +-
 .../apache/flink/yarn/YarnResourceManagerTest.java | 329 ++++++++++++---------
 38 files changed, 411 insertions(+), 333 deletions(-)


[flink] 04/04: [FLINK-9795][mesos, docs] Update Mesos documentation

Posted by tr...@apache.org.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a442eb6c0388558c6fb2e5e616cd1cd15038b95c
Author: gyao <ga...@data-artisans.com>
AuthorDate: Thu Aug 9 13:27:01 2018 +0200

    [FLINK-9795][mesos, docs] Update Mesos documentation
    
    [FLINK-9795][mesos, docs] Remove unnecessary remark about task reconciliation.
    
    The config key high-availability.zookeeper.path.mesos-workers already has a
    default value, so task reconciliation works even if the key is not set
    explicitly. Moreover, if there were no default value, the code would throw
    an NPE. Either way, the remark only confuses the reader.
    
    [FLINK-9795][mesos, docs] Remove configuration keys from Mesos Setup page.
    
    - Remove the Mesos specific configuration keys from the Mesos Setup page because
    they duplicate what is already on the configuration page.
    - Add missing descriptions for some of the keys that are under the Mesos section of the configuration
    page.
    - Improve formatting of the descriptions.
    
    [FLINK-9795][mesos, docs] Document which config options are only used in legacy mode.
    
    [FLINK-9795][mesos, docs] Document that mesos.initial-tasks is only needed in legacy mode.
    
    [FLINK-9795][mesos, docs] Clarify necessity of Marathon in documentation.
    
    [FLINK-9795][mesos, docs] Rewrite "Flink's JobManager and Web Interface" section.
    
    [FLINK-9795][mesos, docs] Add missing period at the end of sentence.
    
    This closes #6533.
---
 .../high_availability_zookeeper_configuration.html |  2 +-
 docs/_includes/generated/mesos_configuration.html  |  8 +--
 .../mesos_task_manager_configuration.html          |  8 +--
 docs/ops/deployment/mesos.md                       | 83 ++++------------------
 .../configuration/HighAvailabilityOptions.java     |  5 +-
 .../flink/mesos/configuration/MesosOptions.java    | 31 +++++---
 .../MesosTaskManagerParameters.java                | 22 ++++--
 7 files changed, 68 insertions(+), 91 deletions(-)

diff --git a/docs/_includes/generated/high_availability_zookeeper_configuration.html b/docs/_includes/generated/high_availability_zookeeper_configuration.html
index a49d160..6577878 100644
--- a/docs/_includes/generated/high_availability_zookeeper_configuration.html
+++ b/docs/_includes/generated/high_availability_zookeeper_configuration.html
@@ -60,7 +60,7 @@
         <tr>
             <td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
             <td style="word-wrap: break-word;">"/mesos-workers"</td>
-            <td>ZooKeeper root path (ZNode) for Mesos workers.</td>
+            <td>The ZooKeeper root path for persisting the Mesos worker information.</td>
         </tr>
         <tr>
             <td><h5>high-availability.zookeeper.path.root</h5></td>
diff --git a/docs/_includes/generated/mesos_configuration.html b/docs/_includes/generated/mesos_configuration.html
index c514c86..54e92e5 100644
--- a/docs/_includes/generated/mesos_configuration.html
+++ b/docs/_includes/generated/mesos_configuration.html
@@ -15,17 +15,17 @@
         <tr>
             <td><h5>mesos.initial-tasks</h5></td>
             <td style="word-wrap: break-word;">0</td>
-            <td>The initial workers to bring up when the master starts</td>
+            <td>The initial workers to bring up when the master starts. This option is ignored unless Flink is in <a href="#legacy">legacy mode</a>.</td>
         </tr>
         <tr>
             <td><h5>mesos.master</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td>The Mesos master URL. The value should be in one of the following forms: "host:port", "zk://host1:port1,host2:port2,.../path", "zk://username:password@host1:port1,host2:port2,.../path" or "file:///path/to/file"</td>
+            <td>The Mesos master URL. The value should be in one of the following forms: <ul><li>host:port</li><li>zk://host1:port1,host2:port2,.../path</li><li>zk://username:password@host1:port1,host2:port2,.../path</li><li>file:///path/to/file</li></ul></td>
         </tr>
         <tr>
             <td><h5>mesos.maximum-failed-tasks</h5></td>
             <td style="word-wrap: break-word;">-1</td>
-            <td>The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature</td>
+            <td>The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature. This option is ignored unless Flink is in <a href="#legacy">legacy mode</a>.</td>
         </tr>
         <tr>
             <td><h5>mesos.resourcemanager.artifactserver.port</h5></td>
@@ -65,7 +65,7 @@
         <tr>
             <td><h5>mesos.resourcemanager.tasks.port-assignments</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td>Comma-separated list of configuration keys which represent a configurable port.All port keys will dynamically get a port assigned through Mesos.</td>
+            <td>Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.</td>
         </tr>
     </tbody>
 </table>
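
Editorial sketch: the `mesos.master` description in the hunk above lists four accepted value forms. The following is a minimal illustration of how those forms differ syntactically. The class `MesosMasterUrlForms`, the enum `Form`, and the method `classify` are hypothetical names introduced here; this is not the parsing logic used by Flink or Mesos, and the host names in `main` are made up.

```java
import java.util.regex.Pattern;

public class MesosMasterUrlForms {

    /** The four forms named in the option description, plus a fallback. */
    public enum Form { HOST_PORT, ZOOKEEPER, ZOOKEEPER_WITH_CREDENTIALS, FILE, UNKNOWN }

    // A single "host:port" pair: no slashes, no '@', no whitespace, numeric port.
    private static final Pattern HOST_PORT = Pattern.compile("[^/:@\\s]+:\\d+");

    /** Hypothetical helper: decides which documented form a value resembles. */
    public static Form classify(String url) {
        if (url.startsWith("file://")) {
            return Form.FILE;
        }
        if (url.startsWith("zk://")) {
            String rest = url.substring("zk://".length());
            // In the documented form, credentials precede an '@' sign.
            return rest.contains("@") ? Form.ZOOKEEPER_WITH_CREDENTIALS : Form.ZOOKEEPER;
        }
        if (HOST_PORT.matcher(url).matches()) {
            return Form.HOST_PORT;
        }
        return Form.UNKNOWN;
    }

    public static void main(String[] args) {
        // Example values are assumptions chosen to match the documented shapes.
        String[] samples = {
            "mesos-master:5050",
            "zk://host1:2181,host2:2181/mesos",
            "zk://username:password@host1:2181,host2:2181/mesos",
            "file:///path/to/file"
        };
        for (String sample : samples) {
            System.out.println(sample + " -> " + classify(sample));
        }
    }
}
```

A check like this only distinguishes the shapes; whether a given value actually resolves to a running Mesos master is decided at runtime.
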
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html b/docs/_includes/generated/mesos_task_manager_configuration.html
index 0af844d..1e67f84 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -10,12 +10,12 @@
         <tr>
             <td><h5>mesos.constraints.hard.hostattribute</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td>Constraints for task placement on mesos.</td>
+            <td>Constraints for task placement on Mesos based on agent attributes. Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target mesos agents. Example: az:eu-west-1a,series:t2</td>
         </tr>
         <tr>
             <td><h5>mesos.resourcemanager.tasks.bootstrap-cmd</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td></td>
+            <td>A command which is executed before the TaskManager is started.</td>
         </tr>
         <tr>
             <td><h5>mesos.resourcemanager.tasks.container.docker.force-pull-image</h5></td>
@@ -50,12 +50,12 @@
         <tr>
             <td><h5>mesos.resourcemanager.tasks.gpus</h5></td>
             <td style="word-wrap: break-word;">0</td>
-            <td></td>
+            <td>GPUs to assign to the Mesos workers.</td>
         </tr>
         <tr>
             <td><h5>mesos.resourcemanager.tasks.hostname</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td></td>
+            <td>Optional value to define the TaskManager’s hostname. The pattern _TASK_ is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. _TASK_.flink-service.mesos) for name lookups.</td>
         </tr>
         <tr>
             <td><h5>mesos.resourcemanager.tasks.mem</h5></td>
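
Editorial sketch: two of the descriptions added above define small value formats, a comma-separated list of key:value pairs for `mesos.constraints.hard.hostattribute` and the `_TASK_` placeholder for `mesos.resourcemanager.tasks.hostname`. The following illustrates how such values could be interpreted. The class and method names are hypothetical, the task id used in `main` is invented, and none of this is Flink's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MesosTaskOptionExamples {

    /**
     * Hypothetical helper: splits "k1:v1,k2:v2" into an ordered map.
     * Only the first ':' in each pair separates key from value.
     */
    public static Map<String, String> parseHostAttributes(String value) {
        Map<String, String> attributes = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return attributes;
        }
        for (String pair : value.split(",")) {
            String[] keyValue = pair.trim().split(":", 2);
            if (keyValue.length != 2 || keyValue[0].isEmpty()) {
                throw new IllegalArgumentException("Expected key:value but got '" + pair + "'");
            }
            attributes.put(keyValue[0], keyValue[1]);
        }
        return attributes;
    }

    /** Hypothetical helper: substitutes the Mesos task id for the _TASK_ pattern. */
    public static String resolveHostname(String pattern, String taskId) {
        return pattern.replace("_TASK_", taskId);
    }

    public static void main(String[] args) {
        // Example value taken from the option description.
        System.out.println(parseHostAttributes("az:eu-west-1a,series:t2"));
        // The task id below is an assumption made for illustration only.
        System.out.println(resolveHostname("_TASK_.flink-service.mesos", "taskmanager-00001"));
    }
}
```
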
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index aca6f23..1ff8afa 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -59,13 +59,11 @@ or configuration files. For instance, in non-containerized environments, the
 artifact server will provide the Flink binaries. What files will be served
 depends on the configuration overlay used.
 
-### Flink's JobManager and Web Interface
+### Flink's Dispatcher and Web Interface
 
-The Mesos scheduler currently resides with the JobManager but will be started
-independently of the JobManager in future versions (see
-[FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)). The
-proposed changes will also add a Dispatcher component which will be the central
-point for job submission and monitoring.
+The Dispatcher and the web interface provide a central point for monitoring,
+job submission, and other client interaction with the cluster
+(see [FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)).
 
 ### Startup script and configuration overlays
 
@@ -139,7 +137,7 @@ More information about the deployment scripts can be found [here](http://mesos.a
 
 ### Installing Marathon
 
-Optionally, you may also [install Marathon](https://mesosphere.github.io/marathon/docs/) which will be necessary to run Flink in high availability (HA) mode.
+Optionally, you may also [install Marathon](https://mesosphere.github.io/marathon/docs/) which enables you to run Flink in [high availability (HA) mode](#high-availability).
 
 ### Pre-installing Flink vs Docker/Mesos containers
 
@@ -171,8 +169,6 @@ which manage the Flink processes in a Mesos cluster:
    It is automatically launched by the Mesos worker node to bring up a new TaskManager.
 
 In order to run the `mesos-appmaster.sh` script you have to define `mesos.master` in the `flink-conf.yaml` or pass it via `-Dmesos.master=...` to the Java process.
-Additionally, you should define the number of task managers which are started by Mesos via `mesos.initial-tasks`.
-This value can also be defined in the `flink-conf.yaml` or passed as a Java property.
 
 When executing `mesos-appmaster.sh`, it will create a job manager on the machine where you executed the script.
 In contrast to that, the task managers will be run as Mesos tasks in the Mesos cluster.
@@ -188,19 +184,21 @@ For example:
         -Djobmanager.heap.mb=1024 \
         -Djobmanager.rpc.port=6123 \
         -Drest.port=8081 \
-        -Dmesos.initial-tasks=10 \
         -Dmesos.resourcemanager.tasks.mem=4096 \
         -Dtaskmanager.heap.mb=3500 \
         -Dtaskmanager.numberOfTaskSlots=2 \
         -Dparallelism.default=10
 
+<div class="alert alert-info">
+  <strong>Note:</strong> If Flink is in <a href="{{ site.baseurl }}/ops/config.html#legacy">legacy mode</a>,
+  you should additionally define the number of task managers that are started by Mesos via
+  <a href="{{ site.baseurl }}/ops/config.html#mesos-initial-tasks"><code>mesos.initial-tasks</code></a>.
+</div>
 
 ### High Availability
 
 You will need to run a service like Marathon or Apache Aurora which takes care of restarting the Flink master process in case of node or process failures.
-In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs]({{ site.baseurl }}/ops/jobmanager_high_availability.html)
-
-For the reconciliation of tasks to work correctly, please also set `high-availability.zookeeper.path.mesos-workers` to a valid Zookeeper path.
+In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs]({{ site.baseurl }}/ops/jobmanager_high_availability.html).
 
 #### Marathon
 
@@ -211,7 +209,7 @@ Here is an example configuration for Marathon:
 
     {
         "id": "flink",
-        "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.initial-tasks=1 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
+        "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
         "cpus": 1.0,
         "mem": 1024
     }
@@ -220,60 +218,7 @@ When running Flink with Marathon, the whole Flink cluster including the job mana
 
 ### Configuration parameters
 
-`mesos.initial-tasks`: The initial workers to bring up when the master starts (**DEFAULT**: The number of workers specified at cluster startup).
-
-`mesos.constraints.hard.hostattribute`: Constraints for task placement on Mesos based on agent attributes (**DEFAULT**: None).
-Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target
-mesos agents.  Example: `az:eu-west-1a,series:t2`
-
-`mesos.maximum-failed-tasks`: The maximum number of failed workers before the cluster fails (**DEFAULT**: Number of initial workers).
-May be set to -1 to disable this feature.
-
-`mesos.master`: The Mesos master URL. The value should be in one of the following forms:
-
-* `host:port`
-* `zk://host1:port1,host2:port2,.../path`
-* `zk://username:password@host1:port1,host2:port2,.../path`
-* `file:///path/to/file`
-
-`mesos.failover-timeout`: The failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down (**DEFAULT:** 600).
-
-`mesos.resourcemanager.artifactserver.port`:The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.
-
-`mesos.resourcemanager.framework.name`: Mesos framework name (**DEFAULT:** Flink)
-
-`mesos.resourcemanager.framework.role`: Mesos framework role definition (**DEFAULT:** *)
-
-`high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information.
-
-`mesos.resourcemanager.framework.principal`: Mesos framework principal (**NO DEFAULT**)
-
-`mesos.resourcemanager.framework.secret`: Mesos framework secret (**NO DEFAULT**)
-
-`mesos.resourcemanager.framework.user`: Mesos framework user (**DEFAULT:**"")
-
-`mesos.resourcemanager.artifactserver.ssl.enabled`: Enables SSL for the Flink artifact server (**DEFAULT**: true). Note that `security.ssl.enabled` also needs to be set to `true` encryption to enable encryption.
-
-`mesos.resourcemanager.tasks.mem`: Memory to assign to the Mesos workers in MB (**DEFAULT**: 1024)
-
-`mesos.resourcemanager.tasks.cpus`: CPUs to assign to the Mesos workers (**DEFAULT**: 0.0)
-
-`mesos.resourcemanager.tasks.gpus`: GPUs to assign to the Mesos workers (**DEFAULT**: 0.0)
-
-`mesos.resourcemanager.tasks.container.type`: Type of the containerization used: "mesos" or "docker" (DEFAULT: mesos);
-
-`mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
-
-`mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
-
-`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
-
-`mesos.resourcemanager.tasks.uris`: A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers. (**NO DEFAULT**)
-
-`mesos.resourcemanager.tasks.container.docker.force-pull-image`: Instruct the docker containerizer to forcefully pull the image rather than reuse a cached version. (**DEFAULT**: false)
-
-`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
-
-`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).
+For a list of Mesos specific configuration, refer to the [Mesos section]({{ site.baseurl }}/ops/config.html#mesos)
+of the configuration documentation.
 
 {% top %}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index c8b8ae9..787efff 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -157,7 +158,9 @@ public class HighAvailabilityOptions {
 			key("high-availability.zookeeper.path.mesos-workers")
 			.defaultValue("/mesos-workers")
 			.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
-			.withDescription("ZooKeeper root path (ZNode) for Mesos workers.");
+			.withDescription(Description.builder()
+				.text("The ZooKeeper root path for persisting the Mesos worker information.")
+				.build());
 
 	// ------------------------------------------------------------------------
 	//  ZooKeeper Client Settings
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 7046605..426a891 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -19,6 +19,9 @@
 package org.apache.flink.mesos.configuration;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.LinkElement;
+import org.apache.flink.configuration.description.TextElement;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -33,7 +36,10 @@ public class MesosOptions {
 	public static final ConfigOption<Integer> INITIAL_TASKS =
 		key("mesos.initial-tasks")
 			.defaultValue(0)
-			.withDescription("The initial workers to bring up when the master starts");
+			.withDescription(Description.builder()
+				.text("The initial workers to bring up when the master starts. ")
+				.text("This option is ignored unless Flink is in %s.", LinkElement.link("#legacy", "legacy mode"))
+				.build());
 
 	/**
 	 * The maximum number of failed Mesos tasks before entirely stopping
@@ -44,8 +50,10 @@ public class MesosOptions {
 	public static final ConfigOption<Integer> MAX_FAILED_TASKS =
 		key("mesos.maximum-failed-tasks")
 			.defaultValue(-1)
-			.withDescription("The maximum number of failed workers before the cluster fails. May be set to -1 to disable" +
-				" this feature");
+			.withDescription(Description.builder()
+				.text("The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature. ")
+				.text("This option is ignored unless Flink is in %s.", LinkElement.link("#legacy", "legacy mode"))
+				.build());
 
 	/**
 	 * The Mesos master URL.
@@ -63,9 +71,14 @@ public class MesosOptions {
 	public static final ConfigOption<String> MASTER_URL =
 		key("mesos.master")
 			.noDefaultValue()
-			.withDescription("The Mesos master URL. The value should be in one of the following forms:" +
-				" \"host:port\", \"zk://host1:port1,host2:port2,.../path\"," +
-				" \"zk://username:password@host1:port1,host2:port2,.../path\" or \"file:///path/to/file\"");
+			.withDescription(Description.builder()
+				.text("The Mesos master URL. The value should be in one of the following forms: ")
+				.list(
+					TextElement.text("host:port"),
+					TextElement.text("zk://host1:port1,host2:port2,.../path"),
+					TextElement.text("zk://username:password@host1:port1,host2:port2,.../path"),
+					TextElement.text("file:///path/to/file"))
+				.build());
 
 	/**
 	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
@@ -125,7 +138,9 @@ public class MesosOptions {
 	 */
 	public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
 		.defaultValue("")
-		.withDescription("Comma-separated list of configuration keys which represent a configurable port." +
-			"All port keys will dynamically get a port assigned through Mesos.");
+		.withDescription(Description.builder()
+			.text("Comma-separated list of configuration keys which represent a configurable port. " +
+				"All port keys will dynamically get a port assigned through Mesos.")
+			.build());
 
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index d915b36..0315629 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.description.Description;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
 
@@ -65,7 +66,8 @@ public class MesosTaskManagerParameters {
 
 	public static final ConfigOption<Integer> MESOS_RM_TASKS_GPUS =
 		key("mesos.resourcemanager.tasks.gpus")
-		.defaultValue(0);
+		.defaultValue(0)
+		.withDescription(Description.builder().text("GPUs to assign to the Mesos workers.").build());
 
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
 		key("mesos.resourcemanager.tasks.container.type")
@@ -79,7 +81,12 @@ public class MesosTaskManagerParameters {
 
 	public static final ConfigOption<String> MESOS_TM_HOSTNAME =
 		key("mesos.resourcemanager.tasks.hostname")
-		.noDefaultValue();
+		.noDefaultValue()
+		.withDescription(Description.builder()
+			.text("Optional value to define the TaskManager’s hostname. " +
+				"The pattern _TASK_ is replaced by the actual id of the Mesos task. " +
+				"This can be used to configure the TaskManager to use Mesos DNS (e.g. _TASK_.flink-service.mesos) for name lookups.")
+			.build());
 
 	public static final ConfigOption<String> MESOS_TM_CMD =
 		key("mesos.resourcemanager.tasks.taskmanager-cmd")
@@ -87,7 +94,10 @@ public class MesosTaskManagerParameters {
 
 	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
 		key("mesos.resourcemanager.tasks.bootstrap-cmd")
-		.noDefaultValue();
+		.noDefaultValue()
+		.withDescription(Description.builder()
+			.text("A command which is executed before the TaskManager is started.")
+			.build());
 
 	public static final ConfigOption<String> MESOS_TM_URIS =
 		key("mesos.resourcemanager.tasks.uris")
@@ -116,7 +126,11 @@ public class MesosTaskManagerParameters {
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
 		.noDefaultValue()
-		.withDescription("Constraints for task placement on mesos.");
+		.withDescription(Description.builder()
+			.text("Constraints for task placement on Mesos based on agent attributes. " +
+				"Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target mesos agents. " +
+				"Example: az:eu-west-1a,series:t2")
+			.build());
 
 	/**
 	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.


[flink] 03/04: [hotfix][docs] Add missing space after end of sentence.

Posted by tr...@apache.org.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04ba9a85920bcab6c7b6c001a58c7570e987aabb
Author: gyao <ga...@data-artisans.com>
AuthorDate: Thu Aug 9 13:34:46 2018 +0200

    [hotfix][docs] Add missing space after end of sentence.
---
 docs/_includes/generated/job_manager_configuration.html                 | 2 +-
 .../src/main/java/org/apache/flink/configuration/JobManagerOptions.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index edab77e..0458af2 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -20,7 +20,7 @@
         <tr>
             <td><h5>jobmanager.execution.failover-strategy</h5></td>
             <td style="word-wrap: break-word;">"full"</td>
-            <td>This option specifies how the job computation recovers from task failures.Accepted values are:<ul><li>'full': Restarts all tasks.</li><li>'individual': Restarts only the failed task. Should only be used if all tasks are independent components.</li><li>'region': Restarts all tasks that could be affected by the task failure.</li></ul></td>
+            <td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks.</li><li>'individual': Restarts only the failed task. Should only be used if all tasks are independent components.</li><li>'region': Restarts all tasks that could be affected by the task failure.</li></ul></td>
         </tr>
         <tr>
             <td><h5>jobmanager.heap.size</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index c1a23bf..1666f21 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -110,7 +110,7 @@ public class JobManagerOptions {
 		key("jobmanager.execution.failover-strategy")
 			.defaultValue("full")
 			.withDescription(Description.builder()
-				.text("This option specifies how the job computation recovers from task failures." +
+				.text("This option specifies how the job computation recovers from task failures. " +
 					"Accepted values are:")
 				.list(
 					text("'full': Restarts all tasks."),


[flink] 02/04: [FLINK-9240] Avoid deprecated Akka methods

Posted by tr...@apache.org.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c85af4aa1afb91e7aeac1d29d3c69b3f575e4eff
Author: Jeffrey Chung <ch...@users.noreply.github.com>
AuthorDate: Sat Jul 28 14:54:01 2018 -0400

    [FLINK-9240] Avoid deprecated Akka methods
    
    Use static imports
    
    Use the org.apache.flink.runtime.concurrent.Executors import
    
    This closes #6446.
---
 .../apache/flink/client/program/ClusterClient.java |  5 +++--
 .../apache/flink/client/program/ClientTest.java    |  7 ++++--
 .../MesosApplicationMasterRunner.java              | 25 ++++++++++++++++------
 .../runtime/akka/DefaultQuarantineHandler.java     |  9 ++++++--
 .../runtime/minicluster/StandaloneMiniCluster.java | 10 +++++++--
 .../flink/runtime/taskmanager/MemoryLogger.java    |  2 +-
 .../flink/runtime/util/ProcessShutDownThread.java  |  3 ++-
 .../flink/runtime/jobmanager/JobManager.scala      | 17 +++++++++------
 .../runtime/minicluster/FlinkMiniCluster.scala     | 18 ++++++++++------
 .../flink/runtime/taskmanager/TaskManager.scala    | 22 +++++++++----------
 .../flink/runtime/akka/QuarantineMonitorTest.java  | 15 +++++++------
 .../JobManagerHAJobGraphRecoveryITCase.java        |  4 ++--
 .../jobmanager/JobManagerProcessReapingTest.java   |  2 +-
 .../flink/runtime/jobmanager/JobManagerTest.java   | 19 ++++++++--------
 .../flink/runtime/jobmanager/JobSubmitTest.java    |  2 +-
 .../runtime/metrics/TaskManagerMetricsTest.java    |  8 ++++---
 .../metrics/dump/MetricQueryServiceTest.java       |  2 +-
 .../StackTraceSampleCoordinatorTest.java           |  2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  6 ++++--
 .../TaskManagerComponentsStartupShutdownTest.java  | 10 +++++----
 .../TaskManagerProcessReapingTestBase.java         |  2 +-
 .../taskmanager/TaskManagerRegistrationTest.java   |  2 +-
 .../impl/AkkaJobManagerRetrieverTest.java          |  9 +++++---
 .../jobmanager/JobManagerConnectionTest.scala      |  4 ++--
 .../runtime/testingUtils/TestingCluster.scala      |  9 ++++----
 .../JobManagerHACheckpointRecoveryITCase.java      |  6 ++++--
 .../minicluster/LocalFlinkMiniClusterITCase.java   |  7 ++++--
 .../flink/yarn/YarnApplicationMasterRunner.java    | 25 ++++++++++++++++------
 28 files changed, 160 insertions(+), 92 deletions(-)
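
Where the old code wrapped shutdown() in try/catch purely to log a failure,
the patch registers an onComplete callback on the future returned by
terminate() and runs it on directExecutionContext(). A rough JDK-only analogue
is sketched below, assuming a CompletableFuture in place of the Scala future.
The class name, method name and log list are hypothetical and chosen only for
illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ShutdownCallbackSketch {

    /** Records a message only if the termination future completes with a failure. */
    static <T> void logOnFailure(CompletableFuture<T> terminationFuture, List<String> log) {
        // whenComplete without an executor runs the callback on the thread that
        // completes the future, loosely comparable to a direct execution context.
        terminationFuture.whenComplete((terminated, failure) -> {
            if (failure != null) {
                log.add("Error shutting down actor system: " + failure);
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Clean shutdown: the callback fires but has nothing to report.
        CompletableFuture<String> clean = new CompletableFuture<>();
        logOnFailure(clean, log);
        clean.complete("Terminated");

        // Failed shutdown: the callback records the failure.
        CompletableFuture<String> failed = new CompletableFuture<>();
        logOnFailure(failed, log);
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(log.size()); // one entry, from the failed future
    }
}
```

Unlike the blocking variant, this form returns immediately, so it suits error
paths that rethrow right after requesting the shutdown.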

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index d85033a..51541be 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -249,8 +250,8 @@ public abstract class ClusterClient<T> {
 		@Override
 		public void close() throws Exception {
 			if (isLoaded()) {
-				actorSystem.shutdown();
-				actorSystem.awaitTermination();
+				actorSystem.terminate();
+				Await.ready(actorSystem.whenTerminated(), Duration.Inf());
 				actorSystem = null;
 			}
 		}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ca8b6fb..ec9cfc5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -67,6 +67,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
@@ -114,8 +117,8 @@ public class ClientTest extends TestLogger {
 	public void shutDownActorSystem() {
 		if (jobManagerSystem != null) {
 			try {
-				jobManagerSystem.shutdown();
-				jobManagerSystem.awaitTermination();
+				jobManagerSystem.terminate();
+				Await.ready(jobManagerSystem.whenTerminated(), Duration.Inf());
 			} catch (Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 630fa83..bbecf6a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -57,6 +57,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
@@ -73,10 +75,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -374,11 +380,14 @@ public class MesosApplicationMasterRunner {
 			}
 
 			if (actorSystem != null) {
-				try {
-					actorSystem.shutdown();
-				} catch (Throwable tt) {
-					LOG.error("Error shutting down actor system", tt);
-				}
+				actorSystem.terminate().onComplete(
+					new OnComplete<Terminated>() {
+						public void onComplete(Throwable failure, Terminated success) {
+							if (failure != null) {
+								LOG.error("Error shutting down actor system", failure);
+							}
+						}
+					}, directExecutionContext());
 			}
 
 			if (futureExecutor != null) {
@@ -412,7 +421,11 @@ public class MesosApplicationMasterRunner {
 		LOG.info("Mesos JobManager started");
 
 		// wait until everything is done
-		actorSystem.awaitTermination();
+		try {
+			Await.ready(actorSystem.whenTerminated(), Duration.Inf());
+		} catch (InterruptedException | TimeoutException e) {
+			LOG.error("Error shutting down actor system", e);
+		}
 
 		// if we get here, everything work out jolly all right, and we even exited smoothly
 		if (webMonitor != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 708437f..38c29bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -25,6 +25,9 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import org.slf4j.Logger;
 
+import java.util.concurrent.TimeoutException;
+
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -65,11 +68,13 @@ public class DefaultQuarantineHandler implements QuarantineHandler {
 
 	private void shutdownActorSystem(ActorSystem actorSystem) {
 		// shut the actor system down
-		actorSystem.shutdown();
+		actorSystem.terminate();
 
 		try {
 			// give it some time to complete the shutdown
-			actorSystem.awaitTermination(timeout);
+			Await.ready(actorSystem.whenTerminated(), timeout);
+		} catch (InterruptedException | TimeoutException e) {
+			log.error("Exception thrown when terminating the actor system", e);
 		} finally {
 			// now let's crash the JVM
 			System.exit(exitCode);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 0b0cbf5..808de22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -43,11 +43,13 @@ import akka.pattern.Patterns;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -154,8 +156,12 @@ public class StandaloneMiniCluster implements AutoCloseableAsync {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		actorSystem.shutdown();
-		actorSystem.awaitTermination();
+		actorSystem.terminate();
+		try {
+			Await.ready(actorSystem.whenTerminated(), Duration.Inf());
+		} catch (InterruptedException | TimeoutException e) {
+			exception = e;
+		}
 
 		try {
 			highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 366e5fa..91849d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -125,7 +125,7 @@ public class MemoryLogger extends Thread {
 	@Override
 	public void run() {
 		try {
-			while (running && (monitored == null || !monitored.isTerminated())) {
+			while (running && (monitored == null || !monitored.whenTerminated().isCompleted())) {
 				logger.info(getMemoryUsageStatsAsString(memoryBean));
 				logger.info(getDirectMemoryStatsAsString(directBufferBean));
 				logger.info(getMemoryPoolStatsAsString(poolBeans));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
index e19cfd7..db0a1dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeoutException;
@@ -67,7 +68,7 @@ public class ProcessShutDownThread extends Thread {
 	@Override
 	public void run() {
 		try {
-			actorSystem.awaitTermination(terminationTimeout);
+			Await.ready(actorSystem.whenTerminated(), terminationTimeout);
 		} catch (Exception e) {
 			if (e instanceof TimeoutException) {
 				log.error("Actor system shut down timed out.", e);
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0855991..afecae2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager}
+import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
 import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
@@ -1867,7 +1868,10 @@ class JobManager(
       FiniteDuration(10, SECONDS)).start()
 
     // Shutdown and discard all queued messages
-    context.system.shutdown()
+    context.system.terminate().onComplete {
+      case scala.util.Success(_) =>
+      case scala.util.Failure(t) => log.warn("Could not cleanly shut down actor system", t)
+    }(directExecutionContext())
   }
 
   private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
@@ -2046,7 +2050,7 @@ object JobManager {
     }
 
     // block until everything is shut down
-    jobManagerSystem.awaitTermination()
+    Await.ready(jobManagerSystem.whenTerminated, Duration.Inf)
 
     webMonitorOption.foreach{
       webMonitor =>
@@ -2288,11 +2292,10 @@ object JobManager {
     catch {
       case t: Throwable =>
         LOG.error("Error while starting up JobManager", t)
-        try {
-          jobManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
-        }
+        jobManagerSystem.terminate().onComplete {
+          case scala.util.Success(_) =>
+          case scala.util.Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt)
+        }(directExecutionContext())
         throw t
     }
   }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8d9e1ee..4f70a22 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
+import scala.util.{Failure, Success}
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -479,30 +480,30 @@ abstract class FlinkMiniCluster(
 
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
 
       resourceManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
     }
 
     jobManagerActorSystems foreach {
-      _ foreach(_.shutdown())
+      _ foreach(_.terminate())
     }
   }
 
   def awaitTermination(): Unit = {
     jobManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     resourceManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     taskManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
   }
 
@@ -625,7 +626,10 @@ abstract class FlinkMiniCluster(
 
   def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = {
     if(!useSingleActorSystem) {
-      actorSystem.shutdown()
+      actorSystem.terminate().onComplete {
+        case Success(_) =>
+        case Failure(t) => LOG.warn("Could not cleanly shut down the job client actor system.", t)
+      }
     }
   }
 
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1de4848..c04084c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -77,6 +77,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -423,7 +424,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly modify state
                 //            but only send messages to the TaskManager to do those changes
-                case scala.util.Success(result) =>
+                case Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -432,7 +433,7 @@ class TaskManager(
                     )
                   }
 
-                case scala.util.Failure(t) =>
+                case Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -839,10 +840,10 @@ class TaskManager(
             blobCache.get.getTransientBlobService.putTransient(fis)
           }(context.dispatcher)
             .onComplete {
-              case scala.util.Success(value) =>
+              case Success(value) =>
                 sender ! value
                 fis.close()
-              case scala.util.Failure(e) =>
+              case Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1534,7 +1535,7 @@ class TaskManager(
   }
 
   protected def shutdown(): Unit = {
-    context.system.shutdown()
+    context.system.terminate()
 
     // Await actor system termination and shut down JVM
     new ProcessShutDownThread(
@@ -1885,15 +1886,14 @@ object TaskManager {
       MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem)
 
       // block until everything is done
-      taskManagerSystem.awaitTermination()
+      Await.ready(taskManagerSystem.whenTerminated, Duration.Inf)
     } catch {
       case t: Throwable =>
         LOG.error("Error while starting up taskManager", t)
-        try {
-          taskManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
-        }
+        taskManagerSystem.terminate().onComplete {
+          case Success(_) =>
+          case Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt)
+        }(Executors.directExecutionContext())
         throw t
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 37a4547..a998fb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -43,7 +43,10 @@ import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -72,10 +75,10 @@ public class QuarantineMonitorTest extends TestLogger {
 	}
 
 	@AfterClass
-	public static void tearDown() {
+	public static void tearDown() throws InterruptedException, TimeoutException {
 		if (actorSystem1 != null) {
-			actorSystem1.shutdown();
-			actorSystem1.awaitTermination();
+			actorSystem1.terminate();
+			Await.ready(actorSystem1.whenTerminated(), Duration.Inf());
 		}
 	}
 
@@ -85,10 +88,10 @@ public class QuarantineMonitorTest extends TestLogger {
 	}
 
 	@After
-	public void tearDownTest() {
+	public void tearDownTest() throws InterruptedException, TimeoutException {
 		if (actorSystem2 != null) {
-			actorSystem2.shutdown();
-			actorSystem2.awaitTermination();
+			actorSystem2.terminate();
+			Await.ready(actorSystem2.whenTerminated(), Duration.Inf());
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index 0b7547d..ae8542b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -321,11 +321,11 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 			}
 
 			if (taskManagerSystem != null) {
-				taskManagerSystem.shutdown();
+				taskManagerSystem.terminate();
 			}
 
 			if (testSystem != null) {
-				testSystem.shutdown();
+				testSystem.terminate();
 			}
 
 			highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 38b8431..fc16483 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -182,7 +182,7 @@ public class JobManagerProcessReapingTest extends TestLogger {
 				jmProcess.destroy();
 			}
 			if (localSystem != null) {
-				localSystem.shutdown();
+				localSystem.terminate();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 873a4f1..052349c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -132,6 +132,7 @@ import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
@@ -940,7 +941,7 @@ public class JobManagerTest extends TestLogger {
 			assertTrue(savepointFile.exists());
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 			}
 
 			if (archiver != null) {
@@ -956,7 +957,7 @@ public class JobManagerTest extends TestLogger {
 			}
 
 			if (actorSystem != null) {
-				actorSystem.awaitTermination(TESTING_TIMEOUT());
+				Await.result(actorSystem.whenTerminated(), TESTING_TIMEOUT());
 			}
 		}
 	}
@@ -1130,7 +1131,7 @@ public class JobManagerTest extends TestLogger {
 			}
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 			}
 
 			if (archiver != null) {
@@ -1243,7 +1244,7 @@ public class JobManagerTest extends TestLogger {
 			assertEquals(1, targetDirectory.listFiles().length);
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 			}
 
 			if (archiver != null) {
@@ -1259,7 +1260,7 @@ public class JobManagerTest extends TestLogger {
 			}
 
 			if (actorSystem != null) {
-				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+				Await.result(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT());
 			}
 		}
 	}
@@ -1416,7 +1417,7 @@ public class JobManagerTest extends TestLogger {
 			assertTrue("Unexpected response: " + response, response instanceof JobSubmitSuccess);
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 			}
 
 			if (archiver != null) {
@@ -1432,7 +1433,7 @@ public class JobManagerTest extends TestLogger {
 			}
 
 			if (actorSystem != null) {
-				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+				Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT());
 			}
 		}
 	}
@@ -1516,8 +1517,8 @@ public class JobManagerTest extends TestLogger {
 
 		} finally {
 			// cleanup the actor system and with it all of the started actors if not already terminated
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+			actorSystem.terminate();
+			Await.ready(actorSystem.whenTerminated(), Duration.Inf());
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c7d837b..ef493b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -117,7 +117,7 @@ public class JobSubmitTest {
 	@AfterClass
 	public static void teardownJobmanager() throws Exception {
 		if (jobManagerSystem != null) {
-			jobManagerSystem.shutdown();
+			jobManagerSystem.terminate();
 		}
 
 		if (highAvailabilityServices != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 4d18060..aa86100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -49,6 +49,8 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -155,11 +157,11 @@ public class TaskManagerMetricsTest extends TestLogger {
 			Assert.assertFalse(metricRegistry.isShutdown());
 
 			// shut down the actors and the actor system
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+			actorSystem.terminate();
+			Await.result(actorSystem.whenTerminated(), Duration.Inf());
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 			}
 
 			if (highAvailabilityServices != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 1acaf61..3767421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -116,7 +116,7 @@ public class MetricQueryServiceTest extends TestLogger {
 		testActor.message = null;
 		assertEquals(0, emptyDump.serializedMetrics.length);
 
-		s.shutdown();
+		s.terminate();
 	}
 
 	private static class TestActor extends UntypedActor {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
index 786b0ae..ed98be5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -66,7 +66,7 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
 	@AfterClass
 	public static void tearDown() throws Exception {
 		if (system != null) {
-			system.shutdown();
+			system.terminate();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 07d0244..a32c1f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -49,6 +49,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import scala.concurrent.Await;
+
 public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -259,8 +261,8 @@ public class AkkaRpcActorTest extends TestLogger {
 
 			terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		} finally {
-			rpcActorSystem.shutdown();
-			rpcActorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+			rpcActorSystem.terminate();
+			Await.ready(rpcActorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 8289930..9669513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -62,6 +62,8 @@ import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertTrue;
@@ -205,8 +207,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 			jobManager.tell(Kill.getInstance(), ActorRef.noSender());
 
 			// shut down the actors and the actor system
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+			actorSystem.terminate();
+			Await.ready(actorSystem.whenTerminated(), Duration.Inf());
 			actorSystem = null;
 
 			// now that the TaskManager is shut down, the components should be shut down as well
@@ -215,9 +217,9 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 			assertTrue(memManager.isShutdown());
 		} finally {
 			if (actorSystem != null) {
-				actorSystem.shutdown();
+				actorSystem.terminate();
 
-				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+				Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT());
 			}
 
 			highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index ae66a08..0b9b951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -218,7 +218,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
 				taskManagerProcess.destroy();
 			}
 			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
+				jmActorSystem.terminate();
 			}
 			if (highAvailabilityServices != null) {
 				highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index ad32a4f..7ee0921 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -102,7 +102,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	@AfterClass
 	public static void shutdownActorSystem() {
 		if (actorSystem != null) {
-			actorSystem.shutdown();
+			actorSystem.terminate();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 94473b9..5314550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -37,10 +37,13 @@ import org.junit.Test;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+import scala.concurrent.Await;
+
 /**
  * Test for the {@link AkkaJobManagerRetriever}.
  */
@@ -55,10 +58,10 @@ public class AkkaJobManagerRetrieverTest extends TestLogger {
 	}
 
 	@AfterClass
-	public static void teardown() {
+	public static void teardown() throws InterruptedException, TimeoutException {
 		if (actorSystem != null) {
-			actorSystem.shutdown();
-			actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+			actorSystem.terminate();
+			Await.ready(actorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
 
 			actorSystem = null;
 		}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 6d7d87c..947d029 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -77,7 +77,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
@@ -116,7 +116,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c2d47f9..f722935 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 
+import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
@@ -229,8 +230,8 @@ class TestingCluster(
           Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
           if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
+            jmActorSystems(index).terminate()
+            Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf)
           }
 
           val newJobManagerActorSystem = if(!singleActorSystem) {
@@ -274,8 +275,8 @@ class TestingCluster(
         Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
         if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
+          tmActorSystems(index).terminate()
+          Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf)
         }
 
         val taskManagerActorSystem  = if(!singleActorSystem) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index ce750d3..a22a8a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -94,7 +94,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
@@ -425,8 +427,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				miniCluster.awaitTermination();
 			}
 
-			system.shutdown();
-			system.awaitTermination();
+			system.terminate();
+			Await.ready(system.whenTerminated(), Duration.Inf());
 
 			testingServer.stop();
 			testingServer.close();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index aa2f38d..00c6865 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -38,8 +38,11 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.forkjoin.ForkJoinPool;
 import scala.concurrent.impl.ExecutionContextImpl;
 
@@ -62,7 +65,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
 	};
 
 	@Test
-	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException {
 
 		final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
 		LocalFlinkMiniCluster miniCluster = null;
@@ -117,7 +120,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
 			}
 
 			JavaTestKit.shutdownActorSystem(system);
-			system.awaitTermination();
+			Await.ready(system.whenTerminated(), Duration.Inf());
 		}
 
 		// shut down the global execution context, to make sure it does not affect this testing
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 497ac87..e98e174 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -57,6 +57,8 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -71,11 +73,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.yarn.Utils.require;
 
 /**
@@ -401,11 +407,14 @@ public class YarnApplicationMasterRunner {
 			}
 
 			if (actorSystem != null) {
-				try {
-					actorSystem.shutdown();
-				} catch (Throwable tt) {
-					LOG.error("Error shutting down actor system", tt);
-				}
+				actorSystem.terminate().onComplete(
+					new OnComplete<Terminated>() {
+						public void onComplete(Throwable failure, Terminated result) {
+							if (failure != null) {
+								LOG.error("Error shutting down actor system", failure);
+							}
+						}
+					}, directExecutionContext());
 			}
 
 			futureExecutor.shutdownNow();
@@ -418,7 +427,11 @@ public class YarnApplicationMasterRunner {
 		LOG.info("YARN Application Master started");
 
 		// wait until everything is done
-		actorSystem.awaitTermination();
+		try {
+			Await.ready(actorSystem.whenTerminated(), Duration.Inf());
+		} catch (InterruptedException | TimeoutException e) {
+			LOG.error("Error shutting down actor system", e);
+		}
 
 		// if we get here, everything work out jolly all right, and we even exited smoothly
 		if (webMonitor != null) {


[flink] 01/04: [FLINK-10099][test] Improve YarnResourceManagerTest

Posted by tr...@apache.org.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f40783f48a6ccef9c609ac8204437e00033b76c
Author: 陈梓立 <wa...@gmail.com>
AuthorDate: Mon Aug 6 16:09:43 2018 +0800

    [FLINK-10099][test] Improve YarnResourceManagerTest
    
    Introduce methods to mock a Yarn Container and ContainerStatus.
    
    Properly shut down a started ResourceManager.
    
    This closes #6499.
---
 .../apache/flink/yarn/YarnResourceManagerTest.java | 329 ++++++++++++---------
 1 file changed, 181 insertions(+), 148 deletions(-)

diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb8e968..a7d4f43 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private Configuration flinkConfig = new Configuration();
+	private Configuration flinkConfig;
 
-	private Map<String, String> env = new HashMap<>();
+	private Map<String, String> env;
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
 
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
 
 	@Before
 	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		flinkConfig = new Configuration();
 		flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+
 		File root = folder.getRoot();
 		File home = new File(root, "home");
 		boolean created = home.mkdir();
 		assertTrue(created);
 
+		env = new HashMap<>();
 		env.put(ENV_APP_ID, "foo");
 		env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
 		env.put(ENV_CLIENT_SHIP_FILES, "");
@@ -139,15 +148,21 @@ public class YarnResourceManagerTest extends TestLogger {
 	}
 
 	@After
-	public void teardown() {
-		env.clear();
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+
+		if (env != null) {
+			env.clear();
+		}
 	}
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
-		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		public NMClient mockNMClient;
+		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
+		NMClient mockNMClient;
 
-		public TestingYarnResourceManager(
+		TestingYarnResourceManager(
 				RpcService rpcService,
 				String resourceManagerEndpointId,
 				ResourceID resourceId,
@@ -181,11 +196,11 @@ public class YarnResourceManagerTest extends TestLogger {
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
 
-		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+		<T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
 			return callAsync(callable, TIMEOUT);
 		}
 
-		public MainThreadExecutor getMainThreadExecutorForTesting() {
+		MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
@@ -193,7 +208,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
 				YarnConfiguration yarnConfiguration,
 				int yarnHeartbeatIntervalMillis,
-				@Nullable String webInteraceUrl) {
+				@Nullable String webInterfaceUrl) {
 			return mockResourceManagerClient;
 		}
 
@@ -213,7 +228,6 @@ public class YarnResourceManagerTest extends TestLogger {
 
 		// services
 		final TestingRpcService rpcService;
-		final TestingFatalErrorHandler fatalErrorHandler;
 		final MockResourceManagerRuntimeServices rmServices;
 
 		// RM
@@ -240,7 +254,6 @@ public class YarnResourceManagerTest extends TestLogger {
 		 */
 		Context() throws Exception {
 			rpcService = new TestingRpcService();
-			fatalErrorHandler = new TestingFatalErrorHandler();
 			rmServices = new MockResourceManagerRuntimeServices();
 
 			// resource manager
@@ -258,7 +271,7 @@ public class YarnResourceManagerTest extends TestLogger {
 							rmServices.metricRegistry,
 							rmServices.jobLeaderIdService,
 							new ClusterInformation("localhost", 1234),
-							fatalErrorHandler,
+							testingFatalErrorHandler,
 							null,
 							mockResourceManagerClient,
 							mockNMClient);
@@ -269,15 +282,15 @@ public class YarnResourceManagerTest extends TestLogger {
 		 */
 		class MockResourceManagerRuntimeServices {
 
-			public final ScheduledExecutor scheduledExecutor;
-			public final TestingHighAvailabilityServices highAvailabilityServices;
-			public final HeartbeatServices heartbeatServices;
-			public final MetricRegistry metricRegistry;
-			public final TestingLeaderElectionService rmLeaderElectionService;
-			public final JobLeaderIdService jobLeaderIdService;
-			public final SlotManager slotManager;
+			private final ScheduledExecutor scheduledExecutor;
+			private final TestingHighAvailabilityServices highAvailabilityServices;
+			private final HeartbeatServices heartbeatServices;
+			private final MetricRegistry metricRegistry;
+			private final TestingLeaderElectionService rmLeaderElectionService;
+			private final JobLeaderIdService jobLeaderIdService;
+			private final SlotManager slotManager;
 
-			public UUID rmLeaderSessionId;
+			private UUID rmLeaderSessionId;
 
 			MockResourceManagerRuntimeServices() throws Exception {
 				scheduledExecutor = mock(ScheduledExecutor.class);
@@ -295,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger {
 						Time.minutes(5L));
 			}
 
-			public void grantLeadership() throws Exception {
+			void grantLeadership() throws Exception {
 				rmLeaderSessionId = UUID.randomUUID();
 				rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
@@ -304,7 +317,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		/**
 		 * Start the resource manager and grant leadership to it.
 		 */
-		public void startResourceManager() throws Exception {
+		void startResourceManager() throws Exception {
 			resourceManager.start();
 			rmServices.grantLeadership();
 		}
@@ -312,93 +325,129 @@ public class YarnResourceManagerTest extends TestLogger {
 		/**
 		 * Stop the Akka actor system.
 		 */
-		public void stopResourceManager() throws Exception {
+		void stopResourceManager() throws Exception {
 			rpcService.stopService().get();
 		}
-	}
 
-	@Test
-	public void testStopWorker() throws Exception {
-		new Context() {{
+		/**
+		 * A wrapper for running a test. Handles the setup and teardown logic
+		 * in Context.
+		 * @param testMethod the real test body.
+		 */
+		void runTest(RunnableWithException testMethod) throws Exception {
 			startResourceManager();
-			// Request slot from SlotManager.
-			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.registerSlotRequest(
-					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				return null;
-			});
+			try {
+				testMethod.run();
+			} finally {
+				stopResourceManager();
+			}
+		}
+	}
 
-			// wait for the registerSlotRequest completion
-			registerSlotRequestFuture.get();
-
-			// Callback from YARN when container is allocated.
-			Container testingContainer = mock(Container.class);
-			when(testingContainer.getId()).thenReturn(
-				ContainerId.newInstance(
-					ApplicationAttemptId.newInstance(
-						ApplicationId.newInstance(System.currentTimeMillis(), 1),
-						1),
-					1));
-			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
-			// Remote task executor registers with YarnResourceManager.
-			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
-			rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
-
-			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
-			final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
-			final SlotReport slotReport = new SlotReport(
-				new SlotStatus(
-					new SlotID(taskManagerResourceId, 1),
-					new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
-
-			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
-				.registerTaskExecutor(
-					taskHost,
-					taskManagerResourceId,
-					dataPort,
-					hardwareDescription,
-					Time.seconds(10L))
-				.thenCompose(
-					(RegistrationResponse response) -> {
-						assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
-						final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
-						return rmGateway.sendSlotReport(
-							taskManagerResourceId,
-							success.getRegistrationId(),
-							slotReport,
-							Time.seconds(10L));
-					})
-				.handleAsync(
-					(Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
-					resourceManager.getMainThreadExecutorForTesting());
-
-			final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
-
-			assertEquals(1, numberRegisteredSlots);
-
-			// Unregister all task executors and release all containers.
-			CompletableFuture<?> unregisterAndReleaseFuture =  resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
-				return null;
-			});
+	private static Container mockContainer(String host, int port, int containerId) {
+		Container mockContainer = mock(Container.class);
+
+		NodeId mockNodeId = NodeId.newInstance(host, port);
+		ContainerId mockContainerId = ContainerId.newInstance(
+			ApplicationAttemptId.newInstance(
+				ApplicationId.newInstance(System.currentTimeMillis(), 1),
+				1
+			),
+			containerId
+		);
+
+		when(mockContainer.getId()).thenReturn(mockContainerId);
+		when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+		when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+		when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+		return mockContainer;
+	}
+
+	private static ContainerStatus mockContainerStatus(ContainerId containerId) {
+		ContainerStatus mockContainerStatus = mock(ContainerStatus.class);
 
-			unregisterAndReleaseFuture.get();
+		when(mockContainerStatus.getContainerId()).thenReturn(containerId);
+		when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+		when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit");
+		when(mockContainerStatus.getExitStatus()).thenReturn(-1);
 
-			verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
-			verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+		return mockContainerStatus;
+	}
 
-			stopResourceManager();
+	@Test
+	public void testStopWorker() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				// Request slot from SlotManager.
+				CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.registerSlotRequest(
+						new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+					return null;
+				});
+
+				// wait for the registerSlotRequest completion
+				registerSlotRequestFuture.get();
+
+				// Callback from YARN when container is allocated.
+				Container testingContainer = mockContainer("container", 1234, 1);
+
+				resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+				verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+				verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+				// Remote task executor registers with YarnResourceManager.
+				TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
+				rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
+
+				final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+				final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+				final SlotReport slotReport = new SlotReport(
+					new SlotStatus(
+						new SlotID(taskManagerResourceId, 1),
+						new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+
+				CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+					.registerTaskExecutor(
+						taskHost,
+						taskManagerResourceId,
+						dataPort,
+						hardwareDescription,
+						Time.seconds(10L))
+					.thenCompose(
+						(RegistrationResponse response) -> {
+							assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
+							final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
+							return rmGateway.sendSlotReport(
+								taskManagerResourceId,
+								success.getRegistrationId(),
+								slotReport,
+								Time.seconds(10L));
+						})
+					.handleAsync(
+						(Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+						resourceManager.getMainThreadExecutorForTesting());
+
+				final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+				assertEquals(1, numberRegisteredSlots);
+
+				// Unregister all task executors and release all containers.
+				CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+					return null;
+				});
+
+				unregisterAndReleaseFuture.get();
+
+				verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+			});
 
 			// It's now safe to access the SlotManager state since the ResourceManager has been stopped.
-			assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
-			assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+			assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0));
+			assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0));
 		}};
 	}
 
@@ -411,65 +460,49 @@ public class YarnResourceManagerTest extends TestLogger {
 			final File applicationDir = folder.newFolder(".flink");
 			env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
 
-			startResourceManager();
-
-			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
-			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+			runTest(() -> {
+				resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+				assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+			});
 		}};
 	}
 
 	/**
 	 * Tests that YarnResourceManager will not request more containers than needs during
 	 * callback from Yarn when container is Completed.
-	 * @throws Exception
 	 */
 	@Test
 	public void testOnContainerCompleted() throws Exception {
 		new Context() {{
-			startResourceManager();
-			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.registerSlotRequest(
-					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				return null;
+			runTest(() -> {
+				CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.registerSlotRequest(
+						new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+					return null;
+				});
+
+				// wait for the registerSlotRequest completion
+				registerSlotRequestFuture.get();
+
+				// Callback from YARN when container is allocated.
+				Container testingContainer = mockContainer("container", 1234, 1);
+
+				resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+				verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+				verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+				// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
+				// containers, need to request new container.
+				ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId());
+
+				resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+				verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
+				// Callback from YARN when container is Completed happened before global fail, pending request
+				// slot is already fulfilled by pending containers, no need to request new container.
+				resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+				verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 			});
-
-			// wait for the registerSlotRequest completion
-			registerSlotRequestFuture.get();
-
-			ContainerId testContainerId = ContainerId.newInstance(
-				ApplicationAttemptId.newInstance(
-					ApplicationId.newInstance(System.currentTimeMillis(), 1),
-					1),
-				1);
-
-			// Callback from YARN when container is allocated.
-			Container testingContainer = mock(Container.class);
-			when(testingContainer.getId()).thenReturn(testContainerId);
-			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
-			// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
-			// containers, need to request new container.
-			ContainerStatus testingContainerStatus = mock(ContainerStatus.class);
-			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-
-			// Callback from YARN when container is Completed happened before global fail, pending request
-			// slot is already fulfilled by pending containers, no need to request new container.
-			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 		}};
 	}
 }
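
The runTest wrapper introduced in this commit starts the resource manager, runs the test body, and stops the resource manager in a finally block, so the stop happens even when the body throws. A minimal sketch of that pattern without Flink or YARN on the classpath might look like the following. The Context class here only records events, and the local RunnableWithException interface is a hypothetical stand-in for org.apache.flink.util.function.RunnableWithException.

```java
import java.util.ArrayList;
import java.util.List;

final class RunTestSketch {

    /** Local stand-in for Flink's RunnableWithException (assumed shape). */
    @FunctionalInterface
    interface RunnableWithException {
        void run() throws Exception;
    }

    /** Hypothetical test context that records lifecycle events instead of starting services. */
    static final class Context {
        final List<String> events = new ArrayList<>();

        void startResourceManager() {
            events.add("start");
        }

        void stopResourceManager() {
            events.add("stop");
        }

        /** Runs the test body between start and stop; stop runs even if the body throws. */
        void runTest(RunnableWithException testMethod) throws Exception {
            startResourceManager();
            try {
                testMethod.run();
            } finally {
                stopResourceManager();
            }
        }
    }

    /** Runs a deliberately failing body and returns the recorded event order. */
    static List<String> runFailingTest() {
        Context context = new Context();
        try {
            context.runTest(() -> {
                context.events.add("body");
                throw new IllegalStateException("simulated failure");
            });
        } catch (Exception expected) {
            context.events.add("caught");
        }
        return context.events;
    }
}
```

In this sketch the recorded order is start, body, stop, caught. Before the change, a failing assertion in a test body would have skipped the trailing stopResourceManager() call.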