You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ju...@apache.org on 2023/12/25 03:55:21 UTC
(incubator-paimon) branch master updated: [spark] Support hilbert curve sort action in spark (#2549)
This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9738ced5 [spark] Support hilbert curve sort action in spark (#2549)
f9738ced5 is described below
commit f9738ced5e2e68ebbb9a3b3ad859c2826c324159
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Mon Dec 25 11:55:14 2023 +0800
[spark] Support hilbert curve sort action in spark (#2549)
---
docs/content/engines/spark.md | 2 +-
paimon-spark/paimon-spark-common/pom.xml | 5 +
.../apache/paimon/spark/sort/HilbertSorter.java | 66 ++++++
.../apache/paimon/spark/sort/SparkHilbertUDF.java | 249 +++++++++++++++++++++
.../org/apache/paimon/spark/sort/TableSorter.java | 3 +-
.../paimon/spark/utils/ConvertBinaryUtil.java | 68 ++++++
.../apache/paimon/spark/ConvertBinaryUtilTest.java | 59 +++++
.../spark/procedure/CompactProcedureTest.scala | 43 +++-
8 files changed, 490 insertions(+), 5 deletions(-)
diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index 765644cb7..9d1d04a9d 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -552,7 +552,7 @@ This section introduce all available spark procedures about paimon.
<tr>
<td>compact</td>
<td><nobr>CALL [paimon.]sys.compact(table => '<identifier>' [,partitions => '<partitions>'] </nobr><br>[, order_strategy =>'<sort_type>'] [,order_by => '<columns>'])</td>
- <td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Left empty for all partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order' or 'zorder' or 'none'. Left empty for 'none'. <br><br><nobr>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two partitions date=01 and date=02, you need to write 'date=01;date=02'<br><br>If you wa [...]
+ <td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Left empty for all partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'. <br><br><nobr>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
<td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
</tr>
</tbody>
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index 9417968ff..30fe7870b 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -53,6 +53,11 @@ under the License.
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.davidmoten</groupId>
+ <artifactId>hilbert-curve</artifactId>
+ <version>0.2.2</version>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
new file mode 100644
index 000000000..1f3007713
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sort;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.List;
+
+import static org.apache.spark.sql.functions.array;
+
+/**
+ * {@link TableSorter} that orders rows by their position on a Hilbert curve.
+ *
+ * <p>Each order column is mapped to a long by {@link SparkHilbertUDF}; the longs of one row are
+ * combined into a single binary Hilbert index, which serves as a temporary range-partitioning and
+ * sort key.
+ */
+public class HilbertSorter extends TableSorter {
+
+    /** Name of the temporary column that carries the Hilbert index while sorting. */
+    private static final String H_COLUMN = "HVALUE";
+
+    public HilbertSorter(FileStoreTable table, List<String> orderColumns) {
+        super(table, orderColumns);
+        checkNotEmpty();
+    }
+
+    /**
+     * Range-partitions {@code df} by its Hilbert index and sorts each partition by that index.
+     *
+     * @param df the rows to sort
+     * @return the sorted rows with the temporary index column removed again
+     */
+    @Override
+    public Dataset<Row> sort(Dataset<Row> df) {
+        Dataset<Row> indexed = df.withColumn(H_COLUMN, hilbertValue(df));
+        return indexed.repartitionByRange(indexed.col(H_COLUMN))
+                .sortWithinPartitions(indexed.col(H_COLUMN))
+                .drop(H_COLUMN);
+    }
+
+    /** Builds the Hilbert index expression from the order columns found in {@code df}. */
+    private Column hilbertValue(Dataset<Row> df) {
+        SparkHilbertUDF udf = new SparkHilbertUDF();
+
+        // One long-valued expression per order column, in the order the columns were given.
+        Column[] dimensions =
+                orderColNames.stream()
+                        .map(df.schema()::apply)
+                        .map(
+                                field ->
+                                        udf.sortedLexicographically(
+                                                df.col(field.name()), field.dataType()))
+                        .toArray(Column[]::new);
+
+        return udf.transform(array(dimensions));
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
new file mode 100644
index 000000000..115974ac0
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sort;
+
+import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.TimestampType;
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** Spark udf to calculate hilbert bytes. */
+public class SparkHilbertUDF implements Serializable {
+ private static final long PRIMITIVE_EMPTY = Long.MAX_VALUE;
+
+ private static final int BITS_NUM = 63;
+
+ SparkHilbertUDF() {}
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+ byte[] hilbertCurvePosBytes(Seq<Long> points) {
+ List<Long> longs = JavaConverters.seqAsJavaList(points);
+ long[] data = longs.stream().mapToLong(Long::longValue).toArray();
+ HilbertCurve hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(points.size());
+ BigInteger index = hilbertCurve.index(data);
+ return ConvertBinaryUtil.paddingToNByte(index.toByteArray(), BITS_NUM);
+ }
+
+ private UserDefinedFunction tinyToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Byte value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return ConvertBinaryUtil.convertBytesToLong(new byte[] {value});
+ },
+ DataTypes.LongType)
+ .withName("TINY_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction shortToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Short value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return (long) value;
+ },
+ DataTypes.LongType)
+ .withName("SHORT_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction intToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Integer value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return (long) value;
+ },
+ DataTypes.LongType)
+ .withName("INT_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction longToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Long value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return value;
+ },
+ DataTypes.LongType)
+ .withName("LONG_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction floatToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Float value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return Double.doubleToLongBits((double) value);
+ },
+ DataTypes.LongType)
+ .withName("FLOAT_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction doubleToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (Double value) -> {
+ if (value == null) {
+ return PRIMITIVE_EMPTY;
+ }
+ return Double.doubleToLongBits(value);
+ },
+ DataTypes.LongType)
+ .withName("DOUBLE_ORDERED_BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction booleanToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf((Boolean value) -> value ? PRIMITIVE_EMPTY : 0, DataTypes.LongType)
+ .withName("BOOLEAN-LEXICAL-BYTES");
+ return udf;
+ }
+
+ private UserDefinedFunction stringToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (String value) -> ConvertBinaryUtil.convertStringToLong(value),
+ DataTypes.LongType)
+ .withName("STRING-LEXICAL-BYTES");
+
+ return udf;
+ }
+
+ private UserDefinedFunction bytesTruncateUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf(
+ (byte[] value) -> ConvertBinaryUtil.convertBytesToLong(value),
+ DataTypes.LongType)
+ .withName("BYTE-TRUNCATE");
+
+ return udf;
+ }
+
+ private UserDefinedFunction decimalTypeToOrderedLongUDF() {
+ UserDefinedFunction udf =
+ functions
+ .udf((BigDecimal value) -> value.longValue(), DataTypes.LongType)
+ .withName("BYTE-TRUNCATE");
+
+ return udf;
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public Column sortedLexicographically(Column column, DataType type) {
+ if (type instanceof ByteType) {
+ return tinyToOrderedLongUDF().apply(column);
+ } else if (type instanceof ShortType) {
+ return shortToOrderedLongUDF().apply(column);
+ } else if (type instanceof IntegerType) {
+ return intToOrderedLongUDF().apply(column);
+ } else if (type instanceof LongType) {
+ return longToOrderedLongUDF().apply(column);
+ } else if (type instanceof FloatType) {
+ return floatToOrderedLongUDF().apply(column);
+ } else if (type instanceof DoubleType) {
+ return doubleToOrderedLongUDF().apply(column);
+ } else if (type instanceof StringType) {
+ return stringToOrderedLongUDF().apply(column);
+ } else if (type instanceof BinaryType) {
+ return bytesTruncateUDF().apply(column);
+ } else if (type instanceof BooleanType) {
+ return booleanToOrderedLongUDF().apply(column);
+ } else if (type instanceof TimestampType) {
+ return longToOrderedLongUDF().apply(column.cast(DataTypes.LongType));
+ } else if (type instanceof DecimalType) {
+ return decimalTypeToOrderedLongUDF().apply(column.cast(DataTypes.LongType));
+ } else if (type instanceof DateType) {
+ return longToOrderedLongUDF().apply(column.cast(DataTypes.LongType));
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot use column %s of type %s in Hilbert, the type is unsupported",
+ column, type));
+ }
+ }
+
+ private final UserDefinedFunction hilbertCurveUDF =
+ functions
+ .udf((Seq<Long> points) -> hilbertCurvePosBytes(points), DataTypes.BinaryType)
+ .withName("HILBERT_LONG");
+
+ public Column transform(Column arrayBinary) {
+ return hilbertCurveUDF.apply(arrayBinary);
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index b480dc8d4..b76d56a9a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -69,8 +69,7 @@ public abstract class TableSorter {
case ZORDER:
return new ZorderSorter(table, orderColumns);
case HILBERT:
- // todo support hilbert curve
- throw new IllegalArgumentException("Not supported yet.");
+ return new HilbertSorter(table, orderColumns);
case NONE:
return new TableSorter(table, orderColumns) {
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
new file mode 100644
index 000000000..d2acbf422
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Helpers that squeeze byte arrays and strings into fixed-width byte arrays and longs.
+ *
+ * <p>NOTE(review): inputs shorter than the target width are padded with zeros on the left, so a
+ * short string is read as a big-endian number rather than compared lexicographically (for example
+ * "ab" yields a larger long than "b"). Confirm this is the intended ordering.
+ */
+public class ConvertBinaryUtil {
+
+    private ConvertBinaryUtil() {}
+
+    /** Fits {@code data} into exactly 8 bytes, see {@link #paddingToNByte(byte[], int)}. */
+    public static byte[] paddingTo8Byte(byte[] data) {
+        return paddingToNByte(data, 8);
+    }
+
+    /**
+     * Fits {@code data} into exactly {@code paddingNum} bytes.
+     *
+     * @param data the bytes to fit
+     * @param paddingNum the required length of the result
+     * @return {@code data} itself when it already has the required length; otherwise a new array
+     *     holding the first {@code paddingNum} bytes of a longer input, or a shorter input
+     *     preceded by zero bytes
+     */
+    public static byte[] paddingToNByte(byte[] data, int paddingNum) {
+        int length = data.length;
+        if (length == paddingNum) {
+            return data;
+        }
+
+        // A freshly allocated array is already zero-filled, so only the payload is copied.
+        byte[] result = new byte[paddingNum];
+        if (length > paddingNum) {
+            System.arraycopy(data, 0, result, 0, paddingNum);
+        } else {
+            System.arraycopy(data, 0, result, paddingNum - length, length);
+        }
+        return result;
+    }
+
+    /** Fits the UTF-8 encoding of {@code data} into exactly 8 bytes. */
+    public static byte[] utf8To8Byte(String data) {
+        byte[] utf8 = data.getBytes(StandardCharsets.UTF_8);
+        return paddingTo8Byte(utf8);
+    }
+
+    /** Converts the UTF-8 encoding of {@code data}, fitted into 8 bytes, to a long. */
+    public static Long convertStringToLong(String data) {
+        return convertBytesToLong(utf8To8Byte(data));
+    }
+
+    /**
+     * Reads {@code bytes}, fitted into 8 bytes, as a big-endian long.
+     *
+     * <p>The result is negative when the first of the 8 bytes has its high bit set.
+     */
+    public static long convertBytesToLong(byte[] bytes) {
+        long value = 0L;
+        for (byte b : paddingTo8Byte(bytes)) {
+            value = (value << Byte.SIZE) | (b & 0xffL);
+        }
+        return value;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
new file mode 100644
index 000000000..c97a2fbfb
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertBytesToLong;
+import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertStringToLong;
+
+/** Test for {@link ConvertBinaryUtil}. */
+public class ConvertBinaryUtilTest {
+
+    /** A random string and its UTF-8 bytes must convert to the same long. */
+    @Test
+    public void testConvertToLong() {
+        assertStringAndBytesAgree(generateRandomString());
+    }
+
+    /** Covers the lengths around the 8-byte width, which the random test seldom hits. */
+    @Test
+    public void testConvertToLongAroundEightBytes() {
+        String[] inputs = {"", "A", "ABCDEFG", "ABCDEFGH", "ABCDEFGHI"};
+        for (String input : inputs) {
+            assertStringAndBytesAgree(input);
+        }
+    }
+
+    /** Only the first 8 bytes of a longer input take part in the conversion. */
+    @Test
+    public void testConvertToLongTruncatesToEightBytes() {
+        Assert.assertEquals(convertStringToLong("ABCDEFGH"), convertStringToLong("ABCDEFGHIJ"));
+    }
+
+    private static void assertStringAndBytesAgree(String input) {
+        byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+        Long convertStringValue = convertStringToLong(input);
+        Long convertBytesValue = convertBytesToLong(inputBytes);
+        // Report the input so that a failure on a random string can be reproduced.
+        Assert.assertEquals("input: '" + input + "'", convertStringValue, convertBytesValue);
+    }
+
+    /** Returns a random alphanumeric string of 1 to 100 characters. */
+    public static String generateRandomString() {
+        String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+        Random random = new Random();
+        int length = random.nextInt(100) + 1;
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        for (int i = 0; i < length; i++) {
+            int index = random.nextInt(characters.length());
+            stringBuilder.append(characters.charAt(index));
+        }
+        return stringBuilder.toString();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
index 788c8dd3e..986dad42c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -83,7 +83,6 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
"CALL paimon.sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'a,b')"),
Row(true) :: Nil)
- // test order sort
val result2 = new util.ArrayList[Row]()
result2.add(0, Row(0, 0))
result2.add(1, Row(0, 1))
@@ -97,6 +96,26 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
+ // test hilbert sort
+ val result3 = new util.ArrayList[Row]()
+ result3.add(0, Row(0, 0))
+ result3.add(1, Row(0, 1))
+ result3.add(2, Row(1, 1))
+ result3.add(3, Row(1, 0))
+ result3.add(4, Row(2, 0))
+ result3.add(5, Row(2, 1))
+ result3.add(6, Row(2, 2))
+ result3.add(7, Row(1, 2))
+ result3.add(8, Row(0, 2))
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.compact(table => 'T', order_strategy => 'hilbert', order_by => 'a,b')"),
+ Row(true) :: Nil)
+
+ Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+ // test order sort
checkAnswer(
spark.sql(
"CALL paimon.sys.compact(table => 'T', order_strategy => 'order', order_by => 'a,b')"),
@@ -178,7 +197,6 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
"CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')"),
Row(true) :: Nil)
- // test order sort
val result2 = new util.ArrayList[Row]()
result2.add(0, Row(0, 0, 0))
result2.add(1, Row(0, 0, 1))
@@ -193,6 +211,27 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2)
Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+ // test hilbert sort
+ val result3 = new util.ArrayList[Row]()
+ result3.add(0, Row(0, 0, 0))
+ result3.add(1, Row(0, 0, 1))
+ result3.add(2, Row(0, 1, 1))
+ result3.add(3, Row(0, 1, 0))
+ result3.add(4, Row(0, 2, 0))
+ result3.add(5, Row(0, 2, 1))
+ result3.add(6, Row(0, 2, 2))
+ result3.add(7, Row(0, 1, 2))
+ result3.add(8, Row(0, 0, 2))
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'hilbert', order_by => 'a,b')"),
+ Row(true) :: Nil)
+
+ Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3)
+ Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+
+ // test order sort
checkAnswer(
spark.sql(
"CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'order', order_by => 'a,b')"),