Posted to commits@hudi.apache.org by "voon (Jira)" <ji...@apache.org> on 2022/09/14 08:00:00 UTC

[jira] [Created] (HUDI-4841) Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent

voon created HUDI-4841:
--------------------------

             Summary: Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent
                 Key: HUDI-4841
                 URL: https://issues.apache.org/jira/browse/HUDI-4841
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: voon
            Assignee: voon


h1. Description of Bug

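CopyOnWriteInputFormat#getBlockIndexForPosition() requires the BlockLocation array to be sorted by offset in ascending order.

However, the current comparator implementation does not guarantee this. It compares the length of one block against the offset of the other, so it does not define a consistent ordering, and the array it produces is not necessarily sorted by ascending offset.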

 
h1. Stacktrace

 
{code:java}
Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
    ... 21 more
{code}
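The exception follows from how the offset lookup works. Assuming CopyOnWriteInputFormat keeps the logic of Flink's FileInputFormat, from which it appears to be adapted, getBlockIndexForPosition() only scans forward from the block index found for the previous split, so a block that was sorted too late is never revisited. The simplified sketch below illustrates this. `Block`, `findBlockIndex` and `planSplits` are hypothetical names, `Block` is a stand-in record (Java 16+) for `org.apache.hadoop.fs.BlockLocation`, and the real method's half-split-size handling is omitted.

{code:java}
/** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation: only offset and length. */
record Block(long offset, long length) {
  long end() {
    return offset + length;
  }
}

final class BlockLookup {
  /**
   * Simplified forward-scan lookup (hypothetical). Returns the index of the first block
   * at or after startIndex whose [offset, end) range contains position. Because it only
   * looks forward, it silently assumes the blocks are sorted by ascending offset.
   */
  static int findBlockIndex(Block[] blocks, long position, int startIndex) {
    for (int i = startIndex; i < blocks.length; i++) {
      if (position >= blocks[i].offset() && position < blocks[i].end()) {
        return i;
      }
    }
    throw new IllegalArgumentException("The given offset is not contained in the any block.");
  }

  /** Visits split start positions in ascending order, reusing the last block index as the scan start. */
  static void planSplits(Block[] blocks, long splitSize, long fileLength) {
    int blockIndex = 0;
    for (long position = 0; position < fileLength; position += splitSize) {
      blockIndex = findBlockIndex(blocks, position, blockIndex);
      System.out.println("split @" + position + " -> block " + blockIndex);
    }
  }

  public static void main(String[] args) {
    Block[] sorted = {new Block(0, 5), new Block(5, 5)};
    Block[] unsorted = {new Block(5, 5), new Block(0, 5)};
    planSplits(sorted, 5, 10); // both split positions resolve
    try {
      planSplits(unsorted, 5, 10); // position 5 is at index 0, behind the scan start
    } catch (IllegalArgumentException e) {
      System.out.println("unsorted: " + e.getMessage());
    }
  }
}
{code}

With the sorted array both split positions resolve. With the unsorted one, position 0 resolves to index 1, after which position 5 lives at index 0, behind the scan start, so the lookup fails with the same message as in the stack trace.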
 

 
h1. Reproduction of issue
h2. Current sorting implementation

 
{code:java}
Arrays.sort(blocks, new Comparator<BlockLocation>() {
  @Override
  public int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }
}); {code}
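A quick check against the java.util.Comparator contract makes the problem concrete. The sketch below copies the comparator's logic onto a hypothetical `Block` record (Java 16+, a stand-in for `BlockLocation` so it runs without Hadoop) and tests antisymmetry, sgn(compare(x, y)) == -sgn(compare(y, x)), using the second and third blocks from the test below.

{code:java}
import java.util.Comparator;

/** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation. */
record Block(long offset, long length) {}

final class ComparatorContractCheck {
  // Same logic as the current comparator: o1's length is compared against o2's offset.
  static final Comparator<Block> CURRENT =
      (o1, o2) -> Long.compare(o1.length() - o2.offset(), 0L);

  /** Comparator contract: sgn(compare(x, y)) must equal -sgn(compare(y, x)). */
  static boolean isAntisymmetric(Comparator<Block> c, Block x, Block y) {
    return Integer.signum(c.compare(x, y)) == -Integer.signum(c.compare(y, x));
  }

  public static void main(String[] args) {
    Block o2 = new Block(5, 5);
    Block o3 = new Block(6, 4);
    System.out.println("compare(o2, o3) = " + CURRENT.compare(o2, o3)); // 5 - 6 < 0, so -1
    System.out.println("compare(o3, o2) = " + CURRENT.compare(o3, o2)); // 4 - 5 < 0, so -1
    System.out.println("antisymmetric: " + isAntisymmetric(CURRENT, o2, o3)); // false
  }
}
{code}

Both directions report "less than", and compare(x, x) is nonzero whenever a block's length differs from its offset. Arrays.sort assumes a consistent ordering, so with this comparator its result depends on the input order, which is the non-idempotence shown in the output below; on larger arrays TimSort may instead throw "Comparison method violates its general contract!".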
 

 
h2. Test

 
{code:java}
import java.util.Arrays;

import org.apache.hadoop.fs.BlockLocation;
import org.junit.jupiter.api.Test;

public class TestBlockLocationSort {

  // Same logic as the current comparator: o1's length is compared with o2's offset.
  static int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }

  @Test
  void testBlockLocationSort() {
    // BlockLocation(names, hosts, offset, length); with no hosts, toString() prints "offset,length".
    BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
    BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
    BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);

    BlockLocation[] blocks1 = {o1, o2, o3};

    // Pass 1: sort the array once.
    System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");

    // Pass 2: sort the result of pass 1 again; a correct sort would leave it unchanged.
    System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
  }

}{code}
 

 
h2. Output

 
{code:java}
BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]

BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
 

 

As the output shows, the current BlockLocation sort is not idempotent: sorting the result of pass 1 again produces a different order.

Sorting should be idempotent: the first sort puts the array in order, and sorting the already-sorted array again should leave it unchanged.
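A minimal sketch of a possible fix, assuming the intent is simply to order blocks by ascending offset: compare offsets only. With Hadoop's class this would read Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset)). The sketch below uses a hypothetical `Block` record (Java 16+) instead so it runs without Hadoop, starts from the pass-1 order above, and checks that a second sort leaves the array unchanged. It is not necessarily the change that was merged for HUDI-4841.

{code:java}
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation. */
record Block(long offset, long length) {
  @Override
  public String toString() {
    return offset + "," + length; // mimics BlockLocation's "offset,length" output when there are no hosts
  }
}

final class OffsetSortCheck {
  // Orders blocks by ascending offset only, which is a consistent total order.
  static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::offset);

  /** Returns a sorted copy of the input, leaving the input untouched. */
  static Block[] sorted(Block[] blocks) {
    Block[] copy = blocks.clone();
    Arrays.sort(copy, BY_OFFSET);
    return copy;
  }

  public static void main(String[] args) {
    // Start from the order the current comparator produced in pass 1.
    Block[] blocks = {new Block(0, 5), new Block(6, 4), new Block(5, 5)};
    Block[] pass1 = sorted(blocks);
    Block[] pass2 = sorted(pass1);
    System.out.println("pass 1: " + Arrays.toString(pass1)); // [0,5, 5,5, 6,4]
    System.out.println("pass 2: " + Arrays.toString(pass2)); // [0,5, 5,5, 6,4]
    System.out.println("idempotent: " + Arrays.equals(pass1, pass2)); // true
  }
}
{code}

Sorting by offset alone matches what getBlockIndexForPosition() needs. Since Arrays.sort on object arrays is stable, blocks with equal offsets would also keep their relative order across repeated sorts.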



--
This message was sent by Atlassian Jira
(v8.20.10#820010)