You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@tez.apache.org by "Siddharth Seth (JIRA)" <ji...@apache.org> on 2014/08/26 04:05:58 UTC

[jira] [Commented] (TEZ-1157) Optimize broadcast :- Tasks pertaining to same job in same machine should not download multiple copies of broadcast data

    [ https://issues.apache.org/jira/browse/TEZ-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110166#comment-14110166 ] 

Siddharth Seth commented on TEZ-1157:
-------------------------------------

Comments on the patch. Since this WIP, I'm guessing some of this will anyway end up getting fixed in the final patch.

- isDebugEnabled = LOG.isDebugEnabled() || true; | Needs to go
- Creating the list of localDisks in each Fetcher - this can be done once in ShuffleManager and passed in. Similarly for the localFS, reading sharedFetchEnabled from the Configuration etc. Also, should this list be sorted, in case YARN provides the list in a random order ?
- SharedInputAttemptIdentifier in ShuffleInputEventHandlerImpl - not needed, and never used
- this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0); | This will end up triggering the code path for Partitioned Output as well. Looks like we need Edge information in *Context.
- final boolean shareInput = this.useSharedInputs; | Not needed
- ShuffleManager.pendingLocks - not needed. outputPaths not currently used, but can be passed to individual Fetchers
- if (created == false && !lockFile.exists()) { | Would be useful to log a message here. This is effectively an error right ?

The important stuff
- The partial fetch use case seems fairly important, given that the fetching task may start before all the sources are complete. The list of inputs seen by different tasks (for a single host) can be very different.
- In getLock - I'm not sure what the purpose of obtaining a shared lock is. Shouldn't this just return, so that the fetcher thread is freed up - and could potentially be allocated to another host. If there aren't any other hosts, control would eventually come back to the same loop. Also, once a shared lock is obtained - it's highly unlikely that anyone else will be able to obtain an exclusive lock - so subsequent fetches will be http based.
- In doSharedFetch - if (lock == null) { <- This should never happen, at least if a shared lock is being obtained.
- In doSharedFetch - if an exclusive lock is obtained, and some inputs exist (some other task got the exclusive lock at some point) - this falls back to an http fetch. If the attempt list to a task is different (when starting at different times, or limited by the amount of work given to each fetcher), this will end up falling back to http fetch.
- if(inputs != srcAttempts.size()) { (and other similar checks) <- Is this check sufficient to use localDiskFetch ? The number of files found may match, but the actual files on disk may belong to different source identifiers. In this case, the local fetch will likely end up doing a partial fetch, and reporting the rest as pending (not sure if it reports failures at this point)
- Also, should setupLocalDiskFetch be called from within an exclusive lock ? That would cause each of the tasks to read the data serially.

- One more bit, which I think may be a problem is that the same attempt can end up being fetched multiple times in case of failure. (This may affect direct local fetch as well). The file being created may end up overwriting another file. Since the source tasks may have run on different nodes, there's a possibility of hitting this (event after we end up moving to assigning a single host to a single fetcher, instead of the current behaviour of a host going to whichever fetcher is available).

> Optimize broadcast :- Tasks pertaining to same job in same machine should not download multiple copies of broadcast data
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: TEZ-1157
>                 URL: https://issues.apache.org/jira/browse/TEZ-1157
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Rajesh Balamohan
>            Assignee: Gopal V
>              Labels: performance
>         Attachments: TEZ-1152.WIP.patch, TEZ-1157.3.WIP.patch, TEZ-broadcast-shuffle+vertex-parallelism.patch
>
>
> Currently tasks (belonging to same job) running in the same machine download its own copy of broadcast data.  Optimization could be to  download one copy in the machine, and the rest of the tasks can refer to this downloaded copy.



--
This message was sent by Atlassian JIRA
(v6.2#6252)