Posted to issues@spark.apache.org by "David Poorman (JIRA)" <ji...@apache.org> on 2019/01/16 00:34:00 UTC

[jira] [Created] (SPARK-26628) ExecutorAllocationClient.requestTotalExecutors() does not provide enough information to guarantee executor locality

David Poorman created SPARK-26628:
-------------------------------------

             Summary: ExecutorAllocationClient.requestTotalExecutors() does not provide enough information to guarantee executor locality
                 Key: SPARK-26628
                 URL: https://issues.apache.org/jira/browse/SPARK-26628
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.0
            Reporter: David Poorman


Executors are not necessarily launched on the nodes where they must run to preserve data locality. After spending many hours debugging Spark, I have concluded that requestTotalExecutors() does not give the resource manager allocating executors enough information to ensure that executors launch on the needed hosts.  The signature of requestTotalExecutors() is:

requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String,Int])

My understanding is that hostToLocalTaskCount maps each host to the number of tasks that *can* run on it, and numExecutors is the total number of executors needed.

Consider the following example.

InputSplit1 can run on one of these hosts: Host A, Host B
InputSplit2 can run on one of these hosts: Host C

This results in the following hostToLocalTaskCount map:

Host A: 1
Host B: 1
Host C: 1

With numExecutors = 2
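
Written out as the arguments the allocation client actually receives, this looks roughly like the Scala sketch below (the client value is just a stand-in for whatever ExecutorAllocationClient implementation is in use):

    // Values from the example above: two splits, three candidate hosts.
    // The aggregated map no longer records that the Host C entry is the
    // only way to satisfy InputSplit2.
    val hostToLocalTaskCount: Map[String, Int] = Map(
      "hostA" -> 1,   // InputSplit1 could run here...
      "hostB" -> 1,   // ...or here
      "hostC" -> 1)   // InputSplit2 can only run here

    // client: ExecutorAllocationClient (stand-in for the scheduler backend)
    client.requestTotalExecutors(
      numExecutors = 2,
      localityAwareTasks = 2,
      hostToLocalTaskCount = hostToLocalTaskCount)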

Based on this information, there is no possible way a resource manager's allocator can know which of these three hosts it should schedule its two executors on.  If it picks A and C, the RecordReaders run without error.  If it picks A and B, then the RecordReader expecting Host C encounters an unrecoverable error.

It looks like the interface used to tell resource managers about pending tasks needs to change.  Perhaps they could simply be fed the raw per-task locality information.
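
To make that concrete, one hypothetical shape for such an interface (nothing like this exists in Spark today; all names here are invented) could carry the per-task host lists instead of the aggregated counts:

    // HYPOTHETICAL sketch only - none of these types exist in Spark.
    // Each pending task reports the hosts it can accept, so the cluster
    // manager backend can see that one task is satisfiable only by Host C.
    case class PendingTaskLocality(acceptableHosts: Seq[String])

    trait LocalityAwareAllocationClient {
      def requestTotalExecutors(
          numExecutors: Int,
          pendingTasks: Seq[PendingTaskLocality]): Boolean
    }

    // For the example above, the backend would receive:
    //   Seq(PendingTaskLocality(Seq("Host A", "Host B")),
    //       PendingTaskLocality(Seq("Host C")))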


My team is currently using YARN as our resource manager.  A less-than-ideal fix for us would be a change to Spark's YARN ApplicationMaster to have it allocate containers per the hostToLocalTaskCount map and ignore the total number of executors.  For the example above, it could allocate three containers, one per host.  However, this could create a large number of extra executors.  That can be mitigated by setting spark.dynamicAllocation.executorIdleTimeout to a small value so those executors time out and exit quickly, but on a busy cluster, spawning excess YARN containers is not desirable.
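
For reference, that timeout mitigation is just configuration; a minimal sketch with illustrative values (the 10s figure is not a recommendation):

    import org.apache.spark.SparkConf

    // Illustrative values only: let surplus executors created by such a
    // workaround release their YARN containers quickly once they go idle.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true") // needed for dynamic allocation on YARN
      .set("spark.dynamicAllocation.executorIdleTimeout", "10s")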

 

Background:

I'm moderately new to Spark.  I'm trying to create an RDD with JavaSparkContext.newAPIHadoopRDD() using a custom-written Hadoop InputFormat that does not use HDFS.  My InputFormat requires data locality.  That is, the RecordReader created for an InputSplit must run on one of the machines returned by InputSplit.getLocations().  Running the RecordReader for an input split on a machine not listed there is not possible with our architecture.  We intend to make that possible in the future, but right now we are doing everything we can to ensure data locality.
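
For context, the locality contract we depend on is just InputSplit.getLocations(); a minimal placeholder sketch in Scala (our real split class carries more state, and the custom InputFormat that returns such splits is what we hand to newAPIHadoopRDD()):

    import org.apache.hadoop.mapreduce.InputSplit

    // Placeholder sketch: getLocations() names the only hosts on which the
    // RecordReader for this split can run in our architecture.
    class SingleHostSplit(host: String, length: Long) extends InputSplit {
      override def getLength: Long = length
      override def getLocations: Array[String] = Array(host)
    }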

From experimentation, I've found that Spark tasks are correctly assigned to executors running on the nodes they need to run on when such executors exist, but getting an executor to launch on the appropriate node in the first place is the problem.  From reading the documentation and many other sources, it sounds like Spark does all it can to launch executors on the needed nodes, but in practice it doesn't work as we expected.

 


