You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here" in the original) was not preserved in this copy.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/17 07:39:05 UTC
[hudi] branch master updated: [HUDI-4841] Fix sort idempotency issue (#6669)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f1caa3a2f9 [HUDI-4841] Fix sort idempotency issue (#6669)
f1caa3a2f9 is described below
commit f1caa3a2f9278ca00d64266dc9d3748583b6a42f
Author: voonhous <vo...@gmail.com>
AuthorDate: Sat Sep 17 15:38:58 2022 +0800
[HUDI-4841] Fix sort idempotency issue (#6669)
---
.../table/format/cow/CopyOnWriteInputFormat.java | 11 ++---
.../table/format/cow/TestBlockLocationSort.java | 52 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 8 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index a36f9c914c..c5ea3d4ab9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format.cow;
+import java.util.Comparator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.util.DataTypeUtils;
@@ -42,7 +43,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -220,13 +220,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
// get the block locations and make sure they are in order with respect to their offset
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
- Arrays.sort(blocks, new Comparator<BlockLocation>() {
- @Override
- public int compare(BlockLocation o1, BlockLocation o2) {
- long diff = o1.getLength() - o2.getOffset();
- return Long.compare(diff, 0L);
- }
- });
+ Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
long bytesUnassigned = len;
long position = 0;
@@ -399,4 +393,5 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
return null;
}
}
+
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java
new file mode 100644
index 0000000000..d868dce4d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.hadoop.fs.BlockLocation;
+import org.junit.jupiter.api.Test;
+/** Verifies that BlockLocations sort by ascending offset and that sorting an already-sorted array leaves it unchanged. */
+public class TestBlockLocationSort {
+ // Builds a BlockLocation with empty names/hosts arrays; only offset and length are set.
+ private static BlockLocation createBlockLocation(int offset, int length) {
+ return new BlockLocation(new String[0], new String[0], offset, length);
+ }
+ // Sorts three blocks supplied out of offset order, then sorts again to check the order is stable.
+ @Test
+ void testBlockLocationSort() {
+ BlockLocation o1 = createBlockLocation(0, 5);
+ BlockLocation o2 = createBlockLocation(6, 4);
+ BlockLocation o3 = createBlockLocation(5, 5);
+ // Expected order depends on offset only: o1 (0), o3 (5), o2 (6).
+ BlockLocation[] blocks = {o1, o2, o3};
+ BlockLocation[] sortedBlocks = {o1, o3, o2};
+ // NOTE(review): this repeats the comparator expression instead of calling CopyOnWriteInputFormat, so a regression in the production sort would not fail this test.
+ Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
+ assertThat(blocks, equalTo(sortedBlocks));
+ // The comparator this patch replaces compared o1's length with o2's offset, so it ignored o1's offset and did not in general return 0 when comparing a block with itself.
+ // Sort again to ensure idempotency: an already-sorted array must come back unchanged.
+ Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
+ assertThat(blocks, equalTo(sortedBlocks));
+ }
+
+}