Posted to user@tez.apache.org by Raajay Viswanathan <ra...@gmail.com> on 2015/08/19 12:24:26 UTC

Logical to Physical DAG conversion

Hello,

I am just getting started with Tez and need some clarification about the logical-to-physical DAG conversion in Tez. Does Tez convert a logical DAG to a physical DAG in one go, or is the number of mappers and reducers for a stage determined only when the stage is ready to run?

Also, what are some rules of thumb regarding the number of mappers/reducers for a stage? I would ideally like to fix the #mappers / #reducers in the application itself rather than let Tez determine it. What are the common pitfalls in doing so?

Thanks,
Raajay

Re: Logical to Physical DAG conversion

Posted by Raajay Viswanathan <ra...@gmail.com>.
Thanks Jeff and Hitesh. Wonderful pointers / summary to get me started.

Raajay




Re: Logical to Physical DAG conversion

Posted by Hitesh Shah <hi...@apache.org>.
If you are mainly looking at this from a “mapper” and “reducer” perspective, there are 2 main ways in which Tez affects parallelism: 

1) For the mapper, the no. of maps is effectively governed by how many splits are created and how splits are assigned to tasks. The initial split creation is controlled by the InputFormat. Tez then looks at the cluster capacity to decide whether the no. of splits is optimal. For example, the InputFormat could create 1000 splits, which would imply 1000 tasks. However, if the max no. of containers that can be launched is only 50, this would imply 20 waves of tasks for the full stage/vertex to complete. So in this case, if grouping is enabled, Tez tries to regroup the splits so that the stage runs in around 1.5 waves ( 1.5 is configurable ), i.e. around 75 tasks, while also ensuring that the grouped splits stay within a bounded range with respect to the amount of data being processed. For example, if each split was processing 1 TB of data, there is no need to group splits, as 1 TB is already too large. Whereas if each split was 1 MB, Tez will likely end up bundling more and more splits into a single task until a min limit on the data per task is reached. Disabling grouping would allow you to control the no. of tasks more statically, purely through how the InputFormat generates its splits.

2) For reducers, the main problem everyone has is that today you could configure 1000 reducers while the mappers generate only 10 MB of data in total, which could be processed by a single reducer. Tez monitors the data being generated by the mappers and dynamically tries to decide how many reducers it can scale down to. This is the auto-reduce parallelism that Jeff referred to in his email earlier. Again, a static configuration for the reducer stage, i.e. setParallelism() with auto-reduce disabled, would stop Tez from making any runtime changes.
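
To make the mapper-side grouping arithmetic described above concrete, here is a minimal Java sketch. It is a simplified model and not Tez's actual grouping code: the class name, method name, parameters, rounding, and the 50 MB / 1 GB bounds are all assumptions chosen for illustration.

```java
// Simplified model of split grouping; NOT the actual Tez implementation.
// All names and bounds here are hypothetical and chosen for illustration.
final class SplitGroupingSketch {

    static final long MB = 1024L * 1024L;

    /** Estimates how many tasks remain after grouping splits. */
    static int groupedTaskCount(int numSplits, long bytesPerSplit, int maxContainers,
                                double waves, long minGroupBytes, long maxGroupBytes) {
        long totalBytes = numSplits * bytesPerSplit;
        // Aim for roughly `waves` rounds of tasks on the available containers.
        int desired = (int) Math.ceil(maxContainers * waves);
        // Grouping only merges splits, so it never yields more tasks than splits.
        desired = Math.max(1, Math.min(desired, numSplits));

        long bytesPerTask = totalBytes / desired;
        if (bytesPerTask > maxGroupBytes) {
            // Groups would be too large: use more, smaller groups.
            long needed = (totalBytes + maxGroupBytes - 1) / maxGroupBytes;
            desired = (int) Math.min(numSplits, needed);
        } else if (bytesPerTask < minGroupBytes) {
            // Groups would be too small: bundle more splits per task.
            desired = (int) Math.max(1, totalBytes / minGroupBytes);
        }
        return desired;
    }

    public static void main(String[] args) {
        // 1000 splits of 64 MB on 50 containers at 1.5 waves.
        System.out.println(groupedTaskCount(1000, 64 * MB, 50, 1.5, 50 * MB, 1024 * MB));
        // 1000 splits of 1 MB: bundled until each task reaches the 50 MB minimum.
        System.out.println(groupedTaskCount(1000, 1 * MB, 50, 1.5, 50 * MB, 1024 * MB));
        // 1000 splits of 1 TB: already too large, so nothing is grouped.
        System.out.println(groupedTaskCount(1000, 1024 * 1024 * MB, 50, 1.5, 50 * MB, 1024 * MB));
    }
}
```

With these example inputs the three calls print 75, 20 and 1000: the first case lands on the 1.5-wave target, the second is pulled down by the minimum group size, and the third is left ungrouped because each split already exceeds the maximum.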

To answer your first question, each stage/vertex is governed by its own VertexManager, which in the end is user code ( most of the common ones are implemented within the framework but can be overridden if needed ). These VMs can be as simple or as complex as needed and can make runtime decisions. There are certain points in the processing up to which the VM can tweak parallelism. After a certain point, parallelism becomes final ( due to the requirement that failures need to be handled in a deterministic manner, such that re-running a task results in the same data output each time around ).
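
The "tweakable up to a point, then final" rule can be pictured with a small Java sketch. This is a hypothetical model written only for illustration: the class and its methods are not part of the Tez API, and it assumes the cut-off is the moment tasks are scheduled.

```java
// Hypothetical model: parallelism may change until tasks are scheduled, then it is final.
// This class is NOT part of Tez; it only illustrates the rule described above.
final class VertexParallelismSketch {
    private int parallelism;
    private boolean tasksScheduled = false;

    VertexParallelismSketch(int initialParallelism) {
        this.parallelism = initialParallelism;
    }

    /** Changes the task count; rejected once tasks have been scheduled. */
    void changeParallelism(int newParallelism) {
        if (tasksScheduled) {
            throw new IllegalStateException("parallelism is final once tasks have been scheduled");
        }
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        parallelism = newParallelism;
    }

    /** Marks the point after which re-runs must produce the same output partitions. */
    void scheduleTasks() {
        tasksScheduled = true;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        VertexParallelismSketch reducer = new VertexParallelismSketch(1000);
        reducer.changeParallelism(10);   // allowed: nothing has been scheduled yet
        reducer.scheduleTasks();
        try {
            reducer.changeParallelism(5); // rejected: parallelism is now final
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(reducer.getParallelism());
    }
}
```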

thanks
— Hitesh



Re: Logical to Physical DAG conversion

Posted by "Jianfeng (Jeff) Zhang" <jz...@hortonworks.com>.
You can specify the number of tasks (the number of mappers/reducers) of a
stage when you build the DAG, before you run it. Tez also allows users to
change the number of tasks dynamically at runtime. ShuffleVertexManager is
the plugin that changes the parallelism based on the outputs of its
upstream stage.
Of course, you can write your own VertexManager plugin for a different
strategy for determining the parallelism. But I would suggest you use
ShuffleVertexManager, because it is not easy to write your own
VertexManager when you are not familiar with the Tez internals. Note that
ShuffleVertexManager can only be used with a
ScatterGather edge (similar to the shuffle in MapReduce).

Here's the list of parameters you can set for ShuffleVertexManager to
control how it determines the parallelism (copied from the source code).
Let me know if you have any further questions.


  /**
   * In case of a ScatterGather connection, the fraction of source tasks which
   * should complete before tasks for the current vertex are scheduled
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION =
      "tez.shuffle-vertex-manager.min-src-fraction";

  /**
   * In case of a ScatterGather connection, once this fraction of source tasks
   * have completed, all tasks on the current vertex can be scheduled. Number of
   * tasks ready for scheduling on the current vertex scales linearly between
   * min-fraction and max-fraction
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION =
      "tez.shuffle-vertex-manager.max-src-fraction";

  /**
   * Enables automatic parallelism determination for the vertex. Based on input data
   * statistics the parallelism is decreased to a desired level.
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL =
      "tez.shuffle-vertex-manager.enable.auto-parallel";

  /**
   * The desired size of input per task. Parallelism will be changed to meet this criteria
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE =
      "tez.shuffle-vertex-manager.desired-task-input-size";

  /**
   * Automatic parallelism determination will not decrease parallelism below this value
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM =
      "tez.shuffle-vertex-manager.min-task-parallelism";
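
As a rough illustration of how the parameters above interact, here is a minimal Java sketch. It is a simplified model and not the ShuffleVertexManager source: the class name, method names and rounding are assumptions, and the fractions and sizes used in main() are example values only.

```java
// Simplified model of two behaviours described by the parameters above.
// NOT the actual ShuffleVertexManager code; names and example values are hypothetical.
final class ShuffleVertexManagerSketch {

    static final long MB = 1024L * 1024L;

    /** Slow start: tasks that may be scheduled, scaling linearly between min and max fraction. */
    static int schedulableTasks(int totalTasks, double completedSrcFraction,
                                double minSrcFraction, double maxSrcFraction) {
        if (completedSrcFraction < minSrcFraction) {
            return 0; // too few source tasks have finished
        }
        if (maxSrcFraction <= minSrcFraction || completedSrcFraction >= maxSrcFraction) {
            return totalTasks; // past the max fraction everything can be scheduled
        }
        double progress = (completedSrcFraction - minSrcFraction)
                / (maxSrcFraction - minSrcFraction);
        return (int) (progress * totalTasks);
    }

    /** Auto parallelism: scales the configured task count down, never up. */
    static int reducedParallelism(int configuredTasks, long totalInputBytes,
                                  long desiredTaskInputSize, int minTaskParallelism) {
        // Tasks needed so that each one receives about desiredTaskInputSize bytes.
        long needed = (totalInputBytes + desiredTaskInputSize - 1) / desiredTaskInputSize;
        long bounded = Math.max(Math.max(1, minTaskParallelism), needed);
        return (int) Math.min(configuredTasks, bounded);
    }

    public static void main(String[] args) {
        // Half of the source tasks done, with example fractions 0.25 and 0.75.
        System.out.println(schedulableTasks(100, 0.5, 0.25, 0.75));
        // 1000 configured reducers but only 10 MB of input in total.
        System.out.println(reducedParallelism(1000, 10 * MB, 100 * MB, 1));
        // 50 GB of input at an example 100 MB per task.
        System.out.println(reducedParallelism(1000, 50 * 1024 * MB, 100 * MB, 1));
    }
}
```

With these example values the calls print 50, 1 and 512. The second case is the "1000 reducers for 10 MB" situation: the model collapses it to a single task unless min-task-parallelism is set higher.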








Best Regards,
Jeff Zhang




