Posted to issues@spark.apache.org by "zhoukang (JIRA)" <ji...@apache.org> on 2017/07/26 13:04:00 UTC

[jira] [Updated] (SPARK-21539) Job should not be aborted when dynamic allocation is enabled or spark.executor.instances is larger than the number currently allocated by YARN

     [ https://issues.apache.org/jira/browse/SPARK-21539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhoukang updated SPARK-21539:
-----------------------------
    Description: 
For Spark on YARN:
Right now, when a TaskSet cannot run on any node or host, blacklistedEverywhere becomes true in TaskSetManager#abortIfCompletelyBlacklisted and the whole job is aborted.
However, if dynamic allocation is enabled, we should instead wait for YARN to allocate new NodeManagers, so that the job can still complete successfully. A sketch of the idea follows.
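A minimal sketch of the proposed behavior (abortIfCompletelyBlacklisted and Utils.isDynamicAllocationEnabled exist in Spark; the exact wiring below is an illustration, not a tested patch):
{code:java}
// Sketch only: inside TaskSetManager#abortIfCompletelyBlacklisted, at the
// point where a task has been found that no live executor can run.
if (Utils.isDynamicAllocationEnabled(conf)) {
  // Dynamic allocation may still bring up executors on fresh NodeManagers,
  // so keep the TaskSet alive instead of failing the whole job.
  logInfo("All current executors are blacklisted for this TaskSet; " +
    "waiting for dynamic allocation to provision new executors.")
} else {
  // Current behavior: no executor anywhere can take the task, so abort.
  abort(s"Aborting $taskSet because a task cannot run anywhere " +
    "due to node and executor blacklist.")
}
{code}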
How to reproduce?
1. Set up a YARN cluster with 5 nodes, and give one node (node1) far more CPU cores and memory than the others, so that YARN keeps launching containers on it even after the TaskScheduler has blacklisted it.
2. Modify BlockManager#registerWithExternalShuffleServer, adding the fault-injection logic marked in red so that registration always fails on node1:
{code:java}
logInfo("Registering executor with local external shuffle service.")
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirs.map(_.toString),
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)

    val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
    val SLEEP_TIME_SECS = 5

    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        {color:red}if (shuffleId.host.equals("node1's address")) {
             throw new Exception
        }{color}
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
        return
      } catch {
        case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
        case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle server due to : " +
            e.getMessage, e)
      }
    }
{code}
3. Enable the external shuffle service (spark.shuffle.service.enabled=true) and start the shuffle service on YARN; see the configuration sketch after these steps.
YARN will then keep launching executors on node1, each of which fails because it can never register with the shuffle service, and eventually the job is aborted.
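For step 3, the relevant settings are roughly the following (the Spark keys are standard; the yarn-site.xml aux-service entries shown in comments must match the Hadoop setup):
{code:java}
import org.apache.spark.SparkConf

// Driver-side configuration sketch for this reproduction.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")    // use the external shuffle service
  .set("spark.dynamicAllocation.enabled", "true")  // the scenario this issue cares about
// Each NodeManager must also run the Spark shuffle service, e.g. in yarn-site.xml:
//   yarn.nodemanager.aux-services = spark_shuffle
//   yarn.nodemanager.aux-services.spark_shuffle.class =
//     org.apache.spark.network.yarn.YarnShuffleService
{code}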

  was:
For Spark on YARN:
Right now, when a TaskSet cannot run on any node or host, blacklistedEverywhere becomes true in TaskSetManager#abortIfCompletelyBlacklisted and the whole job is aborted.
However, if dynamic allocation is enabled, we should instead wait for YARN to allocate new NodeManagers, so that the job can still complete successfully. We should also report this blacklist information to YARN, so that it does not keep assigning the same nodes that the TaskScheduler has blacklisted. A rough sketch follows.
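Reporting the blacklist to YARN could look roughly like this (AMRMClient.updateBlacklist is a real Hadoop API; the helper name and the blacklistedNodes input are assumptions for illustration):
{code:java}
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.client.api.AMRMClient

// Hypothetical helper: ask YARN's scheduler to stop handing us containers on
// nodes that the TaskScheduler has already blacklisted.
def syncBlacklistToYarn(amClient: AMRMClient[_], blacklistedNodes: Set[String]): Unit = {
  amClient.updateBlacklist(blacklistedNodes.toList.asJava, Collections.emptyList[String]())
}
{code}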
How to reproduce?
1. Set up a YARN cluster with 5 nodes, and give one node (node1) far more CPU cores and memory than the others, so that YARN keeps launching containers on it even after the TaskScheduler has blacklisted it.
2. Modify BlockManager#registerWithExternalShuffleServer, adding the fault-injection logic marked in red so that registration always fails on node1:
{code:java}
logInfo("Registering executor with local external shuffle service.")
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirs.map(_.toString),
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)

    val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
    val SLEEP_TIME_SECS = 5

    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        {color:red}if (shuffleId.host.equals("node1's address")) {
             throw new Exception
        }{color}
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
        return
      } catch {
        case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
        case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle server due to : " +
            e.getMessage, e)
      }
    }
{code}
3. Enable the external shuffle service (spark.shuffle.service.enabled=true) and start the shuffle service on YARN.
YARN will then keep launching executors on node1, each of which fails because it can never register with the shuffle service, and eventually the job is aborted.


> Job should not be aborted when dynamic allocation is enabled or spark.executor.instances is larger than the number currently allocated by YARN
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21539
>                 URL: https://issues.apache.org/jira/browse/SPARK-21539
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.6.1, 2.1.0, 2.2.0
>            Reporter: zhoukang
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org