Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/05/24 16:30:59 UTC

[GitHub] flink pull request #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3982

    [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

    This PR is based on #3981.
    
    This PR hardens the FlinkYarnSessionCli by handling exceptions that occur while
    retrieving the GetClusterStatusResponse. If no response is retrieved and an
    exception is thrown instead, the CLI does not fail but retries on the next poll.
    
    cc @rmetzger.
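
    The change boils down to a catch-and-continue polling pattern. The sketch
    below is illustrative only (FlakyStatusSource and its method are made-up
    stand-ins, not Flink's actual API): a status poll that throws is logged and
    skipped, so the loop survives transient failures instead of aborting.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    public class PollingSketch {
        // Hypothetical status source: fails on the first call, succeeds afterwards.
        static class FlakyStatusSource {
            private final AtomicInteger calls = new AtomicInteger();
            int getClusterStatus() {
                if (calls.incrementAndGet() == 1) {
                    throw new RuntimeException("status temporarily unavailable");
                }
                return 2; // pretend two TaskManagers are registered
            }
        }

        public static void main(String[] args) {
            FlakyStatusSource source = new FlakyStatusSource();
            int numTaskManagers = 0;
            for (int attempt = 0; attempt < 3; attempt++) {
                try {
                    int status = source.getClusterStatus();
                    if (numTaskManagers != status) {
                        System.out.println("Number of connected TaskManagers changed to " + status);
                        numTaskManagers = status;
                    }
                } catch (Exception e) {
                    // Before the PR, this exception would abort the CLI loop.
                    System.out.println("Could not retrieve the current cluster status. Skipping...");
                }
            }
            System.out.println("final=" + numTaskManagers);
        }
    }
    ```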

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink hardenYarnSession

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3982.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3982
    
----
commit 72ce39a1752cc19669f003b70cc2708852a06ac5
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-05-24T15:59:51Z

    [FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files
    
    Previously, the YarnClusterClient decided when to delete the Yarn application files.
    This is problematic because the client does not know whether a Yarn application
    is being restarted or terminated, so the files were always deleted. This
    prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
    Flink's HA capabilities.
    
    The PR changes the behaviour so that the YarnJobManager deletes the Yarn files
    when it receives a StopCluster message. That way, we can be sure that the Yarn
    files are deleted only if the cluster is intended to be shut down.
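
    The sketch below illustrates that behavioural change (the Message enum and
    JobManagerFiles class are invented for this example, not Flink's actual
    types): files are removed only on an explicit stop message, never on a mere
    client disconnect, which may just be an ApplicationMaster restart.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class CleanupSketch {
        enum Message { CLIENT_DISCONNECTED, STOP_CLUSTER }

        static class JobManagerFiles {
            final List<String> files = new ArrayList<>(List.of("flink-conf.yaml", "flink.jar"));

            void handle(Message msg) {
                // Only an explicit StopCluster means the cluster is truly shutting
                // down; a disconnect alone does not trigger cleanup.
                if (msg == Message.STOP_CLUSTER) {
                    files.clear();
                }
            }
        }

        public static void main(String[] args) {
            JobManagerFiles jm = new JobManagerFiles();
            jm.handle(Message.CLIENT_DISCONNECTED);
            System.out.println("after disconnect: " + jm.files.size() + " files");
            jm.handle(Message.STOP_CLUSTER);
            System.out.println("after stop: " + jm.files.size() + " files");
        }
    }
    ```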

commit 9227539f97e6dbc77c5367b8c555b4ba0b2ad06d
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-05-24T16:26:57Z

    [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions
    
    This PR hardens the FlinkYarnSessionCli by handling exceptions that occur while
    retrieving the GetClusterStatusResponse. If no response is retrieved and an
    exception is thrown instead, the CLI does not fail but retries on the next poll.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3982



[GitHub] flink pull request #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3982#discussion_r118497398
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -413,14 +413,18 @@ public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean read
     			while (true) {
     				// ------------------ check if there are updates by the cluster -----------
     
    -				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
    -				LOG.debug("Received status message: {}", status);
    +				try {
    +					GetClusterStatusResponse status = yarnCluster.getClusterStatus();
    +					LOG.debug("Received status message: {}", status);
     
    -				if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
    -					System.err.println("Number of connected TaskManagers changed to " +
    +					if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
    +						System.err.println("Number of connected TaskManagers changed to " +
     							status.numRegisteredTaskManagers() + ". " +
    -						"Slots available: " + status.totalNumberOfSlots());
    -					numTaskmanagers = status.numRegisteredTaskManagers();
    +							"Slots available: " + status.totalNumberOfSlots());
    +						numTaskmanagers = status.numRegisteredTaskManagers();
    +					}
    +				} catch (Exception e) {
    +					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
    --- End diff --
    
    "Skipping" might be a better term here, because we aren't actually retrying to get the cluster status; we just skip it for this loop iteration.



[GitHub] flink issue #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3982
  
    Verified that Flink YARN sessions + HA works nicely now.
    
    1. Started a non-detached YARN session.
    2. Submitted a job.
    3. Killed the JobManager.
    4. The YARN session correctly reports disassociation from the first JobManager.
    5. Failure of cluster status retrieval is correctly ignored, and the session does not abort.
    6. The new JobManager is correctly registered, and the TaskManagers correctly reconnect.
    7. The job remains intact.
    
    Logs are sane and nice (minus my comment on the "Retrying" wording).



[GitHub] flink pull request #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3982#discussion_r118499283
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
    @@ -89,5 +92,37 @@ class YarnJobManager(
           flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
           TimeUnit.SECONDS)
     
    +  val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
    --- End diff --
    
    Just to be sure that my understanding is correct:
    this should always exist, correct?
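    
    For context, a Java analogue of the Scala line in question might look like
    the sketch below (the variable name "_FLINK_YARN_FILES_DEMO" is made up for
    this example): wrapping a possibly-absent environment variable in Optional
    makes the "should always exist" assumption explicit instead of risking a
    NullPointerException later.
    
    ```java
    import java.util.Optional;

    public class EnvSketch {
        public static void main(String[] args) {
            // Hypothetical variable name; it is deliberately unset in this demo.
            Optional<String> yarnFilesPath =
                Optional.ofNullable(System.getenv("_FLINK_YARN_FILES_DEMO"));
            // If the variable is unset, we fall through gracefully rather than NPE.
            System.out.println("present=" + yarnFilesPath.isPresent());
            System.out.println("value=" + yarnFilesPath.orElse("<not set>"));
        }
    }
    ```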



[GitHub] flink issue #3982: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3982
  
    LGTM! I'll rebase and merge this.

