You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by Brad Childs <bd...@redhat.com> on 2014/04/03 19:56:52 UTC

Yarn / mapreduce scheduling

Sorry if this is the wrong list, i am looking for deep technical/hadoop source help :) 

How does job scheduling work on yarn framework for map reduce jobs?  I see the yarn scheduler discussed here: https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html  which leads me to believe tasks are scheduled based on node capacity and not data locality.  I've sifted through the fair scheduler and can't find anything about data location or locality.

Where does data locality play into the scheduling of map/reduce tasks on yarn?  Can someone point me to the hadoop 2.x source where the data block location is used to calculate node/container/task assignment (if thats still happening).



-bc


Re: Yarn / mapreduce scheduling

Posted by Chen He <ai...@gmail.com>.
Hi Sandy Ryza

I searched the MRAppMaster.java but did not find exactly where it saves the
locality information. Would you mind provide a more detail information
about which object is used for keeping those locality info?

Regards!

Chen

On Thu, Apr 3, 2014 at 1:30 PM, Sandy Ryza <sa...@cloudera.com> wrote:

> The MapReduce application master reads the split info from HDFS and then
> submits requests to the scheduler based on the locations there.
>
>
> On Thu, Apr 3, 2014 at 1:22 PM, Brad Childs <bd...@redhat.com> wrote:
>
> > Sandy/Shekhar Thank you very much for the helpful responses.
> >
> > One last question/clarification- the getFileBlockLocations(..) in the
> > FileSystem API is the only file-->node mapping that I'm aware of, and it
> > seems the only place its called is in the map/reduce client
> > (FileInputFormat, MultiFileSplit).
> >
> > Is this accurate:
> >
> > 1) Jobclient calls getFileBlockLocations(..) and serializes this info to
> a
> > file on the DFS.
> > 2) Scheduler reads this information for determining locality
> >
> > Or am I missing another call/method for determining node location for
> > block locality?  I haven't located the spot in source that reads the
> block
> > location file for scheduler is why i ask.
> >
> > -bc
> >
> >
> > ----- Original Message -----
> > > From: "Sandy Ryza" <sa...@cloudera.com>
> > > To: common-dev@hadoop.apache.org
> > > Sent: Thursday, April 3, 2014 2:38:42 PM
> > > Subject: Re: Yarn / mapreduce scheduling
> > >
> > > The equivalent code in the Fair Scheduler is in AppSchedulable.java,
> > > under assignContainer(FSSchedulerNode node, boolean reserved).
> > >
> > > YARN uses delay scheduling (
> > >
> >
> http://people.csail.mit.edu/matei/papers/2010/eurosys_delay_scheduling.pdf
> > )
> > > for achieving data-locality.
> > >
> > > -Sandy
> > >
> > >
> > > On Thu, Apr 3, 2014 at 11:16 AM, Shekhar Gupta <sh...@gmail.com>
> > wrote:
> > >
> > > > Hi Brad,
> > > >
> > > >     YARN scheduling does take care of data locality. In YARN, tasks
> > are not
> > > > assigned based on capacity. Actually certain number of containers are
> > > > allocated on every node based on node's capacity. Tasks are executed
> on
> > > > those containers. While scheduling tasks on containers YARN scheduler
> > > > satisfies data locality requirements. I am not very familiar with
> Fair
> > > > Scheduler but if you check the source of FifoScheduler you will find
> a
> > > > function 'assignContainersonNode' which looks like following
> > > >
> > > > private int assignContainersOnNode(FiCaSchedulerNode node,
> > > >       FiCaSchedulerApp application, Priority priority
> > > >   ) {
> > > >     // Data-local
> > > >     int nodeLocalContainers =
> > > >       assignNodeLocalContainers(node, application, priority);
> > > >
> > > >     // Rack-local
> > > >     int rackLocalContainers =
> > > >       assignRackLocalContainers(node, application, priority);
> > > >
> > > >     // Off-switch
> > > >     int offSwitchContainers =
> > > >       assignOffSwitchContainers(node, application, priority);
> > > >
> > > >
> > > >     LOG.debug("assignContainersOnNode:" +
> > > >         " node=" + node.getRMNode().getNodeAddress() +
> > > >         " application=" + application.getApplicationId().getId() +
> > > >         " priority=" + priority.getPriority() +
> > > >         " #assigned=" +
> > > >         (nodeLocalContainers + rackLocalContainers +
> > offSwitchContainers));
> > > >
> > > >
> > > >     return (nodeLocalContainers + rackLocalContainers +
> > > > offSwitchContainers);
> > > >   }
> > > >
> > > > In this routine you will find that data-local tasks are scheduled
> > first,
> > > > then  rack-local and in then off-switch.
> > > >
> > > > After this you may find similar function in fairScheduler too.
> > > >
> > > > I hope this helps. Let me know if you more questions or if something
> is
> > > > wrong in my reasoning.
> > > >
> > > > Regards,
> > > > Shekhar
> > > >
> > > >
> > > > On Thu, Apr 3, 2014 at 10:56 AM, Brad Childs <bd...@redhat.com> wrote:
> > > >
> > > > > Sorry if this is the wrong list, i am looking for deep
> > technical/hadoop
> > > > > source help :)
> > > > >
> > > > > How does job scheduling work on yarn framework for map reduce jobs?
> >  I
> > > > see
> > > > > the yarn scheduler discussed here:
> > > > >
> > > >
> >
> https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.htmlwhich
> > > > leads me to believe tasks are scheduled based on node capacity and
> > > > > not data locality.  I've sifted through the fair scheduler and
> can't
> > find
> > > > > anything about data location or locality.
> > > > >
> > > > > Where does data locality play into the scheduling of map/reduce
> > tasks on
> > > > > yarn?  Can someone point me to the hadoop 2.x source where the data
> > block
> > > > > location is used to calculate node/container/task assignment (if
> > thats
> > > > > still happening).
> > > > >
> > > > >
> > > > >
> > > > > -bc
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Yarn / mapreduce scheduling

