Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/09/14 08:05:00 UTC

[jira] [Updated] (HUDI-4841) Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent

     [ https://issues.apache.org/jira/browse/HUDI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated HUDI-4841:
---------------------------------
    Labels: pull-request-available  (was: )

> Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent
> -------------------------------------------------------------------------------------------
>
>                 Key: HUDI-4841
>                 URL: https://issues.apache.org/jira/browse/HUDI-4841
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Description of Bug
> CopyOnWriteInputFormat#getBlockIndexForPosition() requires the BlockLocation array to be sorted by offset in ascending order.
>  
> However, the current comparator implementation does not guarantee that the BlockLocation array ends up sorted by offset in ascending order.
>  
> h1. Stacktrace
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
>     at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
>     at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
>     at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
>     ... 21 more
> {code}
>  
>  
> h1. Reproduction of issue
> h2. Current sorting implementation
>  
> {code:java}
> Arrays.sort(blocks, new Comparator<BlockLocation>() {
>   @Override
>   public int compare(BlockLocation o1, BlockLocation o2) {
>     long diff = o1.getLength() - o2.getOffset();
>     return Long.compare(diff, 0L);
>   }
> }); {code}
>  
>  
> h2. Test
>  
> {code:java}
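> import java.util.Arrays;
> import org.apache.hadoop.fs.BlockLocation;
> import org.junit.jupiter.api.Test; // assumption: JUnit 5, since the test method is package-private
>
> public class TestBlockLocationSort {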
>   static int compare(org.apache.hadoop.fs.BlockLocation o1, org.apache.hadoop.fs.BlockLocation o2) {
>     long diff = o1.getLength() - o2.getOffset();
>     return Long.compare(diff, 0L);
>   }
>   @Test
>   void testBlockLocationSort() {
>     BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
>     BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
>     BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);
>     BlockLocation[] blocks1 = {o1, o2, o3};
>     System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
>     Arrays.sort(blocks1, TestBlockLocationSort::compare);
>     System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");
>     System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
>     Arrays.sort(blocks1, TestBlockLocationSort::compare);
>     System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
>   }
> }{code}
>  
>  
> h2. Output
>  
> {code:java}
> BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
> BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]
> BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
> BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
>  
>  
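> As can be seen, the current BlockLocation sorting is not idempotent: the second pass reorders the output of the first pass.
> Sorting should be idempotent. The first sort puts the array in order, and sorting the already-sorted array again should leave it unchanged. The comparator breaks this because it compares o1.getLength() against o2.getOffset() rather than comparing the two offsets, so it does not define a consistent ordering.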
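
The behaviour described above can be checked without a Hadoop dependency. The following is a minimal sketch, not the actual Hudi patch: `Block` is a hypothetical stand-in for `org.apache.hadoop.fs.BlockLocation` that carries only an offset and a length, `REPORTED` reproduces the comparator quoted in the issue, and `BY_OFFSET` is an assumed replacement that orders by offset only. The helper names `isAntisymmetric` and `isIdempotentOn` are likewise illustrative.

```java
import java.util.Arrays;
import java.util.Comparator;

class BlockSortSketch {

  // Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only).
  static final class Block {
    private final long offset;
    private final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long getOffset() { return offset; }
    long getLength() { return length; }

    @Override
    public String toString() { return offset + "," + length; }
  }

  // Comparator as quoted in the issue: compares o1's length with o2's offset.
  static final Comparator<Block> REPORTED =
      (o1, o2) -> Long.compare(o1.getLength() - o2.getOffset(), 0L);

  // Assumed replacement: order blocks by offset only.
  static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::getOffset);

  // Checks sgn(compare(a, b)) == -sgn(compare(b, a)) for every pair, including a == b.
  static boolean isAntisymmetric(Comparator<Block> cmp, Block[] blocks) {
    for (Block a : blocks) {
      for (Block b : blocks) {
        if (Integer.signum(cmp.compare(a, b)) != -Integer.signum(cmp.compare(b, a))) {
          return false;
        }
      }
    }
    return true;
  }

  // Sorts a copy once, sorts that result again, and reports whether the order is unchanged.
  static boolean isIdempotentOn(Comparator<Block> cmp, Block[] blocks) {
    Block[] once = blocks.clone();
    Arrays.sort(once, cmp);
    Block[] twice = once.clone();
    Arrays.sort(twice, cmp);
    return Arrays.equals(once, twice);
  }

  public static void main(String[] args) {
    // Same offsets and lengths as the test in the issue, in shuffled order.
    Block[] blocks = {new Block(6, 4), new Block(0, 5), new Block(5, 5)};

    System.out.println(isAntisymmetric(REPORTED, blocks));  // prints false
    System.out.println(isAntisymmetric(BY_OFFSET, blocks)); // prints true
    System.out.println(isIdempotentOn(BY_OFFSET, blocks));  // prints true

    Block[] sorted = blocks.clone();
    Arrays.sort(sorted, BY_OFFSET);
    System.out.println(Arrays.toString(sorted));            // prints [0,5, 5,5, 6,4]
  }
}
```

The antisymmetry check fails for `REPORTED` even when a block is compared with itself: for the block at offset 0 with length 5 the comparator returns 1 instead of 0. That is enough to show the comparator does not satisfy the `Comparator` contract, which is why the result of `Arrays.sort` depends on the input order.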



--
This message was sent by Atlassian Jira
(v8.20.10#820010)