Posted to issues@hive.apache.org by "Youjun Yuan (Jira)" <ji...@apache.org> on 2022/04/07 09:31:00 UTC

[jira] [Commented] (HIVE-26111) FULL JOIN returns incorrect result

    [ https://issues.apache.org/jira/browse/HIVE-26111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518731#comment-17518731 ] 

Youjun Yuan commented on HIVE-26111:
------------------------------------

This should be a duplicate of https://issues.apache.org/jira/browse/HIVE-22098, the bucketing_version issue.
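
For illustration only, here is a minimal Python sketch of the suspected mechanism, assuming (per HIVE-22098) that the two sides of the join are shuffled to reducers with different hash functions. hash_v1 and hash_v2 are toy stand-ins, not Hive's real bucketing hashes, and full_outer_join is a hypothetical helper, not Hive code. In the quoted log, bucketing_version 2 appears in the table properties of airflow.rds_users_delta, while no bucketing_version is listed for default.users.

```python
from collections import defaultdict


def hash_v1(key):
    """Toy stand-in for one hashing scheme (assumption, NOT Hive's real hash)."""
    return key


def hash_v2(key):
    """Toy stand-in for a different hashing scheme (assumption, NOT Hive's real hash)."""
    return key + 1


def full_outer_join(left_keys, right_keys, left_hash, right_hash, num_reducers):
    """Route each side to a reducer with its own hash, then full-outer-join per reducer."""
    left_parts = defaultdict(list)
    right_parts = defaultdict(list)
    for key in left_keys:
        left_parts[left_hash(key) % num_reducers].append(key)
    for key in right_keys:
        right_parts[right_hash(key) % num_reducers].append(key)

    rows = []
    for reducer in range(num_reducers):
        left = left_parts[reducer]
        right = right_parts[reducer]
        matched = set()  # indices of right-side rows that found a partner
        for lkey in left:
            hits = [i for i, rkey in enumerate(right) if rkey == lkey]
            if not hits:
                rows.append((lkey, None))  # left row with no match on this reducer
            for i in hits:
                rows.append((lkey, right[i]))
                matched.add(i)
        for i, rkey in enumerate(right):
            if i not in matched:
                rows.append((None, rkey))  # right row with no match on this reducer
    return rows


if __name__ == "__main__":
    key = 350570497  # the id from the report
    print(full_outer_join([key], [key], hash_v1, hash_v2, num_reducers=2))
    print(full_outer_join([key], [key], hash_v1, hash_v2, num_reducers=1))
    print(full_outer_join([key], [key], hash_v1, hash_v1, num_reducers=2))
```

Under these assumptions, two reducers with mismatched hashes give the same two half-NULL rows as in the report, while a single reducer, or the same hash on both sides, gives one matched row. That is consistent with the observation that SET mapreduce.job.reduces=1 produces the right result.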

> FULL JOIN returns incorrect result
> ----------------------------------
>
>                 Key: HIVE-26111
>                 URL: https://issues.apache.org/jira/browse/HIVE-26111
>             Project: Hive
>          Issue Type: Bug
>         Environment: aws EMR (hive 3.1.2 + Tez 0.10.1)
>            Reporter: Youjun Yuan
>            Priority: Blocker
>
> We hit a query that FULL JOINs two tables and produces incorrect results: for a single value of the join key, Hive returns two records, each with a valid value from one table and NULL for the other table.
> The query is:
> {code:java}
> SET mapreduce.job.reduces=2;
> SELECT d.id, u.id
> FROM (
>        SELECT id
>        FROM   airflow.tableA rud
>        WHERE  rud.dt = '2022-04-02-1row'
> ) d
> FULL JOIN (
>        SELECT id
>        FROM   default.tableB
>        WHERE  dt = '2022-04-01' and device_token='blabla'
>  ) u
> ON u.id = d.id
> ; {code}
> According to the job log, each of the two reducers receives one input record and outputs one record, so the query produces two records for id=350570497:
> {code:java}
> 350570497    NULL
> NULL    350570497
> Time taken: 62.692 seconds, Fetched: 2 row(s) {code}
> I am sure tableB has only one row where device_token='blabla'.
> We also tried the following:
> 1, SET mapreduce.job.reduces=1; then it produces the right result;
> 2, SET hive.execution.engine=mr; mr also has the issue (the original note that it produces the right result was struck out);
> 3, JOIN (instead of FULL JOIN) works as expected;
> 4, in subquery u, changing the filter device_token='blabla' to id=350570497 works ok;
> 5, flattening the subqueries works ok, like below:
> {code:java}
> SELECT  d.id, u.id 
> from airflow.rds_users_delta d full join default.users u
> on (u.id = d.id)
> where d.dt = '2022-04-02-1row' and u.dt = '2022-04-01' and u.device_token='blabla' {code}
> Below is the explain output of the query:
> {code:java}
> Plan optimized by CBO.
>
> Vertex dependency in root stage
> Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
>
> Stage-0
>   Fetch Operator
>     limit:-1
>     Stage-1
>       Reducer 3
>       File Output Operator [FS_10]
>         Map Join Operator [MAPJOIN_13] (rows=2 width=8)
>           Conds:RS_6.KEY.reducesinkkey0=RS_7.KEY.reducesinkkey0(Outer),DynamicPartitionHashJoin:true,Output:["_col0","_col1"]
>         <-Map 1 [CUSTOM_SIMPLE_EDGE]
>           PARTITION_ONLY_SHUFFLE [RS_6]
>             PartitionCols:_col0
>             Select Operator [SEL_2] (rows=1 width=4)
>               Output:["_col0"]
>               TableScan [TS_0] (rows=1 width=4)
>                 airflow@rds_users_delta,rud,Tbl:COMPLETE,Col:COMPLETE,Output:["id"]
>         <-Map 2 [CUSTOM_SIMPLE_EDGE]
>           PARTITION_ONLY_SHUFFLE [RS_7]
>             PartitionCols:_col0
>             Select Operator [SEL_5] (rows=1 width=4)
>               Output:["_col0"]
>               Filter Operator [FIL_12] (rows=1 width=110)
>                 predicate:(device_token = 'blabla')
>                 TableScan [TS_3] (rows=215192362 width=109)
>                   default@users,users,Tbl:COMPLETE,Col:COMPLETE,Output:["id","device_token"]  {code}
> I can't produce a small enough data set to reproduce the issue. I have reduced tableA to only 1 row, and tableB has ~200m rows, but if I further reduce the size of tableB, the issue can no longer be reproduced.
> Any suggestions would be highly appreciated regarding the root cause of the issue, how to work around it, or how to reproduce it with a small enough dataset.
>  
> Below is the log found in hive.log:
> {code:java}
> 220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
>   Stage-1 is a root stage [MAPRED]
>   Stage-0 depends on stages: Stage-1 [FETCH]
>
> STAGE PLANS:
>   Stage: Stage-1
>     Tez
>       DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
>       Edges:
>         Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
>       DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: rud
>                   Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
>                   GatherStats: false
>                   Select Operator
>                     expressions: id (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
>                     Reduce Output Operator
>                       key expressions: _col0 (type: int)
>                       null sort order: a
>                       sort order: +
>                       Map-reduce partition columns: _col0 (type: int)
>                       Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
>                       tag: 0
>                       auto parallelism: true
>             Path -> Alias:
>               s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
>             Path -> Partition:
>               s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
>                 Partition
>                   base file name: hh=00
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                   partition values:
>                     dt 2022-04-02-1row
>                     hh 00
>                   properties:
>                     COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
>                     bucket_count -1
>                     column.name.delimiter ,
>                     columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
>                     columns.comments
>                     columns.types int:string:string:string:int:int:string:string:string:int:int
>                     field.delim
>                     file.inputformat org.apache.hadoop.mapred.TextInputFormat
>                     file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     line.delim
>                     location s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
>                     name airflow.rds_users_delta
>                     numFiles 1
>                     numRows 1
>                     partition_columns dt/hh
>                     partition_columns.types string:string
>                     rawDataSize 2507
>                     serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
>                     serialization.format
>                     serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     totalSize 888
>                     transient_lastDdlTime 1649039842
>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     input format: org.apache.hadoop.mapred.TextInputFormat
>                     output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     properties:
>                       EXTERNAL TRUE
>                       bucket_count -1
>                       bucketing_version 2
>                       column.name.delimiter ,
>                       columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
>                       columns.comments
>                       columns.types int:string:string:string:int:int:string:string:string:int:int
>                       field.delim
>                       file.inputformat org.apache.hadoop.mapred.TextInputFormat
>                       file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                       line.delim
>                       location s3a://.../tmp/rds_users_delta
>                       name airflow.rds_users_delta
>                       partition_columns dt/hh
>                       partition_columns.types string:string
>                       serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
>                       serialization.format
>                       serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       transient_lastDdlTime 1648974877
>                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     name: airflow.rds_users_delta
>                   name: airflow.rds_users_delta
>             Truncated Path -> Alias:
>               s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
>         Map 2
>             Map Operator Tree:
>                 TableScan
>                   alias: users
>                   Statistics: Num rows: 215192362 Data size: 23671159812 Basic stats: COMPLETE Column stats: COMPLETE
>                   GatherStats: false
>                   Filter Operator
>                     isSamplingPred: false
>                     predicate: (device_token = 'blabla') (type: boolean)
>                     Statistics: Num rows: 1 Data size: 110 Basic stats: COMPLETE Column stats: COMPLETE
>                     Select Operator
>                       expressions: id (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
>                       Reduce Output Operator
>                         key expressions: _col0 (type: int)
>                         null sort order: a
>                         sort order: +
>                         Map-reduce partition columns: _col0 (type: int)
>                         Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
>                         tag: 1
>                         auto parallelism: true
>             Path -> Alias:
>               s3://.../default/users/dt=2022-04-01 [users]
>             Path -> Partition:
>               s3://.../default/users/dt=2022-04-01
>                 Partition
>                   base file name: dt=2022-04-01
>                   input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                   output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                   partition values:
>                     dt 2022-04-01
>                   properties:
>                     COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
>                     bucket_count -1
>                     column.name.delimiter ,
>                     columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
>                     columns.comments
>                     columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
>                     file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                     file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                     location s3://.../default/users/dt=2022-04-01
>                     name default.users
>                     numFiles 256
>                     numRows 215192362
>                     partition_columns dt
>                     partition_columns.types string
>                     rawDataSize 389210158991
>                     serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
>                     serialization.format 1
>                     serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                     totalSize 50190916884
>                     transient_lastDdlTime 1648905779
>                   serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                     input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                     output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                     properties:
>                       EXTERNAL TRUE
>                       bucket_count -1
>                       column.name.delimiter ,
>                       columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
>                       columns.comments
>                       columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
>                       file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                       file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                       last_modified_by hadoop
>                       last_modified_time 1629880363
>                       location s3://.../default/users
>                       name default.users
>                       partition_columns dt
>                       partition_columns.types string
>                       serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
>                       serialization.format 1
>                       serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                       transient_lastDdlTime 1629880363
>                     serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                     name: default.users
>                   name: default.users
>             Truncated Path -> Alias:
>               /users/dt=2022-04-01 [users]
>         Reducer 3
>             Needs Tagging: false
>             Reduce Operator Tree:
>               Map Join Operator
>                 condition map:
>                      Full Outer Join 0 to 1
>                 Estimated key counts: Map 1 => 1
>                 keys:
>                   0 KEY.reducesinkkey0 (type: int)
>                   1 KEY.reducesinkkey0 (type: int)
>                 outputColumnNames: _col0, _col1
>                 input vertices:
>                   0 Map 1
>                 Position of Big Table: 1
>                 Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
>                 DynamicPartitionHashJoin: true
>                 File Output Operator
>                   compressed: true
>                   GlobalTableId: 0
>                   directory: hdfs://.../-ext-10002
>                   NumFilesPerFileSink: 1
>                   Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
>                   Stats Publishing Key Prefix: hdfs://.../-ext-10002/
>                   table:
>                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                       properties:
>                         columns _col0,_col1
>                         columns.types int:int
>                         escape.delim \
>                         hive.serialization.extend.additional.nesting.levels true
>                         serialization.escape.crlf true
>                         serialization.format 1
>                         serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                   TotalFiles: 1
>                   GatherStats: false
>                   MultiFileSpray: false {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)