Posted by Sandy Ryza <sa...@cloudera.com>.
The MapReduce application master reads the split info from HDFS and then
submits requests to the scheduler based on the locations there.


On Thu, Apr 3, 2014 at 1:22 PM, Brad Childs <bd...@redhat.com> wrote:

> Sandy/Shekhar Thank you very much for the helpful responses.
>
> One last question/clarification- the getFileBlockLocations(..) in the
> FileSystem API is the only file-->node mapping that I'm aware of, and it
> seems the only place its called is in the map/reduce client
> (FileInputFormat, MultiFileSplit).
>
> Is this accurate:
>
> 1) Jobclient calls getFileBlockLocations(..) and serializes this info to a
> file on the DFS.
> 2) Scheduler reads this information for determining locality
>
> Or am I missing another call/method for determining node location for
> block locality?  I haven't located the spot in source that reads the block
> location file for scheduler is why i ask.
>
> -bc
>
>
> ----- Original Message -----
> > From: "Sandy Ryza" <sa...@cloudera.com>
> > To: common-dev@hadoop.apache.org
> > Sent: Thursday, April 3, 2014 2:38:42 PM
> > Subject: Re: Yarn / mapreduce scheduling
> >
> > The equivalent code in the Fair Scheduler is in AppSchedulable.java,
> > under assignContainer(FSSchedulerNode node, boolean reserved).
> >
> > YARN uses delay scheduling (
> >
> http://people.csail.mit.edu/matei/papers/2010/eurosys_delay_scheduling.pdf
> )
> > for achieving data-locality.
> >
> > -Sandy
> >
> >
> > On Thu, Apr 3, 2014 at 11:16 AM, Shekhar Gupta <sh...@gmail.com>
> wrote:
> >
> > > Hi Brad,
> > >
> > >     YARN scheduling does take care of data locality. In YARN, tasks
> are not
> > > assigned based on capacity. Actually certain number of containers are
> > > allocated on every node based on node's capacity. Tasks are executed on
> > > those containers. While scheduling tasks on containers YARN scheduler
> > > satisfies data locality requirements. I am not very familiar with Fair
> > > Scheduler but if you check the source of FifoScheduler you will find a
> > > function 'assignContainersonNode' which looks like following
> > >
> > > private int assignContainersOnNode(FiCaSchedulerNode node,
> > >       FiCaSchedulerApp application, Priority priority
> > >   ) {
> > >     // Data-local
> > >     int nodeLocalContainers =
> > >       assignNodeLocalContainers(node, application, priority);
> > >
> > >     // Rack-local
> > >     int rackLocalContainers =
> > >       assignRackLocalContainers(node, application, priority);
> > >
> > >     // Off-switch
> > >     int offSwitchContainers =
> > >       assignOffSwitchContainers(node, application, priority);
> > >
> > >
> > >     LOG.debug("assignContainersOnNode:" +
> > >         " node=" + node.getRMNode().getNodeAddress() +
> > >         " application=" + application.getApplicationId().getId() +
> > >         " priority=" + priority.getPriority() +
> > >         " #assigned=" +
> > >         (nodeLocalContainers + rackLocalContainers +
> offSwitchContainers));
> > >
> > >
> > >     return (nodeLocalContainers + rackLocalContainers +
> > > offSwitchContainers);
> > >   }
> > >
> > > In this routine you will find that data-local tasks are scheduled
> first,
> > > then  rack-local and in then off-switch.
> > >
> > > After this you may find similar function in fairScheduler too.
> > >
> > > I hope this helps. Let me know if you more questions or if something is
> > > wrong in my reasoning.
> > >
> > > Regards,
> > > Shekhar
> > >
> > >
> > > On Thu, Apr 3, 2014 at 10:56 AM, Brad Childs <bd...@redhat.com> wrote:
> > >
> > > > Sorry if this is the wrong list, i am looking for deep
> technical/hadoop
> > > > source help :)
> > > >
> > > > How does job scheduling work on yarn framework for map reduce jobs?
>  I
> > > see
> > > > the yarn scheduler discussed here:
> > > >
> > >
> https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.htmlwhich
> > > leads me to believe tasks are scheduled based on node capacity and
> > > > not data locality.  I've sifted through the fair scheduler and can't
> find
> > > > anything about data location or locality.
> > > >
> > > > Where does data locality play into the scheduling of map/reduce
> tasks on
> > > > yarn?  Can someone point me to the hadoop 2.x source where the data
> block
> > > > location is used to calculate node/container/task assignment (if
> thats
> > > > still happening).
> > > >
> > > >
> > > >
> > > > -bc
> > > >
> > > >
> > >
> >
>

