Posted to issues@hive.apache.org by "László Bodor (Jira)" <ji...@apache.org> on 2020/01/08 18:53:00 UTC

[jira] [Commented] (HIVE-22707) MergeJoinWork should be considered while collecting DAG credentials

    [ https://issues.apache.org/jira/browse/HIVE-22707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010922#comment-17010922 ] 

László Bodor commented on HIVE-22707:
-------------------------------------

cc: [~ashutoshc], if you can take a look, thanks

> MergeJoinWork should be considered while collecting DAG credentials
> -------------------------------------------------------------------
>
>                 Key: HIVE-22707
>                 URL: https://issues.apache.org/jira/browse/HIVE-22707
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: HIVE-22707.01.patch
>
>
> Consider a scenario with 2 different buckets, where the output is written to a different bucket than the source. Under specific circumstances, FileSinkOperator is used only in Reducer stages, and if a root work in that stage is a MergeJoinWork, it is not scanned for output URIs/paths, so the required delegation tokens are not fetched for, e.g., the output S3 bucket.
> https://github.com/apache/hive/blob/0df4f6c61010b64246d4790f9ce14e966ef34dcb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java#L1507-L1514
> {code}
>   public void addCredentials(BaseWork work, DAG dag) throws IOException {
>     dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
>     if (work instanceof MapWork) {
>       addCredentials((MapWork) work, dag);
>     } else if (work instanceof ReduceWork) {
>       addCredentials((ReduceWork) work, dag);
>     }
>   }
> {code}
> Sample plan; note the Merge Join Operator [MERGEJOIN_35]:
> {code}
> +----------------------------------------------------+
> |                      Explain                       |
> +----------------------------------------------------+
> | Plan optimized by CBO.                             |
> |                                                    |
> | Vertex dependency in root stage                    |
> | Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) |
> | Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)        |
> |                                                    |
> | Stage-3                                            |
> |   Stats Work{}                                     |
> |     Stage-4                                        |
> |       Create Table{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
> |         Stage-0                                    |
> |           Move Operator                            |
> |             Stage-1                                |
> |               Reducer 3                            |
> |               File Output Operator [FS_20]         |
> |                 Group By Operator [GBY_18] (rows=1 width=440) |
> |                   Output:["_col0"],aggregations:["compute_stats(VALUE._col0)"] |
> |                 <-Reducer 2 [CUSTOM_SIMPLE_EDGE]   |
> |                   File Output Operator [FS_10]     |
> |                     table:{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
> |                     Select Operator [SEL_9] (rows=8400 width=7) |
> |                       Output:["_col0"]             |
> |                       Merge Join Operator [MERGEJOIN_35] (rows=8400 width=7) |
> |                         Conds:RS_38._col1=RS_41._col0(Inner),Output:["_col1"] |
> |                       <-Map 1 [SIMPLE_EDGE] vectorized |
> |                         SHUFFLE [RS_38]            |
> |                           PartitionCols:_col1      |
> |                           Select Operator [SEL_37] (rows=16799 width=15) |
> |                             Output:["_col1"]       |
> |                             Filter Operator [FIL_36] (rows=16799 width=15) |
> |                               predicate:((cs_sold_time_sk = 74858L) and cs_call_center_sk is not null) |
> |                               TableScan [TS_0] (rows=1439980416 width=15) |
> |                                 tpcds_bin_partitioned_orc_1000@catalog_sales,cs, ACID table,Tbl:COMPLETE,Col:PARTIAL,Output:["cs_sold_time_sk","cs_call_center_sk"] |
> |                       <-Map 4 [SIMPLE_EDGE] vectorized |
> |                         SHUFFLE [RS_41]            |
> |                           PartitionCols:_col0      |
> |                           Select Operator [SEL_40] (rows=21 width=107) |
> |                             Output:["_col0"]       |
> |                             Filter Operator [FIL_39] (rows=21 width=107) |
> |                               predicate:((CAST( cc_county AS STRING) = 'Williamson County') and cc_call_center_sk is not null) |
> |                               TableScan [TS_3] (rows=42 width=107) |
> |                                 tpcds_bin_partitioned_orc_1000@call_center,cc, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["cc_call_center_sk","cc_county"] |
> |                   PARTITION_ONLY_SHUFFLE [RS_17]   |
> |                     Group By Operator [GBY_16] (rows=1 width=424) |
> |                       Output:["_col0"],aggregations:["compute_stats(col1, 'hll')"] |
> |                       Select Operator [SEL_15] (rows=8400 width=7) |
> |                         Output:["col1"]            |
> |                          Please refer to the previous Select Operator [SEL_9] |
> |         Stage-2                                    |
> |           Dependency Collection{}                  |
> |              Please refer to the previous Stage-1  |
> |                                                    |
> +----------------------------------------------------+
> {code}
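
The gap described above can be illustrated with a minimal, self-contained sketch. It does not use the real Hive classes: `BaseWork`, `MapWork`, `ReduceWork` and `MergeJoinWork` below are simplified stand-ins, and `collectOutputUris`, the `merged` list and the bucket URI are hypothetical names chosen for illustration. It is not the content of HIVE-22707.01.patch. The idea is only that a merge join work wraps other works, so a credential scan presumably has to unwrap it instead of skipping it.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CredentialPathsSketch {

  // Hypothetical stand-ins for Hive's plan classes (NOT the real org.apache.hadoop.hive.ql.plan types).
  static class BaseWork {
    final String name;
    final List<String> outputUris = new ArrayList<>();

    BaseWork(String name, String... uris) {
      this.name = name;
      for (String uri : uris) {
        outputUris.add(uri);
      }
    }
  }

  static class MapWork extends BaseWork {
    MapWork(String name, String... uris) { super(name, uris); }
  }

  static class ReduceWork extends BaseWork {
    ReduceWork(String name, String... uris) { super(name, uris); }
  }

  // Assumption: a merge join work only wraps other works and has no output URIs of its own.
  static class MergeJoinWork extends BaseWork {
    final List<BaseWork> merged = new ArrayList<>();

    MergeJoinWork(String name) { super(name); }

    void add(BaseWork work) { merged.add(work); }
  }

  // Collects output URIs that would need delegation tokens.
  // Unlike the two-branch instanceof check quoted in the issue, this also descends into MergeJoinWork.
  static Set<String> collectOutputUris(BaseWork work) {
    Set<String> uris = new LinkedHashSet<>();
    if (work instanceof MergeJoinWork) {
      for (BaseWork inner : ((MergeJoinWork) work).merged) {
        uris.addAll(collectOutputUris(inner));
      }
    } else if (work instanceof MapWork || work instanceof ReduceWork) {
      uris.addAll(work.outputUris);
    }
    return uris;
  }

  public static void main(String[] args) {
    // Hypothetical output location in a bucket different from the source bucket.
    ReduceWork reducer = new ReduceWork("Reducer 2", "s3a://output-bucket/catalog_sales_out");
    MergeJoinWork mergeJoin = new MergeJoinWork("Reducer 2 (merge join)");
    mergeJoin.add(reducer);

    // With the extra branch, the wrapped reducer's output URI is found.
    System.out.println(collectOutputUris(mergeJoin));
  }
}
```

Without the `MergeJoinWork` branch, `collectOutputUris(mergeJoin)` would return an empty set in this sketch, which mirrors the missing delegation token for the output bucket.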


