Posted to issues@tez.apache.org by "Bikas Saha (JIRA)" <ji...@apache.org> on 2013/08/05 10:38:48 UTC

[jira] [Comment Edited] (TEZ-338) Determine reduce task parallelism

    [ https://issues.apache.org/jira/browse/TEZ-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13729301#comment-13729301 ] 

Bikas Saha edited comment on TEZ-338 at 8/5/13 8:38 AM:
--------------------------------------------------------

A typical approach that has worked elsewhere is for the user to over-partition and for the framework to reduce the number of partitions at runtime. Once the partition stage has executed, reducing the number of partitions is easy (existing partitions can be merged), whereas increasing it is difficult without re-running the partition stage.

The approach could be the following:
1) Record the amount of data output by each task and track the aggregate for the partition stage.
2) Enhance the BipartiteSlowStartVertexScheduler to track the above data for the source (partition stage). BipartiteSlowStartVertexScheduler is the component that understands shuffle-based partitioning semantics inside Tez, which is otherwise agnostic to logical data.
3) After, say, 25% of the source tasks have finished, extrapolate their data output to predict the expected total data output of the source stage.
4) Divide the expected total data output by the desired data input per reduce vertex to determine the number of reduce vertices. Create that many and start them when appropriate.
5) Run "user code" inside the BipartiteSlowStartVertexScheduler to generate an additional user payload that tells each reduce task the range of indices it needs to read from the shuffle service. The index ranges are determined by load balancing across the reduce vertex count calculated above. Assigning consecutive indices to a reduce task may be better for sequential reads during shuffle. The index information in this additional payload may override the initial index information set by the user.
6) In the reduce task, use the additional configuration to read data from the shuffle service. This may require changes to the existing shuffle input reader so that it reads a range of indices instead of a single index.
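Steps 1, 3 and 4 can be sketched as a small estimator. This is a minimal illustration, not Tez code: the class `ReduceParallelismEstimator`, its methods and parameters are hypothetical. It assumes each finished source task reports its output size in bytes, uses the 25% threshold mentioned above, and caps the result at the user's original (over-partitioned) count, since only decreasing the partition count is cheap at runtime.

```java
/**
 * Hypothetical helper, not part of Tez: accumulates the output sizes reported by
 * finished source (partition-stage) tasks and, once enough have finished,
 * extrapolates the stage total to choose a reduce parallelism.
 */
class ReduceParallelismEstimator {
    private final int totalSourceTasks;
    private final double minCompletedFraction;  // e.g. 0.25 for "after 25% finish"
    private final long desiredBytesPerReducer;
    private final int maxReducers;              // the user's (over-partitioned) count

    private long completedBytes = 0;
    private int completedTasks = 0;

    ReduceParallelismEstimator(int totalSourceTasks, double minCompletedFraction,
                               long desiredBytesPerReducer, int maxReducers) {
        if (totalSourceTasks < 1 || desiredBytesPerReducer < 1 || maxReducers < 1) {
            throw new IllegalArgumentException("counts and sizes must be positive");
        }
        this.totalSourceTasks = totalSourceTasks;
        this.minCompletedFraction = minCompletedFraction;
        this.desiredBytesPerReducer = desiredBytesPerReducer;
        this.maxReducers = maxReducers;
    }

    /** Step 1: record the output size of one finished source task. */
    void onSourceTaskCompleted(long outputBytes) {
        completedBytes += outputBytes;
        completedTasks++;
    }

    /** Step 3 precondition: have enough source tasks finished to extrapolate? */
    boolean canEstimate() {
        return completedTasks > 0
            && completedTasks >= Math.ceil(totalSourceTasks * minCompletedFraction);
    }

    /** Step 3: scale the average output per finished task to the whole stage. */
    long expectedTotalBytes() {
        double avgPerTask = (double) completedBytes / completedTasks;
        return Math.round(avgPerTask * totalSourceTasks);
    }

    /** Step 4: ceil(total / desired per reducer), clamped to [1, maxReducers]. */
    int estimateReducers() {
        long total = expectedTotalBytes();
        long needed = (total + desiredBytesPerReducer - 1) / desiredBytesPerReducer;
        return (int) Math.max(1, Math.min(maxReducers, needed));
    }

    public static void main(String[] args) {
        // 8 source tasks, estimate once 25% (2 tasks) have finished,
        // aim for ~100 MiB per reducer, user asked for 64 partitions.
        ReduceParallelismEstimator est =
            new ReduceParallelismEstimator(8, 0.25, 100L << 20, 64);
        est.onSourceTaskCompleted(120L << 20);
        est.onSourceTaskCompleted(80L << 20);
        if (est.canEstimate()) {
            System.out.println(est.estimateReducers());  // prints 8
        }
    }
}
```

Extrapolating from the first finishers assumes they are representative of the whole stage; skewed early tasks would skew the estimate.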

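For step 5, one simple way to load-balance while keeping each reduce task's indices consecutive is a greedy split over per-partition sizes. This sketch assumes the per-partition output sizes (summed over all source tasks) are available, which goes beyond the per-task totals of step 1. `PartitionRangeAssigner` and `assignRanges` are hypothetical names, and the greedy rule is a heuristic that is not guaranteed to minimize the largest range.

```java
import java.util.Arrays;

/**
 * Hypothetical helper, not part of Tez: splits partition indices 0..n-1 into
 * numReducers contiguous, non-empty ranges [start, end) of roughly equal size.
 */
class PartitionRangeAssigner {

    static int[][] assignRanges(long[] partitionBytes, int numReducers) {
        int n = partitionBytes.length;
        if (numReducers < 1 || numReducers > n) {
            throw new IllegalArgumentException("need 1 <= numReducers <= partitions");
        }
        long remaining = 0;
        for (long b : partitionBytes) {
            remaining += b;
        }
        int[][] ranges = new int[numReducers][];
        int start = 0;
        for (int r = 0; r < numReducers; r++) {
            int reducersLeft = numReducers - r;
            if (reducersLeft == 1) {
                ranges[r] = new int[] {start, n};  // last reducer takes the rest
                break;
            }
            // Aim for an equal share of the bytes that are still unassigned.
            double target = (double) remaining / reducersLeft;
            // Leave at least one partition for each reducer still to come.
            int maxEnd = n - (reducersLeft - 1);
            long acc = 0;
            int end = start;
            // Always take one partition; take the next one only if doing so
            // lands at least as close to the target as stopping here.
            while (end < maxEnd
                   && (end == start || acc + partitionBytes[end] / 2.0 <= target)) {
                acc += partitionBytes[end];
                end++;
            }
            ranges[r] = new int[] {start, end};
            remaining -= acc;
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        long[] sizes = {10, 10, 10, 10, 40, 20};
        System.out.println(Arrays.deepToString(assignRanges(sizes, 3)));
        // prints [[0, 3], [3, 5], [5, 6]]
    }
}
```

In the example the greedy split yields loads of 30, 50 and 20, while [0, 4), [4, 5), [5, 6) would give 40, 40 and 20. If the imbalance matters, a binary search over the maximum range size (with a greedy feasibility check) finds the optimal contiguous split.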
> Determine reduce task parallelism
> ---------------------------------
>
>                 Key: TEZ-338
>                 URL: https://issues.apache.org/jira/browse/TEZ-338
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Bikas Saha
>              Labels: TEZ-0.2.0
>
> Determine the parallelism of reduce tasks at runtime. This is important because it's difficult to determine it accurately before the job actually runs, due to unknown data reduction ratios in the intermediate stages.