Re: Yarn / mapreduce scheduling

Posted by Brad Childs <bd...@redhat.com>.
Sandy/Shekhar Thank you very much for the helpful responses.

One last question/clarification- the getFileBlockLocations(..) in the FileSystem API is the only file-->node mapping that I'm aware of, and it seems the only place its called is in the map/reduce client (FileInputFormat, MultiFileSplit).  

Is this accurate: 

1) Jobclient calls getFileBlockLocations(..) and serializes this info to a file on the DFS.
2) Scheduler reads this information for determining locality

Or am I missing another call/method for determining node location for block locality?  I haven't located the spot in source that reads the block location file for scheduler is why i ask.

-bc


----- Original Message -----
> From: "Sandy Ryza" <sa...@cloudera.com>
> To: common-dev@hadoop.apache.org
> Sent: Thursday, April 3, 2014 2:38:42 PM
> Subject: Re: Yarn / mapreduce scheduling
> 
> The equivalent code in the Fair Scheduler is in AppSchedulable.java,
> under assignContainer(FSSchedulerNode node, boolean reserved).
> 
> YARN uses delay scheduling (
> http://people.csail.mit.edu/matei/papers/2010/eurosys_delay_scheduling.pdf)
> for achieving data-locality.
> 
> -Sandy
> 
> 
> On Thu, Apr 3, 2014 at 11:16 AM, Shekhar Gupta <sh...@gmail.com> wrote:
> 
> > Hi Brad,
> >
> >     YARN scheduling does take care of data locality. In YARN, tasks are not
> > assigned based on capacity. Actually certain number of containers are
> > allocated on every node based on node's capacity. Tasks are executed on
> > those containers. While scheduling tasks on containers YARN scheduler
> > satisfies data locality requirements. I am not very familiar with Fair
> > Scheduler but if you check the source of FifoScheduler you will find a
> > function 'assignContainersonNode' which looks like following
> >
> > private int assignContainersOnNode(FiCaSchedulerNode node,
> >       FiCaSchedulerApp application, Priority priority
> >   ) {
> >     // Data-local
> >     int nodeLocalContainers =
> >       assignNodeLocalContainers(node, application, priority);
> >
> >     // Rack-local
> >     int rackLocalContainers =
> >       assignRackLocalContainers(node, application, priority);
> >
> >     // Off-switch
> >     int offSwitchContainers =
> >       assignOffSwitchContainers(node, application, priority);
> >
> >
> >     LOG.debug("assignContainersOnNode:" +
> >         " node=" + node.getRMNode().getNodeAddress() +
> >         " application=" + application.getApplicationId().getId() +
> >         " priority=" + priority.getPriority() +
> >         " #assigned=" +
> >         (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
> >
> >
> >     return (nodeLocalContainers + rackLocalContainers +
> > offSwitchContainers);
> >   }
> >
> > In this routine you will find that data-local tasks are scheduled first,
> > then  rack-local and in then off-switch.
> >
> > After this you may find similar function in fairScheduler too.
> >
> > I hope this helps. Let me know if you more questions or if something is
> > wrong in my reasoning.
> >
> > Regards,
> > Shekhar
> >
> >
> > On Thu, Apr 3, 2014 at 10:56 AM, Brad Childs <bd...@redhat.com> wrote:
> >
> > > Sorry if this is the wrong list, i am looking for deep technical/hadoop
> > > source help :)
> > >
> > > How does job scheduling work on yarn framework for map reduce jobs?  I
> > see
> > > the yarn scheduler discussed here:
> > >
> > https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.htmlwhich
> > leads me to believe tasks are scheduled based on node capacity and
> > > not data locality.  I've sifted through the fair scheduler and can't find
> > > anything about data location or locality.
> > >
> > > Where does data locality play into the scheduling of map/reduce tasks on
> > > yarn?  Can someone point me to the hadoop 2.x source where the data block
> > > location is used to calculate node/container/task assignment (if thats
> > > still happening).
> > >
> > >
> > >
> > > -bc
> > >
> > >
> >
> 

Re: Yarn / mapreduce scheduling

Posted by Sandy Ryza <sa...@cloudera.com>.
The equivalent code in the Fair Scheduler is in AppSchedulable.java,
under assignContainer(FSSchedulerNode node, boolean reserved).

YARN uses delay scheduling (
http://people.csail.mit.edu/matei/papers/2010/eurosys_delay_scheduling.pdf)
for achieving data-locality.

-Sandy


On Thu, Apr 3, 2014 at 11:16 AM, Shekhar Gupta <sh...@gmail.com> wrote:

> Hi Brad,
>
>     YARN scheduling does take care of data locality. In YARN, tasks are not
> assigned based on capacity. Actually certain number of containers are
> allocated on every node based on node's capacity. Tasks are executed on
> those containers. While scheduling tasks on containers YARN scheduler
> satisfies data locality requirements. I am not very familiar with Fair
> Scheduler but if you check the source of FifoScheduler you will find a
> function 'assignContainersonNode' which looks like following
>
> private int assignContainersOnNode(FiCaSchedulerNode node,
>       FiCaSchedulerApp application, Priority priority
>   ) {
>     // Data-local
>     int nodeLocalContainers =
>       assignNodeLocalContainers(node, application, priority);
>
>     // Rack-local
>     int rackLocalContainers =
>       assignRackLocalContainers(node, application, priority);
>
>     // Off-switch
>     int offSwitchContainers =
>       assignOffSwitchContainers(node, application, priority);
>
>
>     LOG.debug("assignContainersOnNode:" +
>         " node=" + node.getRMNode().getNodeAddress() +
>         " application=" + application.getApplicationId().getId() +
>         " priority=" + priority.getPriority() +
>         " #assigned=" +
>         (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
>
>
>     return (nodeLocalContainers + rackLocalContainers +
> offSwitchContainers);
>   }
>
> In this routine you will find that data-local tasks are scheduled first,
> then  rack-local and in then off-switch.
>
> After this you may find similar function in fairScheduler too.
>
> I hope this helps. Let me know if you more questions or if something is
> wrong in my reasoning.
>
> Regards,
> Shekhar
>
>
> On Thu, Apr 3, 2014 at 10:56 AM, Brad Childs <bd...@redhat.com> wrote:
>
> > Sorry if this is the wrong list, i am looking for deep technical/hadoop
> > source help :)
> >
> > How does job scheduling work on yarn framework for map reduce jobs?  I
> see
> > the yarn scheduler discussed here:
> >
> https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.htmlwhich leads me to believe tasks are scheduled based on node capacity and
> > not data locality.  I've sifted through the fair scheduler and can't find
> > anything about data location or locality.
> >
> > Where does data locality play into the scheduling of map/reduce tasks on
> > yarn?  Can someone point me to the hadoop 2.x source where the data block
> > location is used to calculate node/container/task assignment (if thats
> > still happening).
> >
> >
> >
> > -bc
> >
> >
>

Re: Yarn / mapreduce scheduling

Posted by Shekhar Gupta <sh...@gmail.com>.
Hi Brad,

    YARN scheduling does take care of data locality. In YARN, tasks are not
assigned based on capacity. Actually certain number of containers are
allocated on every node based on node's capacity. Tasks are executed on
those containers. While scheduling tasks on containers YARN scheduler
satisfies data locality requirements. I am not very familiar with Fair
Scheduler but if you check the source of FifoScheduler you will find a
function 'assignContainersonNode' which looks like following

private int assignContainersOnNode(FiCaSchedulerNode node,
      FiCaSchedulerApp application, Priority priority
  ) {
    // Data-local
    int nodeLocalContainers =
      assignNodeLocalContainers(node, application, priority);

    // Rack-local
    int rackLocalContainers =
      assignRackLocalContainers(node, application, priority);

    // Off-switch
    int offSwitchContainers =
      assignOffSwitchContainers(node, application, priority);


    LOG.debug("assignContainersOnNode:" +
        " node=" + node.getRMNode().getNodeAddress() +
        " application=" + application.getApplicationId().getId() +
        " priority=" + priority.getPriority() +
        " #assigned=" +
        (nodeLocalContainers + rackLocalContainers + offSwitchContainers));


    return (nodeLocalContainers + rackLocalContainers +
offSwitchContainers);
  }

In this routine you will find that data-local tasks are scheduled first,
then  rack-local and in then off-switch.

After this you may find similar function in fairScheduler too.

I hope this helps. Let me know if you more questions or if something is
wrong in my reasoning.

Regards,
Shekhar


On Thu, Apr 3, 2014 at 10:56 AM, Brad Childs <bd...@redhat.com> wrote:

> Sorry if this is the wrong list, i am looking for deep technical/hadoop
> source help :)
>
> How does job scheduling work on yarn framework for map reduce jobs?  I see
> the yarn scheduler discussed here:
> https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html which leads me to believe tasks are scheduled based on node capacity and
> not data locality.  I've sifted through the fair scheduler and can't find
> anything about data location or locality.
>
> Where does data locality play into the scheduling of map/reduce tasks on
> yarn?  Can someone point me to the hadoop 2.x source where the data block
> location is used to calculate node/container/task assignment (if thats
> still happening).
>
>
>
> -bc
>
>

Re: Yarn / mapreduce scheduling

Posted by Chen He <ai...@gmail.com>.
Hi Brad

IMHO, here is how hadoop scheduler works

For Hadoop 1.x

In one heartbeat interval, Fifo Scheduler keep strict FIFO order and assign
a node as many as possible local task and only one remote task. For Fair
scheduler, if you turn on delay algorithm, it also assign local task as
many as possible but none remote task if a job does not wait longer than a
threshold. If a job wait longer than a threshold, fair scheduler will
assign remote task to this job.

For Hadoop 2.x

In one heartbeat interval, Fifoscheduler does not keep the strict FIFO
order, it assign local containers to a AM on a given node, if this AM
blacklist a node, Fifoscheduler will skip this AM and check the next AM in
the job queue. For data locality, FifoScheduler is sharing the same policy
with Capacityscheduler since they are both using
FiCaSchedulerApp.allocate() to assign container.

Hope it is useful

Regards!

Chen

On Tue, Jan 26, 2016 at 2:04 PM, Sultan <su...@gmail.com> wrote:

> Brad Childs <bd...@...> writes:
>
> >
> > Sorry if this is the wrong list, i am looking for deep technical/hadoop
> source help :)
> >
> > How does job scheduling work on yarn framework for map reduce jobs?  I
> see
> the yarn scheduler discussed here:
> >
>
> https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html
>  which leads me
> > to believe tasks are scheduled based on node capacity and not data
> locality.  I've sifted through the fair
> > scheduler and can't find anything about data location or locality.
> >
> > Where does data locality play into the scheduling of map/reduce tasks on
> yarn?  Can someone point me to the
> > hadoop 2.x source where the data block location is used to calculate
> node/container/task assignment (if
> > thats still happening).
> >
> > -bc
> >
> >
>
>
>
> Hi Brad,
>
> Were you able to find an answer for you question?
>
> Sultan
>
>
>

Re: Yarn / mapreduce scheduling

Posted by Sultan <su...@gmail.com>.
Brad Childs <bd...@...> writes:

> 
> Sorry if this is the wrong list, i am looking for deep technical/hadoop
source help :) 
> 
> How does job scheduling work on yarn framework for map reduce jobs?  I see
the yarn scheduler discussed here:
>
https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html
 which leads me
> to believe tasks are scheduled based on node capacity and not data
locality.  I've sifted through the fair
> scheduler and can't find anything about data location or locality.
> 
> Where does data locality play into the scheduling of map/reduce tasks on
yarn?  Can someone point me to the
> hadoop 2.x source where the data block location is used to calculate
node/container/task assignment (if
> thats still happening).
> 
> -bc
> 
> 



Hi Brad, 

Were you able to find an answer for you question?

Sultan