You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/17 07:39:05 UTC

[hudi] branch master updated: [HUDI-4841] Fix sort idempotency issue (#6669)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1caa3a2f9 [HUDI-4841] Fix sort idempotency issue (#6669)
f1caa3a2f9 is described below

commit f1caa3a2f9278ca00d64266dc9d3748583b6a42f
Author: voonhous <vo...@gmail.com>
AuthorDate: Sat Sep 17 15:38:58 2022 +0800

    [HUDI-4841] Fix sort idempotency issue (#6669)
---
 .../table/format/cow/CopyOnWriteInputFormat.java   | 11 ++---
 .../table/format/cow/TestBlockLocationSort.java    | 52 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index a36f9c914c..c5ea3d4ab9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format.cow;
 
+import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 import org.apache.hudi.util.DataTypeUtils;
@@ -42,7 +43,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -220,13 +220,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
 
         // get the block locations and make sure they are in order with respect to their offset
         final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
-        Arrays.sort(blocks, new Comparator<BlockLocation>() {
-          @Override
-          public int compare(BlockLocation o1, BlockLocation o2) {
-            long diff = o1.getLength() - o2.getOffset();
-            return Long.compare(diff, 0L);
-          }
-        });
+        Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
 
         long bytesUnassigned = len;
         long position = 0;
@@ -399,4 +393,5 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
       return null;
     }
   }
+
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java
new file mode 100644
index 0000000000..d868dce4d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.hadoop.fs.BlockLocation;
+import org.junit.jupiter.api.Test;
+
+public class TestBlockLocationSort {
+
+  private static BlockLocation createBlockLocation(int offset, int length) {
+    return new BlockLocation(new String[0], new String[0], offset, length);
+  }
+
+  @Test
+  void testBlockLocationSort() {
+    BlockLocation o1 = createBlockLocation(0, 5);
+    BlockLocation o2 = createBlockLocation(6, 4);
+    BlockLocation o3 = createBlockLocation(5, 5);
+
+    BlockLocation[] blocks = {o1, o2, o3};
+    BlockLocation[] sortedBlocks = {o1, o3, o2};
+
+    Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
+    assertThat(blocks, equalTo(sortedBlocks));
+
+    // Sort again to ensure idempotency
+    Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
+    assertThat(blocks, equalTo(sortedBlocks));
+  }
+
+}