You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "Youjun Yuan (Jira)" <ji...@apache.org> on 2022/04/07 02:58:00 UTC
[jira] [Updated] (HIVE-26111) FULL JOIN returns incorrect result
[ https://issues.apache.org/jira/browse/HIVE-26111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Youjun Yuan updated HIVE-26111:
-------------------------------
Summary: FULL JOIN returns incorrect result (was: FULL JOIN returns incorrect result with Tez engine)
> FULL JOIN returns incorrect result
> ----------------------------------
>
> Key: HIVE-26111
> URL: https://issues.apache.org/jira/browse/HIVE-26111
> Project: Hive
> Issue Type: Bug
> Environment: aws EMR (hive 3.1.2 + Tez 0.10.1)
> Reporter: Youjun Yuan
> Priority: Blocker
>
> we hit a query which FULL JOINs two tables, hive produces incorrect results, for a single value of join key, it produces two records, each record has a valid value for one table and NULL for the other table.
> The query is:
> {code:java}
> SET mapreduce.job.reduces=2;
> SELECT d.id, u.id
> FROM (
> SELECT id
> FROM airflow.tableA rud
> WHERE rud.dt = '2022-04-02-1row'
> ) d
> FULL JOIN (
> SELECT id
> FROM default.tableB
> WHERE dt = '2022-04-01' and device_token='blabla'
> ) u
> ON u.id = d.id
> ; {code}
> According to the job log, the two reducers each get an input record, and output a record.
> And produces two records for id=350570497
> {code:java}
> 350570497 NULL
> NULL 350570497
> Time taken: 62.692 seconds, Fetched: 2 row(s) {code}
> I am sure tableB has only one row where device_token='blabla'
> And we tried:
> 1, SET mapreduce.job.reduces=1; then it produces right result;
> -2, SET hive.execution.engine=mr; then it produces right result;- mr also has the issue.
> 3, JOIN (instead of FULL JOIN) worked as expected
> 4, in sub query u, change filter device_token='blabla' to id=350570497, it worked ok
> Below is the explain output of the query:
> {code:java}
> Plan optimized by CBO.Vertex dependency in root stage
> Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)Stage-0
> Fetch Operator
> limit:-1
> Stage-1
> Reducer 3
> File Output Operator [FS_10]
> Map Join Operator [MAPJOIN_13] (rows=2 width=8)
> Conds:RS_6.KEY.reducesinkkey0=RS_7.KEY.reducesinkkey0(Outer),DynamicPartitionHashJoin:true,Output:["_col0","_col1"]
> <-Map 1 [CUSTOM_SIMPLE_EDGE]
> PARTITION_ONLY_SHUFFLE [RS_6]
> PartitionCols:_col0
> Select Operator [SEL_2] (rows=1 width=4)
> Output:["_col0"]
> TableScan [TS_0] (rows=1 width=4)
> airflow@rds_users_delta,rud,Tbl:COMPLETE,Col:COMPLETE,Output:["id"]
> <-Map 2 [CUSTOM_SIMPLE_EDGE]
> PARTITION_ONLY_SHUFFLE [RS_7]
> PartitionCols:_col0
> Select Operator [SEL_5] (rows=1 width=4)
> Output:["_col0"]
> Filter Operator [FIL_12] (rows=1 width=110)
> predicate:(device_token = 'blabla')
> TableScan [TS_3] (rows=215192362 width=109)
> default@users,users,Tbl:COMPLETE,Col:COMPLETE,Output:["id","device_token"] {code}
> I can't generate a small enough result set to reproduce the issue, I have minimized the tableA to only 1 row, tableB has ~200m rows, but if I further reduce the size of tableB, then the issue can't be reproduced.
> Any suggestion would be highly appreciated, regarding the root cause of the issue, how to work around it, or how to reproduce it with small enough dataset.
>
> below is the log found in hive.log
> {code:java}
> 220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
> Stage-1 is a root stage [MAPRED]
> Stage-0 depends on stages: Stage-1 [FETCH]STAGE PLANS:
> Stage: Stage-1
> Tez
> DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
> Edges:
> Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
> DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
> Vertices:
> Map 1
> Map Operator Tree:
> TableScan
> alias: rud
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
> GatherStats: false
> Select Operator
> expressions: id (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> null sort order: a
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
> tag: 0
> auto parallelism: true
> Path -> Alias:
> s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
> Path -> Partition:
> s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
> Partition
> base file name: hh=00
> input format: org.apache.hadoop.mapred.TextInputFormat
> output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> partition values:
> dt 2022-04-02-1row
> hh 00
> properties:
> COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
> bucket_count -1
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int
> field.delim
> file.inputformat org.apache.hadoop.mapred.TextInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> line.delim location s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
> name airflow.rds_users_delta
> numFiles 1
> numRows 1
> partition_columns dt/hh
> partition_columns.types string:string
> rawDataSize 2507
> serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
> serialization.format
> serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> totalSize 888
> transient_lastDdlTime 1649039842
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe input format: org.apache.hadoop.mapred.TextInputFormat
> output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> properties:
> EXTERNAL TRUE
> bucket_count -1
> bucketing_version 2
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int
> field.delim
> file.inputformat org.apache.hadoop.mapred.TextInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> line.delim location s3a://.../tmp/rds_users_delta
> name airflow.rds_users_delta
> partition_columns dt/hh
> partition_columns.types string:string
> serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
> serialization.format
> serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> transient_lastDdlTime 1648974877
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> name: airflow.rds_users_delta
> name: airflow.rds_users_delta
> Truncated Path -> Alias:
> s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
> Map 2
> Map Operator Tree:
> TableScan
> alias: users
> Statistics: Num rows: 215192362 Data size: 23671159812 Basic stats: COMPLETE Column stats: COMPLETE
> GatherStats: false
> Filter Operator
> isSamplingPred: false
> predicate: (device_token = 'blabla') (type: boolean)
> Statistics: Num rows: 1 Data size: 110 Basic stats: COMPLETE Column stats: COMPLETE
> Select Operator
> expressions: id (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> null sort order: a
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
> tag: 1
> auto parallelism: true
> Path -> Alias:
> s3://.../default/users/dt=2022-04-01 [users]
> Path -> Partition:
> s3://.../default/users/dt=2022-04-01
> Partition
> base file name: dt=2022-04-01
> input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> partition values:
> dt 2022-04-01
> properties:
> COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
> bucket_count -1
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
> file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> location s3://.../default/users/dt=2022-04-01
> name default.users
> numFiles 256
> numRows 215192362
> partition_columns dt
> partition_columns.types string
> rawDataSize 389210158991
> serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> totalSize 50190916884
> transient_lastDdlTime 1648905779
> serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> properties:
> EXTERNAL TRUE
> bucket_count -1
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
> file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> last_modified_by hadoop
> last_modified_time 1629880363
> location s3://.../default/users
> name default.users
> partition_columns dt
> partition_columns.types string
> serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> transient_lastDdlTime 1629880363
> serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> name: default.users
> name: default.users
> Truncated Path -> Alias:
> /users/dt=2022-04-01 [users]
> Reducer 3
> Needs Tagging: false
> Reduce Operator Tree:
> Map Join Operator
> condition map:
> Full Outer Join 0 to 1
> Estimated key counts: Map 1 => 1
> keys:
> 0 KEY.reducesinkkey0 (type: int)
> 1 KEY.reducesinkkey0 (type: int)
> outputColumnNames: _col0, _col1
> input vertices:
> 0 Map 1
> Position of Big Table: 1
> Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
> DynamicPartitionHashJoin: true
> File Output Operator
> compressed: true
> GlobalTableId: 0
> directory: hdfs://.../-ext-10002
> NumFilesPerFileSink: 1
> Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
> Stats Publishing Key Prefix: hdfs://.../-ext-10002/
> table:
> input format: org.apache.hadoop.mapred.SequenceFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> properties:
> columns _col0,_col1
> columns.types int:int
> escape.delim \
> hive.serialization.extend.additional.nesting.levels true
> serialization.escape.crlf true
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> TotalFiles: 1
> GatherStats: false
> MultiFileSpray: false {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)