Posted to issues@hive.apache.org by "Marta Kuczora (Jira)" <ji...@apache.org> on 2020/08/04 10:51:00 UTC

[jira] [Commented] (HIVE-23763) Query based minor compaction produces wrong files when rows with different buckets Ids are processed by the same FileSinkOperator

    [ https://issues.apache.org/jira/browse/HIVE-23763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170714#comment-17170714 ] 

Marta Kuczora commented on HIVE-23763:
--------------------------------------

Some details about the fix:

The insert query run by the MINOR compaction uses reducers, not just mappers like the query of the MAJOR compaction. Also, the temp table the query inserts into is clustered by bucket number and sorted by bucket number, original transaction Id and row Id. Because of this, even though the split groups are created correctly per bucket, the rows are not always distributed correctly between the reducers. It can happen that one reducer, and therefore one FileSinkOperator, gets rows from different buckets and writes them all into one file, which results in a corrupted file. We cannot always prevent one FileSinkOperator from receiving rows from multiple buckets, for example when the number of reducers is smaller than the table's bucket count. So in this patch the FileSinkOperator got extended to be able to handle rows from multiple buckets. This is similar to what the createDynamicBuckets method does for delete deltas.
The other part of the patch makes sure that rows from the same bucket always reach the same FileSinkOperator. Therefore the ReduceSinkOperator got extended to use the bucket number when distributing the rows.
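The distribution idea can be sketched roughly as follows. This is an illustrative stand-alone sketch, not the actual patch: the class, the Row type and the chooseReducer method are hypothetical names; the real change lives in ReduceSinkOperator's partitioning logic. The point it demonstrates is that partitioning by bucket number keeps every bucket whole on one reducer, even when there are fewer reducers than buckets.

```java
// Illustrative sketch only: route rows to reducers by their bucket number so
// that all rows of one bucket reach the same reducer (and thus the same
// FileSinkOperator). A reducer may still receive several whole buckets.
import java.util.*;

public class BucketPartitionSketch {
    static final class Row {
        final int bucketId;    // decoded from the row's bucket property
        final long origTxnId;  // original transaction Id
        final long rowId;
        Row(int bucketId, long origTxnId, long rowId) {
            this.bucketId = bucketId; this.origTxnId = origTxnId; this.rowId = rowId;
        }
    }

    // All rows with the same bucketId map to the same reducer index.
    static int chooseReducer(Row row, int numReducers) {
        return Math.floorMod(row.bucketId, numReducers);
    }

    public static void main(String[] args) {
        int numReducers = 2; // fewer reducers than buckets: buckets must share
        Map<Integer, List<Integer>> bucketsPerReducer = new TreeMap<>();
        for (int bucket = 0; bucket < 6; bucket++) {
            int reducer = chooseReducer(new Row(bucket, 1L, 0L), numReducers);
            bucketsPerReducer.computeIfAbsent(reducer, r -> new ArrayList<>()).add(bucket);
        }
        // Each reducer sees whole buckets only, but possibly more than one.
        System.out.println(bucketsPerReducer);
    }
}
```

With 2 reducers and 6 buckets this prints {0=[0, 2, 4], 1=[1, 3, 5]}: no bucket is split across reducers, which is why the FileSinkOperator side then only has to cope with "multiple whole buckets", never "a partial bucket".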

 

> Query based minor compaction produces wrong files when rows with different buckets Ids are processed by the same FileSinkOperator
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-23763
>                 URL: https://issues.apache.org/jira/browse/HIVE-23763
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 4.0.0
>            Reporter: Marta Kuczora
>            Assignee: Marta Kuczora
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> How to reproduce:
> - Create an unbucketed ACID table
> - Insert a larger amount of data into this table so that there are multiple bucket files in the table
> The files in the table should look like this:
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00000_0
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00001_0
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00002_0
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00003_0
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00004_0
> /warehouse/tablespace/managed/hive/bubu_acid/delta_0000001_0000001_0000/bucket_00005_0
> - Delete some rows with different bucket Ids
> The files in a delete delta should look like this:
> /warehouse/tablespace/managed/hive/bubu_acid/delete_delta_0000002_0000002_0000/bucket_00000
> /warehouse/tablespace/managed/hive/bubu_acid/delete_delta_0000006_0000006_0000/bucket_00003
> /warehouse/tablespace/managed/hive/bubu_acid/delete_delta_0000006_0000006_0000/bucket_00001
> - Run the query-based minor compaction
> - After the compaction, the newly created delete delta contains only 1 bucket file. This file contains rows from all buckets and the table becomes unusable
> /warehouse/tablespace/managed/hive/bubu_acid/delete_delta_0000001_0000007_v0000066/bucket_00000
> The issue happens only if rows with different bucket Ids are processed by the same FileSinkOperator. 
> In the FileSinkOperator.process method, the files for the compaction table are created like this:
> {noformat}
>     if (!bDynParts && !filesCreated) {
>       if (lbDirName != null) {
>         if (valToPaths.get(lbDirName) == null) {
>           createNewPaths(null, lbDirName);
>         }
>       } else {
>         if (conf.isCompactionTable()) {
>           int bucketProperty = getBucketProperty(row);
>           bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
>         }
>         createBucketFiles(fsp);
>       }
>     }
> {noformat}
> When the first row is processed, the file is created and the filesCreated variable is set to true. When the subsequent rows are processed, the first if statement evaluates to false, so no new file gets created; each row is written into the file created for the first row, regardless of its bucket Id.
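The decode-and-create logic quoted above can be sketched stand-alone as follows. This is a simplified illustration, not the Hive code: tracking created files per bucket Id instead of a single filesCreated flag means a new file is opened whenever a row with an unseen bucket Id arrives. The bit layout in decodeWriterId mirrors my understanding of BucketCodec V1 (version in the top 3 bits, writer/bucket Id in bits 16 and up); treat it as an assumption rather than the authoritative codec.

```java
// Illustrative sketch only: open one file per distinct bucket Id seen by this
// operator, rather than relying on a single filesCreated flag.
import java.util.*;

public class PerBucketFileSketch {
    // Assumed V1-style layout: e.g. 0x20010000 -> version 1, bucket Id 1.
    static int decodeWriterId(int bucketProperty) {
        return (bucketProperty >>> 16) & 0xFFF;
    }

    private final Set<Integer> createdBuckets = new HashSet<>();
    final List<String> createdFiles = new ArrayList<>();

    // Called once per row; creates a file only for a not-yet-seen bucket Id.
    void process(int bucketProperty) {
        int bucketId = decodeWriterId(bucketProperty);
        if (createdBuckets.add(bucketId)) {
            createdFiles.add(String.format("bucket_%05d", bucketId));
        }
    }

    public static void main(String[] args) {
        PerBucketFileSketch sink = new PerBucketFileSketch();
        // Rows from buckets 0, 1 and 0 again reach the same operator.
        sink.process(0x20000000);
        sink.process(0x20010000);
        sink.process(0x20000000);
        System.out.println(sink.createdFiles); // one file per bucket, not one in total
    }
}
```

This prints [bucket_00000, bucket_00001]: two files for two distinct bucket Ids, which is the behavior the compaction output needs, instead of the single bucket_00000 file produced by the flag-based check.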



--
This message was sent by Atlassian Jira
(v8.3.4#803005)