You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@tez.apache.org by "Fabio C." <an...@gmail.com> on 2015/02/01 07:39:55 UTC

Re: Questions about Tez under the hood

Got it. Thanks for the support.

Fabio

On Fri, Jan 30, 2015 at 9:20 PM, Bikas Saha <bi...@hortonworks.com> wrote:

>  There are many nuances like container affinity etc. but broadly this is
> what happens.
>
>
>
> The scheduler loops through all free containers. For each container, at
> node/rack/any level of locality, it picks the current top priority of tasks
> that needs to be assigned. For that priority it asks YARN AMRMClient to
> return pending requests at given locality. These pending requests are
> returned in the order they were received. Thus tasks of same priority from
> different vertices would be received intermingled in the order in which
> they arrived.
>
>
>
> Bikas
>
>
>
> *From:* Fabio C. [mailto:anytek88@gmail.com]
> *Sent:* Friday, January 30, 2015 5:52 AM
> *To:* user@tez.apache.org
> *Subject:* Re: Questions about Tez under the hood
>
>
>
> Thanks a lot.
> Now I was trying to figure out what happens if there are different tasks
> from two different vertexes at the same priority awaiting for resources. If
> a suitable container becomes available, who is going to get it? I'm having
> a hard time finding the code about this. I was thinking that maybe we serve
> at first the first submitted vertex and among its tasks we follow the task
> id order, but it's just a guess.
>
>
>
> On Wed, Jan 28, 2015 at 10:50 PM, Hitesh Shah <hi...@apache.org> wrote:
>
> Answers inline.
>
> — Hitesh
>
> On Jan 28, 2015, at 3:15 AM, Fabio <an...@gmail.com> wrote:
>
> > Hi everyone,
> > I take back this mail since I have a few more questions about Tez. I am
> digging into the internal scheduling policy and I'm trying to fully
> understand how containers are assigned once Tez receive them from the RM.
> > I am mainly referring to
> org.apache.tez.dag.app.rm.YarnTaskSchedulerService (I am currently on
> 0.5.0, I hope there have been no change) and I am not considering locality
> (let's say I just have one node in the cluster).
> >
> > Could someone please confirm this?
> > - When a response comes from the RM with container allocations for the
> application, those containers are added to the list of delayed containers
> (together with old containers already available for reuse), and their
> scheduling time is set to be 1 ms after the last scheduling time seen so
> far. They are assigned right away only if container reuse is disabled.
>
> Yes. With container reuse disabled, there should be a 1:1 mapping for a
> pending task to an allocated container. With re-use enabled, existing
> containers are used first to reduce launch cost overheads.
>
> > - The tez scheduler will keep on trying to assign a container at its
> nextScheduleTime.
>
> Yes - as well as whenever a new pending task comes in, it will try and
> assign a container to it.
>
> > - If we've just got any container from the RM, then the tez scheduler
> will try to assign all delayed containers (old ones first and new ones
> last, since they are ordered according to their next scheduling time)
>
> Yes - probably something which needs to be fixed. This was being done
> earlier before when a new container would be allocated first instead of a
> re-used container. This may no longer be needed as the scheduling loop
> would get trigged on the next schedule time elapse. This probably
> unintentionally acts as a trigger to run a new matching loop.
>
> > Just out of curiosity, why not ordering the delayed containers according
> to their expiry time?
>
> The general flow is that on each loop, a container is tried to be assigned
> to a potential matching task. On each loop, its locality constraints are
> relaxed more or more to allow for more matches ( local only on round 1,
> rack or local match on round 2, …). By the time, the container hits its
> final loop, it will match against any pending task. Sorting by next
> schedule time means the thread wakes up when it is time to run the next
> loop for a given container.  Sorting by expiry would imply scanning the
> whole list to find all containers whose schedule time has elapsed.
>
>
> >
> > Thanks
> >
> > Fabio
> >
> > On 10/29/2014 04:48 PM, Hitesh Shah wrote:
> >> Answers inline.
> >>
> >> — Hitesh
> >>
> >> On Oct 29, 2014, at 2:33 AM, Fabio <an...@gmail.com> wrote:
> >>
> >>> Thanks Bikas for your answer and suggestion, actually my work deals
> more with high level modeling/behavior/performance of Tez, but there is
> another guy who is goign to handle Tez sources, I will suggest him to
> contribute.
> >>> I've just found many commented configuration parameters in
> org.apache.tez.dag.api.TezConfiguration that I didn't even know, they will
> help.
> >>>
> >>> Right now I have another question that came to my mind while modeling
> Tez:
> >>> Situation: I have a DAG with 2 tasks waiting to run, the cluster is
> quite overloaded. The Tez AM will ask for 2 containers at the Resource
> Manager and wait for them. At some point a single container becomes
> available and a task can run and finish, so Tez (I guess) will exploit that
> same container for reuse, but what about the other request sent to the RM?
> Is it somehow actively voided by Tez or at some point it will just get
> another container that wont be used (and possibly discarded afterward)? I
> don't even know if YARN have such a feature for removing a previously
> submitted request to the RM.
> >>>
> >> [Hitesh] Tez will always ask the RM for as many containers as the tasks
> it needs to run. In cases when a task is scheduled to run on an existing
> available container, it will do so based on certain conditions such as
> checking if the data needed by the task is available on the same node
> and/or rack as that of the existing container.
> >>
> >> In terms of the RM request management, the protocol between the RM and
> an ApplicationMaster is more or less an update protocol ( and not an
> incremental one ). Based on your example, Tez would first ask the RM for 2
> containers. Once it gets one, it will keep on telling the RM that it now
> needs one. If the previously assigned container is also used for the 2nd
> task, it will update the ask to the RM to 0 containers. There is obviously
> a minor race condition where the RM may have already allocated the
> container before Tez is able to tell it that it does not need the
> additional container. In such cases, Tez will get an additional allocation
> which it does not need but release it in due time ( the YARN protocol
> supports releasing containers without using them ).
> >>
> >>
> >>> I would keep this thread for future generic questions about Tez
> behavior if it's ok.
> >>>
> >>> Thanks so far :)
> >>>
> >>> Fabio
> >>>
> >>> On 10/27/2014 05:48 PM, Bikas Saha wrote:
> >>>> Also, any contributions to the project via your thesis work would be
> welcome. Please do first open a jira and provide a design overview before
> submitting code.
> >>>>  From: Bikas Saha [mailto:bikas@hortonworks.com]
> >>>> Sent: Monday, October 27, 2014 9:47 AM
> >>>> To: user@tez.apache.org
> >>>> Subject: RE: Questions about Tez under the hood
> >>>>  Answers inline.
> >>>>  From: Fabio C. [mailto:anytek88@gmail.com]
> >>>> Sent: Monday, October 27, 2014 7:08 AM
> >>>> To: user@tez.apache.org
> >>>> Subject: Questions about Tez under the hood
> >>>>  Hi guys, I'm currently working at my master degree thesis on Tez,
> and I am trying to understand how Tez works under the hood. I have some
> questions, I hope someone can help with this:
> >>>>
> >>>> 1) How does Tez handle containers for reuse? Are they kept for some
> seconds (how long?) in a sort of buffer waiting for tasks which will need
> them? Or a container is sent back to the RM if no task is immediately ready
> to take it?
> >>>>
> >>>> [Bikas] Yes they wait around for a buffer period of time. Idle
> containers are released back the RM randomly between a mix and a max
> release time until a minimum held container threshold is met. So the
> behavior can be customized using the min/max timeouts and the min held
> threshold.
> >>>>
> >>>> 2) Let's say I have a DAG with two branches proceeding in parallel
> before joining in a root node (such as the example on the tez home
> pagehttp://tez.apache.org/images/PigHiveQueryOnTez.png
> <http://cp.mcafee.com/d/5fHCN0gdEI9FK9LCzBBYTsSztNWXX3bb1J6XxEVosphhjdETsuK-MOO-rhKUqekmkkhNEVdA95tE2ytoPH0Nm9mDbCOtoPH0Nm9mDbC_uIYgIM_R-uuosvLRXBQhPMXAnbL3CjhPORQX8FGTKVOEuvkzaT0QSyrhdTVdByX2rXXapKVI07fNjJmSNf-00V9nifQMbi8xuhDM7r-9ocg1fyDbg8CSWv4L7VJNwn76zAsn8iaXgUDmcWMclylFOUaDUFSHroD_00jqrzPapI5-Aq807r-AVlxgQgqgGTcQg0DNgQgdMg-9EwCjYQg4WRgdIL6Pq4mF8lDLDck>
> ). In this case, we will have both branches running at the same time. At
> some point we may have the first branch that is almost complete, while the
> second is still at an early stage. In this case, does Tez knows that "soon
> or later" the two branches will merge, thus there will be a common consumer
> waiting for the slower branch to complete? Actually the real question is:
> does Tez prioritize the scheduling/resource allocation of tasks belonging
> to slower branches? If yes, what kind of policy is adopted? Is it
> configurable?
> >>>>
> >>>> [Bikas] Currently the priority of a vertex is the distance from the
> source of the DAG. So vertices can run in parallel. On the roadmap are
> items like critical path scheduling where the vertex that is holding up the
> job the most or that’s going to unblock the most amount of downstream work
> to be given higher priority.
> >>>>
> >>>> 3) tez.am.shuffle-vertex-manager.min-src-fraction: if I have a dag
> made of two producer vertexes, each one running 10 tasks, and below them a
> consumer vertex, let's say running 5 tasks, so if this property is set to
> 0.2, does it mean that before running any consumer task we need 2 producer
> tasks to complete for each of the producer vertexes? Or are they considered
> as a whole and we need just 4 tasks completed (even just from one vertex)?
> >>>>
> >>>> [Bikas] It currently looks at the fraction of the whole (both
> combined) but we are going to change it to look at the fraction per source
> vertex. Again, this is just a hint. (With auto-parallelism on) the vertex
> also looks at whether enough data has been produced before triggering the
> tasks because the real intention is to have enough data available for the
> reduce to pull so that it can overlap the pull with the completion of the
> map tasks.
> >>>>
> >>>> 4) As far as I understand, a single Tez Application Master can handle
> multiple DAGs at the same time, but only if the user-application has been
> coded to do so (for example, if I run two wordcount with the same user, it
> simply creates two different Tez App Master). Is this correct?
> >>>>
> >>>> [Bikas] If the TezClient is started in session mode then it re-uses
> the App Master for multiple DAGs. The code is the same in session and
> non-session mode. The behavior can be changed via configuration (or hard
> coded in the code if desired). So you can use both modes with the same
> code. To be clear, the AppMaster does not run dags concurrently. It runs
> one DAG at a time.
> >>>>
> >>>> Thanks in advance
> >>>>
> >>>> Fabio
> >>>>
> >>>> CONFIDENTIALITY NOTICE
> >>>> NOTICE: This message is intended for the use of the individual or
> entity to which it is addressed and may contain information that is
> confidential, privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are hereby
> notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
> >>>
> >>
> >
>
>
>