Posted to issues@tez.apache.org by "Rajesh Balamohan (JIRA)" <ji...@apache.org> on 2015/06/03 13:56:49 UTC

[jira] [Comment Edited] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

    [ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570714#comment-14570714 ] 

Rajesh Balamohan edited comment on TEZ-2496 at 6/3/15 11:55 AM:
----------------------------------------------------------------

Thanks [~sseth]
>> For large jobs (large number of partitions), does this information also become too expensive?
- Since it is kept in compressed form and not materialized unless needed, it wasn't adding much memory pressure. IIRC, it was adding only a few more bytes (well under 10-50 bytes in compressed form), but I can definitely add an option to enable it only when needed.


>> The way the data is aggregated is very lossy. 100 tasks generating <1MB per partition will show up as 1MB per partition. While that's ok to determine the sort order - it can get in the way of other optimizations which may be possible with such statistics. Rather than providing aggregated stats - it may be better to provide task level statistics, and let the consumer aggregate the information as they see fit.
- Yes, this was done on purpose to reduce memory/CPU pressure. I completely agree that it can be an issue for other optimizations. The API might have to change from "getDataSize(int destIndex)" to "getTaskDataSizes()", letting the consumer decide on aggregation. Will change it.
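
A rough sketch of what consumer-side aggregation could look like if per-task sizes were exposed. Everything here is an assumption for illustration: the class name, the long[]-per-task layout and both helper methods are hypothetical and not part of the attached patches or the Tez API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: each element of taskDataSizes is one source task's
// report, where sizes[p] is the number of bytes it produced for partition p.
final class TaskDataSizesSketch {

    // Consumer chooses to sum across tasks (total bytes per partition).
    static long[] sumPerPartition(List<long[]> taskDataSizes, int numPartitions) {
        long[] totals = new long[numPartitions];
        for (long[] sizes : taskDataSizes) {
            for (int p = 0; p < numPartitions; p++) {
                totals[p] += sizes[p];
            }
        }
        return totals;
    }

    // Consumer chooses the largest single-task contribution per partition,
    // which loses the total but is enough to spot one heavy producer.
    static long[] maxPerPartition(List<long[]> taskDataSizes, int numPartitions) {
        long[] maxes = new long[numPartitions];
        for (long[] sizes : taskDataSizes) {
            for (int p = 0; p < numPartitions; p++) {
                maxes[p] = Math.max(maxes[p], sizes[p]);
            }
        }
        return maxes;
    }

    public static void main(String[] args) {
        List<long[]> tasks = Arrays.asList(
            new long[] {100L, 5L},
            new long[] {200L, 0L},
            new long[] {50L, 7L});
        System.out.println(Arrays.toString(sumPerPartition(tasks, 2)));
        System.out.println(Arrays.toString(maxPerPartition(tasks, 2)));
    }
}
```

The point is only that the same per-task input supports different aggregations, so the choice moves from the producer to the consumer.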

>> The APIs get interesting as well, especially when adding a flag to enable/disable these stats. Returning 0 is loss of information. May need an API to check whether such stats are being published by the specific Output.  Alternatively, a return code which indicates that stats are not available. Beyond this - are there any semantics on whether all tasks must publish these stats if one task in the vertex is publishing them?
- No, currently all tasks publish the info. If it is not available, it is not taken into account for the sort order in ShuffleVertexManager.

>> Depending on how the API is structured - invocations to get these statistics would need to indicate that these stats are disabled. On the API - what's reported is the exact size, but what's available is a range. The reporting / get API could be an ENUM indicating the range / absence of stats to be consistent.
- Agreed, will change it.
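
To make the enum idea concrete, here is a minimal sketch of a range-style return value that also covers the "stats not published" case instead of returning 0. The enum name, its constants and the bucket boundaries are all hypothetical; the actual ranges used by the patch are not stated in this thread.

```java
import java.util.OptionalLong;

final class PartitionSizeRangeSketch {

    // Hypothetical buckets; NOT_AVAILABLE is distinct from ZERO so that
    // "stats disabled" is never confused with "no data for this partition".
    enum SizeRange {
        NOT_AVAILABLE, ZERO, UP_TO_1MB, UP_TO_10MB, UP_TO_100MB, OVER_100MB;

        private static final long MB = 1024L * 1024L;

        // An empty OptionalLong models an Output that does not publish stats.
        static SizeRange fromBytes(OptionalLong bytes) {
            if (!bytes.isPresent()) {
                return NOT_AVAILABLE;
            }
            long b = bytes.getAsLong();
            if (b < 0) {
                throw new IllegalArgumentException("negative size: " + b);
            }
            if (b == 0) {
                return ZERO;
            }
            if (b <= MB) {
                return UP_TO_1MB;
            }
            if (b <= 10 * MB) {
                return UP_TO_10MB;
            }
            if (b <= 100 * MB) {
                return UP_TO_100MB;
            }
            return OVER_100MB;
        }
    }

    public static void main(String[] args) {
        System.out.println(SizeRange.fromBytes(OptionalLong.empty()));
        System.out.println(SizeRange.fromBytes(OptionalLong.of(5L * 1024L * 1024L)));
    }
}
```

Because the constants are declared in increasing size order, a consumer could compare ranges with compareTo() while treating NOT_AVAILABLE as a special case.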



> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.  This would help in scenarios with limited resources (concurrent jobs running, or multiple waves) and data skew, where the task that gets a large amount of data is scheduled much later.
> e.g. Consider the following Hive query running in a queue with limited capacity (42 slots in total) at 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are lots of nulls for ss_sold_time_sk, the data tends to skew towards 70429.  If the reducer that gets this data is scheduled much earlier (i.e. in the first wave itself), the entire job would finish faster.
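
The scheduling idea in the description can be sketched as ordering reducer indices by their known partition size, largest first, so that the skewed reducer lands in the first wave. This is only an illustration under assumptions: the class and method names are hypothetical, and it does not reflect how ShuffleVertexManager is actually implemented in the attached patches.

```java
import java.util.Arrays;
import java.util.Comparator;

final class LargestPartitionFirstSketch {

    // Returns reducer (partition) indices ordered by descending size.
    // Arrays.sort on an object array is stable, so equal-sized partitions
    // keep their original index order.
    static int[] scheduleOrder(long[] partitionSizes) {
        Integer[] indices = new Integer[partitionSizes.length];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        Arrays.sort(indices,
            Comparator.comparingLong((Integer i) -> partitionSizes[i]).reversed());
        int[] order = new int[indices.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = indices[i];
        }
        return order;
    }

    public static void main(String[] args) {
        // Partition 1 plays the role of the skewed key (70429 in the query).
        long[] sizes = {10L, 500L, 10L, 70L};
        System.out.println(Arrays.toString(scheduleOrder(sizes)));
    }
}
```

With 42 slots, the first 42 entries of such an ordering would form the first wave, so the heaviest reducer starts as early as possible.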


