Posted to commits@hudi.apache.org by "voon (Jira)" <ji...@apache.org> on 2022/09/14 08:00:00 UTC
[jira] [Created] (HUDI-4841) Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent
voon created HUDI-4841:
--------------------------
Summary: Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent
Key: HUDI-4841
URL: https://issues.apache.org/jira/browse/HUDI-4841
Project: Apache Hudi
Issue Type: Bug
Reporter: voon
Assignee: voon
h1. Description of Bug
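CopyOnWriteInputFormat#getBlockIndexForPosition() requires BlockLocations to be sorted by offset in ascending order.
However, the current comparator does not guarantee this ordering. It compares the length of the first block with the offset of the second (o1.getLength() - o2.getOffset()) instead of comparing the two offsets, so it does not define a consistent order, and the result of the sort depends on the order of the input array.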
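To illustrate why the ordering matters, here is a simplified, hypothetical model of a forward-scanning block lookup. It assumes that each lookup resumes scanning from the block index found for the previous split, as Flink's FileInputFormat does. `Block`, `findBlockIndex` and `lookupAll` are illustrative names, not Hudi APIs, and the half-split heuristic of the real method is omitted.

{code:java}
import java.util.Arrays;

final class BlockLookupSketch {
    /** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only). */
    static final class Block {
        final long offset;
        final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    /**
     * Simplified, hypothetical forward-scanning lookup: starting at startIndex, return the
     * first block whose range [offset, offset + length) contains the position.
     */
    static int findBlockIndex(Block[] blocks, long position, int startIndex) {
        for (int i = startIndex; i < blocks.length; i++) {
            long start = blocks[i].offset;
            long end = start + blocks[i].length;
            if (position >= start && position < end) {
                return i;
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in the any block.");
    }

    /** Looks up positions in increasing order, resuming each scan at the previous hit. */
    static int[] lookupAll(Block[] blocks, long... positions) {
        int[] result = new int[positions.length];
        int index = 0;
        for (int p = 0; p < positions.length; p++) {
            index = findBlockIndex(blocks, positions[p], index);
            result[p] = index;
        }
        return result;
    }

    public static void main(String[] args) {
        Block[] sorted = {new Block(0, 5), new Block(5, 5)};
        System.out.println(Arrays.toString(lookupAll(sorted, 0, 5))); // [0, 1]

        Block[] unsorted = {new Block(5, 5), new Block(0, 5)};
        try {
            lookupAll(unsorted, 0, 5);
        } catch (IllegalArgumentException e) {
            System.out.println("unsorted: " + e.getMessage());
        }
    }
}
{code}

With the blocks in offset order, offsets 0 and 5 resolve to indices 0 and 1. With the same blocks out of order, offset 0 is found at index 1, so the scan for offset 5 starts past the block that contains it and fails with the same message as in the stacktrace.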
h1. Stacktrace
{code:java}
Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
	... 21 more
{code}
h1. Reproduction of issue
h2. Current sorting implementation
{code:java}
Arrays.sort(blocks, new Comparator<BlockLocation>() {
    @Override
    public int compare(BlockLocation o1, BlockLocation o2) {
        // Compares o1's length with o2's offset rather than comparing the two offsets
        long diff = o1.getLength() - o2.getOffset();
        return Long.compare(diff, 0L);
    }
});
{code}
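This comparator also breaks the java.util.Comparator contract, which requires sgn(compare(x, y)) == -sgn(compare(y, x)) for all x and y. The hypothetical sketch below uses a minimal `Block` stand-in with `getOffset()`/`getLength()` instead of Hadoop's BlockLocation, and evaluates the same arithmetic in both directions for the blocks (0,5) and (5,5) used in the test below.

{code:java}
import java.util.Comparator;

final class ComparatorContractCheck {
    /** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only). */
    static final class Block {
        private final long offset;
        private final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        long getOffset() { return offset; }
        long getLength() { return length; }
    }

    /** Same arithmetic as the current comparator: o1's length minus o2's offset. */
    static final Comparator<Block> CURRENT = (o1, o2) -> {
        long diff = o1.getLength() - o2.getOffset();
        return Long.compare(diff, 0L);
    };

    /** Comparator contract: sgn(compare(x, y)) must equal -sgn(compare(y, x)). */
    static boolean isAntisymmetric(Comparator<Block> c, Block x, Block y) {
        return Integer.signum(c.compare(x, y)) == -Integer.signum(c.compare(y, x));
    }

    public static void main(String[] args) {
        Block a = new Block(0, 5);
        Block b = new Block(5, 5);
        System.out.println(CURRENT.compare(a, b));          // 0  (length 5 - offset 5)
        System.out.println(CURRENT.compare(b, a));          // 1  (length 5 - offset 0)
        System.out.println(isAntisymmetric(CURRENT, a, b)); // false
    }
}
{code}

Because (0,5) compares as equal to (5,5) in one direction and greater in the other, the sort has no consistent order to converge on, which is why the result depends on the input order.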
h2. Test
{code:java}
import java.util.Arrays;

import org.apache.hadoop.fs.BlockLocation;
import org.junit.jupiter.api.Test;

public class TestBlockLocationSort {
    // Same arithmetic as the current comparator: o1's length minus o2's offset
    static int compare(BlockLocation o1, BlockLocation o2) {
        long diff = o1.getLength() - o2.getOffset();
        return Long.compare(diff, 0L);
    }

    @Test
    void testBlockLocationSort() {
        // BlockLocation(names, hosts, offset, length)
        BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
        BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
        BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);
        BlockLocation[] blocks1 = {o1, o2, o3};

        // Pass 1: sort the array as given
        System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
        Arrays.sort(blocks1, TestBlockLocationSort::compare);
        System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");

        // Pass 2: sort the result of pass 1 again
        System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
        Arrays.sort(blocks1, TestBlockLocationSort::compare);
        System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
    }
}
{code}
h2. Output
{code:java}
BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]
BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
As the output shows, the first pass leaves the array out of offset order ([0,5, 6,4, 5,5]), and a second pass over that result reorders it again, so the current BlockLocation sort is not idempotent.
Sorting should be idempotent: the first sort puts the collection in order, and sorting the already-sorted array again should leave it unchanged.
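One possible fix, sketched here under assumptions and not necessarily the change merged for HUDI-4841, is to compare the offsets directly. The sketch uses a hypothetical `Block` stand-in with `getOffset()`/`getLength()` in place of Hadoop's BlockLocation, sorts the pass-1 output from above with `Comparator.comparingLong`, and then sorts the result a second time to check idempotence.

{code:java}
import java.util.Arrays;
import java.util.Comparator;

final class OffsetSortSketch {
    /** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only). */
    static final class Block {
        private final long offset;
        private final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        long getOffset() { return offset; }
        long getLength() { return length; }

        // Same "offset,length" shape as the test output above
        @Override
        public String toString() { return offset + "," + length; }
    }

    /** Orders blocks by ascending offset only. */
    static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::getOffset);

    /** Returns a sorted copy, leaving the input array untouched. */
    static Block[] sorted(Block[] blocks) {
        Block[] copy = blocks.clone();
        Arrays.sort(copy, BY_OFFSET);
        return copy;
    }

    public static void main(String[] args) {
        // Start from the pass-1 output of the current comparator
        Block[] blocks = {new Block(0, 5), new Block(6, 4), new Block(5, 5)};
        Block[] pass1 = sorted(blocks);
        Block[] pass2 = sorted(pass1);
        System.out.println(Arrays.toString(pass1));      // [0,5, 5,5, 6,4]
        System.out.println(Arrays.toString(pass2));      // [0,5, 5,5, 6,4]
        System.out.println(Arrays.equals(pass1, pass2)); // true
    }
}
{code}

Because comparing offsets gives a consistent ordering and Arrays.sort on object arrays is stable, sorting an already-sorted array leaves it unchanged, and blocks with equal offsets keep their relative order.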
--
This message was sent by Atlassian Jira
(v8.20.10#820010)