Posted to commits@paimon.apache.org by ju...@apache.org on 2023/12/25 03:55:21 UTC

(incubator-paimon) branch master updated: [spark] Support hilbert curve sort action in spark (#2549)

This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f9738ced5 [spark] Support hilbert curve sort action in spark (#2549)
f9738ced5 is described below

commit f9738ced5e2e68ebbb9a3b3ad859c2826c324159
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Mon Dec 25 11:55:14 2023 +0800

    [spark] Support hilbert curve sort action in spark (#2549)
---
 docs/content/engines/spark.md                      |   2 +-
 paimon-spark/paimon-spark-common/pom.xml           |   5 +
 .../apache/paimon/spark/sort/HilbertSorter.java    |  66 ++++++
 .../apache/paimon/spark/sort/SparkHilbertUDF.java  | 249 +++++++++++++++++++++
 .../org/apache/paimon/spark/sort/TableSorter.java  |   3 +-
 .../paimon/spark/utils/ConvertBinaryUtil.java      |  68 ++++++
 .../apache/paimon/spark/ConvertBinaryUtilTest.java |  59 +++++
 .../spark/procedure/CompactProcedureTest.scala     |  43 +++-
 8 files changed, 490 insertions(+), 5 deletions(-)

diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index 765644cb7..9d1d04a9d 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -552,7 +552,7 @@ This section introduce all available spark procedures about paimon.
     <tr>
       <td>compact</td>
      <td><nobr>CALL [paimon.]sys.compact(table => '&lt;identifier&gt;' [, partitions => '&lt;partitions&gt;'] </nobr><br>[, order_strategy => '&lt;sort_type&gt;'] [, order_by => '&lt;columns&gt;'])</td>
-      <td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Left empty for all partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order' or 'zorder' or 'none'. Left empty for 'none'. <br><br><nobr>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two partitions date=01 and date=02, you need to write 'date=01;date=02'<br><br>If you wa [...]
+      <td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Left empty for all partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order', 'zorder', 'hilbert' or 'none'. Left empty for 'none'. <br><br><nobr>order_columns: the columns to sort by. Left empty if 'order_strategy' is 'none'. </nobr><br><br>If you want to sort-compact two partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
       <td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0',  order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
     </tr>
     </tbody>
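
For reference, a hedged sketch of invoking the new strategy from a Spark
session, mirroring the zorder example above (table 'T', partition 'p=0' and
columns 'a,b' are placeholders):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.sql("SET spark.sql.shuffle.partitions=10"); // compact parallelism
    spark.sql(
        "CALL sys.compact(table => 'T', partitions => 'p=0', "
            + "order_strategy => 'hilbert', order_by => 'a,b')");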
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index 9417968ff..30fe7870b 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -53,6 +53,11 @@ under the License.
             <artifactId>scala-compiler</artifactId>
             <version>${scala.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.davidmoten</groupId>
+            <artifactId>hilbert-curve</artifactId>
+            <version>0.2.2</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
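
The new dependency is the hilbert-curve library from com.github.davidmoten.
A minimal sketch of the small slice of its API this change relies on (the
bit count and point values here are illustrative):

    import org.davidmoten.hilbert.HilbertCurve;
    import java.math.BigInteger;

    // Map a 2-D point onto a position along a 63-bit Hilbert curve;
    // a smaller index means the point is visited earlier on the curve.
    HilbertCurve curve = HilbertCurve.bits(63).dimensions(2);
    BigInteger index = curve.index(17L, 42L);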
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
new file mode 100644
index 000000000..1f3007713
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sort;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.List;
+
+import static org.apache.spark.sql.functions.array;
+
+/** Sort table by Hilbert curve. */
+public class HilbertSorter extends TableSorter {
+
+    private static final String H_COLUMN = "HVALUE";
+
+    public HilbertSorter(FileStoreTable table, List<String> orderColumns) {
+        super(table, orderColumns);
+        checkNotEmpty();
+    }
+
+    @Override
+    public Dataset<Row> sort(Dataset<Row> df) {
+        Column hilbertColumn = hilbertValue(df);
+        Dataset<Row> hilbertValueDF = df.withColumn(H_COLUMN, hilbertColumn);
+        Dataset<Row> sortedDF =
+                hilbertValueDF
+                        .repartitionByRange(hilbertValueDF.col(H_COLUMN))
+                        .sortWithinPartitions(hilbertValueDF.col(H_COLUMN));
+        return sortedDF.drop(H_COLUMN);
+    }
+
+    private Column hilbertValue(Dataset<Row> df) {
+        SparkHilbertUDF hilbertUDF = new SparkHilbertUDF();
+
+        Column[] hilbertCols =
+                orderColNames.stream()
+                        .map(df.schema()::apply)
+                        .map(
+                                col ->
+                                        hilbertUDF.sortedLexicographically(
+                                                df.col(col.name()), col.dataType()))
+                        .toArray(Column[]::new);
+
+        return hilbertUDF.transform(array(hilbertCols));
+    }
+}
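
The sort() pipeline above is the usual cluster-then-sort pattern: range
partitioning spreads rows across tasks by HVALUE, and the in-partition sort
makes each task's output cover a contiguous stretch of the curve. A
self-contained sketch of that pattern on toy data (local session and column
name assumed):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> df = spark.range(100).toDF("HVALUE");
    Dataset<Row> sorted =
        df.repartitionByRange(df.col("HVALUE"))     // nearby values -> same task
          .sortWithinPartitions(df.col("HVALUE"));  // ordered within each task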
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
new file mode 100644
index 000000000..115974ac0
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sort;
+
+import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.TimestampType;
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** Spark UDF to calculate Hilbert index bytes. */
+public class SparkHilbertUDF implements Serializable {
+    private static final long PRIMITIVE_EMPTY = Long.MAX_VALUE;
+
+    private static final int BITS_NUM = 63;
+
+    SparkHilbertUDF() {}
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+    }
+
+    byte[] hilbertCurvePosBytes(Seq<Long> points) {
+        List<Long> longs = JavaConverters.seqAsJavaList(points);
+        long[] data = longs.stream().mapToLong(Long::longValue).toArray();
+        HilbertCurve hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(points.size());
+        BigInteger index = hilbertCurve.index(data);
+        return ConvertBinaryUtil.paddingToNByte(index.toByteArray(), BITS_NUM);
+    }
+
+    private UserDefinedFunction tinyToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Byte value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return ConvertBinaryUtil.convertBytesToLong(new byte[] {value});
+                                },
+                                DataTypes.LongType)
+                        .withName("TINY_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction shortToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Short value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return (long) value;
+                                },
+                                DataTypes.LongType)
+                        .withName("SHORT_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction intToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Integer value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return (long) value;
+                                },
+                                DataTypes.LongType)
+                        .withName("INT_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction longToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Long value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return value;
+                                },
+                                DataTypes.LongType)
+                        .withName("LONG_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction floatToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Float value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return Double.doubleToLongBits((double) value);
+                                },
+                                DataTypes.LongType)
+                        .withName("FLOAT_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction doubleToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Double value) -> {
+                                    if (value == null) {
+                                        return PRIMITIVE_EMPTY;
+                                    }
+                                    return Double.doubleToLongBits(value);
+                                },
+                                DataTypes.LongType)
+                        .withName("DOUBLE_ORDERED_BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction booleanToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (Boolean value) ->
+                                        // null-guard matches the other UDFs;
+                                        // unboxing a null Boolean would NPE
+                                        (value == null || value) ? PRIMITIVE_EMPTY : 0L,
+                                DataTypes.LongType)
+                        .withName("BOOLEAN-LEXICAL-BYTES");
+        return udf;
+    }
+
+    private UserDefinedFunction stringToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (String value) ->
+                                        value == null
+                                                ? PRIMITIVE_EMPTY
+                                                : ConvertBinaryUtil.convertStringToLong(value),
+                                DataTypes.LongType)
+                        .withName("STRING-LEXICAL-BYTES");
+
+        return udf;
+    }
+
+    private UserDefinedFunction bytesTruncateUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (byte[] value) ->
+                                        value == null
+                                                ? PRIMITIVE_EMPTY
+                                                : ConvertBinaryUtil.convertBytesToLong(value),
+                                DataTypes.LongType)
+                        .withName("BYTE-TRUNCATE");
+
+        return udf;
+    }
+
+    private UserDefinedFunction decimalTypeToOrderedLongUDF() {
+        UserDefinedFunction udf =
+                functions
+                        .udf(
+                                (BigDecimal value) ->
+                                        // null-guard: value.longValue() would NPE
+                                        value == null ? PRIMITIVE_EMPTY : value.longValue(),
+                                DataTypes.LongType)
+                        .withName("DECIMAL-ORDERED-BYTES");
+
+        return udf;
+    }
+
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
+    public Column sortedLexicographically(Column column, DataType type) {
+        if (type instanceof ByteType) {
+            return tinyToOrderedLongUDF().apply(column);
+        } else if (type instanceof ShortType) {
+            return shortToOrderedLongUDF().apply(column);
+        } else if (type instanceof IntegerType) {
+            return intToOrderedLongUDF().apply(column);
+        } else if (type instanceof LongType) {
+            return longToOrderedLongUDF().apply(column);
+        } else if (type instanceof FloatType) {
+            return floatToOrderedLongUDF().apply(column);
+        } else if (type instanceof DoubleType) {
+            return doubleToOrderedLongUDF().apply(column);
+        } else if (type instanceof StringType) {
+            return stringToOrderedLongUDF().apply(column);
+        } else if (type instanceof BinaryType) {
+            return bytesTruncateUDF().apply(column);
+        } else if (type instanceof BooleanType) {
+            return booleanToOrderedLongUDF().apply(column);
+        } else if (type instanceof TimestampType) {
+            return longToOrderedLongUDF().apply(column.cast(DataTypes.LongType));
+        } else if (type instanceof DecimalType) {
+            return decimalTypeToOrderedLongUDF().apply(column);
+        } else if (type instanceof DateType) {
+            return longToOrderedLongUDF().apply(column.cast(DataTypes.LongType));
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot use column %s of type %s in Hilbert, the type is unsupported",
+                            column, type));
+        }
+    }
+
+    private final UserDefinedFunction hilbertCurveUDF =
+            functions
+                    .udf((Seq<Long> points) -> hilbertCurvePosBytes(points), DataTypes.BinaryType)
+                    .withName("HILBERT_LONG");
+
+    public Column transform(Column arrayBinary) {
+        return hilbertCurveUDF.apply(arrayBinary);
+    }
+}
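
Putting the pieces together, the per-row computation is: map each order
column to a long, feed the longs to the Hilbert curve as one point, and pad
the resulting index to a fixed-width, byte-comparable array. A hedged
standalone sketch of that chain (the sample point is illustrative):

    import org.apache.paimon.spark.utils.ConvertBinaryUtil;
    import org.davidmoten.hilbert.HilbertCurve;

    long[] point = {17L, 42L}; // one long per order column, as mapped above
    HilbertCurve curve = HilbertCurve.bits(63).dimensions(point.length);
    byte[] hvalue =
        ConvertBinaryUtil.paddingToNByte(curve.index(point).toByteArray(), 63);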
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index b480dc8d4..b76d56a9a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -69,8 +69,7 @@ public abstract class TableSorter {
             case ZORDER:
                 return new ZorderSorter(table, orderColumns);
             case HILBERT:
-                // todo support hilbert curve
-                throw new IllegalArgumentException("Not supported yet.");
+                return new HilbertSorter(table, orderColumns);
             case NONE:
                 return new TableSorter(table, orderColumns) {
                     @Override
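
With this change the HILBERT branch of the sorter dispatch returns a real
implementation instead of throwing. A hedged sketch of exercising it
directly (the FileStoreTable handle `table` and the Dataset<Row> `df` of
its rows are assumed to be provided by the caller, as in the compact
procedure):

    // Hypothetical caller-side use of the new branch.
    TableSorter sorter = new HilbertSorter(table, java.util.Arrays.asList("a", "b"));
    Dataset<Row> clustered = sorter.sort(df); // rows clustered along the curve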
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
new file mode 100644
index 000000000..d2acbf422
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import java.nio.charset.StandardCharsets;
+
+/** Utility for converting values to sortable binary/long form. */
+public class ConvertBinaryUtil {
+
+    private ConvertBinaryUtil() {}
+
+    public static byte[] paddingTo8Byte(byte[] data) {
+        return paddingToNByte(data, 8);
+    }
+
+    public static byte[] paddingToNByte(byte[] data, int paddingNum) {
+        if (data.length == paddingNum) {
+            return data;
+        }
+        if (data.length > paddingNum) {
+            byte[] result = new byte[paddingNum];
+            System.arraycopy(data, 0, result, 0, paddingNum);
+            return result;
+        }
+        // Left-pad with zeros; a freshly allocated byte[] is already zero-filled.
+        byte[] result = new byte[paddingNum];
+        System.arraycopy(data, 0, result, paddingNum - data.length, data.length);
+        return result;
+    }
+
+    public static byte[] utf8To8Byte(String data) {
+        return paddingTo8Byte(data.getBytes(StandardCharsets.UTF_8));
+    }
+
+    public static Long convertStringToLong(String data) {
+        byte[] bytes = utf8To8Byte(data);
+        return convertBytesToLong(bytes);
+    }
+
+    public static long convertBytesToLong(byte[] bytes) {
+        byte[] paddedBytes = paddingTo8Byte(bytes);
+        long temp = 0L;
+        for (int i = 7; i >= 0; i--) {
+            temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8);
+        }
+        return temp;
+    }
+}
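
convertBytesToLong packs the first eight bytes big-endian into a long, with
shorter inputs left-padded with zeros. A worked example that is easy to
check by hand:

    import org.apache.paimon.spark.utils.ConvertBinaryUtil;
    import java.nio.charset.StandardCharsets;

    // "ab" is left-padded to {0,0,0,0,0,0,0x61,0x62}, then packed
    // big-endian: 0x6162 == 24930.
    long v = ConvertBinaryUtil.convertBytesToLong("ab".getBytes(StandardCharsets.UTF_8));
    assert v == 0x6162L;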
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
new file mode 100644
index 000000000..c97a2fbfb
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertBytesToLong;
+import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertStringToLong;
+
+/** Test for {@link ConvertBinaryUtil}. */
+public class ConvertBinaryUtilTest {
+
+    @Test
+    public void testConvertToLong() {
+        String randomString = generateRandomString();
+        byte[] randomStringBytes = randomString.getBytes(StandardCharsets.UTF_8);
+
+        Long convertStringValue = convertStringToLong(randomString);
+        Long convertBytesValue = convertBytesToLong(randomStringBytes);
+        Assertions.assertEquals(convertStringValue, convertBytesValue);
+    }
+
+    public static String generateRandomString() {
+        String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+        Random random = new Random();
+        int length = random.nextInt(100) + 1;
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        for (int i = 0; i < length; i++) {
+            int index = random.nextInt(characters.length());
+            stringBuilder.append(characters.charAt(index));
+        }
+        return stringBuilder.toString();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
index 788c8dd3e..986dad42c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -83,7 +83,6 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
                 "CALL paimon.sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'a,b')"),
               Row(true) :: Nil)
 
-            // test order sort
             val result2 = new util.ArrayList[Row]()
             result2.add(0, Row(0, 0))
             result2.add(1, Row(0, 1))
@@ -97,6 +96,26 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
 
             Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
 
+            // test hilbert sort
+            val result3 = new util.ArrayList[Row]()
+            result3.add(0, Row(0, 0))
+            result3.add(1, Row(0, 1))
+            result3.add(2, Row(1, 1))
+            result3.add(3, Row(1, 0))
+            result3.add(4, Row(2, 0))
+            result3.add(5, Row(2, 1))
+            result3.add(6, Row(2, 2))
+            result3.add(7, Row(1, 2))
+            result3.add(8, Row(0, 2))
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.compact(table => 'T', order_strategy => 'hilbert', order_by => 'a,b')"),
+              Row(true) :: Nil)
+
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+            // test order sort
             checkAnswer(
               spark.sql(
                 "CALL paimon.sys.compact(table => 'T', order_strategy => 'order', order_by => 'a,b')"),
@@ -178,7 +197,6 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
                 "CALL paimon.sys.compact(table => 'T', partitions => 'p=0',  order_strategy => 'zorder', order_by => 'a,b')"),
               Row(true) :: Nil)
 
-            // test order sort
             val result2 = new util.ArrayList[Row]()
             result2.add(0, Row(0, 0, 0))
             result2.add(1, Row(0, 0, 1))
@@ -193,6 +211,27 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
             Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2)
             Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
 
+            // test hilbert sort
+            val result3 = new util.ArrayList[Row]()
+            result3.add(0, Row(0, 0, 0))
+            result3.add(1, Row(0, 0, 1))
+            result3.add(2, Row(0, 1, 1))
+            result3.add(3, Row(0, 1, 0))
+            result3.add(4, Row(0, 2, 0))
+            result3.add(5, Row(0, 2, 1))
+            result3.add(6, Row(0, 2, 2))
+            result3.add(7, Row(0, 1, 2))
+            result3.add(8, Row(0, 0, 2))
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.compact(table => 'T', partitions => 'p=0',  order_strategy => 'hilbert', order_by => 'a,b')"),
+              Row(true) :: Nil)
+
+            Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3)
+            Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+
+            // test order sort
             checkAnswer(
               spark.sql(
                 "CALL paimon.sys.compact(table => 'T', partitions => 'p=0',  order_strategy => 'order', order_by => 'a,b')"),