Posted to issues@paimon.apache.org by "JingsongLi (via GitHub)" <gi...@apache.org> on 2023/08/21 03:46:44 UTC

[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #1846: [improve] support z-order range sort action

JingsongLi commented on code in PR #1846:
URL: https://github.com/apache/incubator-paimon/pull/1846#discussion_r1299551688


##########
paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java:
##########
@@ -159,8 +159,11 @@ private MutableObjectIterator<BinaryRow> spilledIterator() throws IOException {
         return new MutableObjectIterator<BinaryRow>() {
             @Override
             public BinaryRow next(BinaryRow reuse) throws IOException {
-                // BinaryMergeIterator ignore reuse object argument, use its own reusing object
-                return next();
+                BinaryRow row = iterator.next();

Review Comment:
   revert this.



##########
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.utils;

Review Comment:
   Add copy header.



##########
docs/content/concepts/append-only-table.md:
##########
@@ -227,6 +227,26 @@ behavior is exactly the same as [Append For Queue]({{< ref "#compaction" >}}).
 Auto compaction is only supported in Flink streaming mode. You can also start a compaction job in Flink with a Paimon Flink action
 and disable all other compaction by setting `write-only`.
 
+### Order Rewrite
+
+Unordered data within a partition slows down queries, while compaction can slow down inserts. A good approach is to set
+`write-only` for the inserting job and, once a partition's data is complete, trigger an `Order Rewrite` action on that partition. The `Order Rewrite` action does two things:
+* Sorts all the data selected by the select clause.
+* Rewrites the sorted data in the same way a compaction does.
+
+You can trigger the action with a shell script:
+```shell
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
+    order-rewrite \

Review Comment:
   use `compact`?



##########
paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ByteBuffers;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+
+/** Z-indexer responsible for generating the z-index of a row. */
+public class ZIndexer implements Serializable {
+
+    private final Set<ZorderBaseFunction> functionSet;
+    private final int[] fieldsIndex;
+    private final int totalBytes;
+    private transient ByteBuffer reuse;
+
+    public ZIndexer(RowType rowType, List<String> orderColumns) {
+        List<String> fields = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        for (int i = 0; i < fieldsIndex.length; i++) {
+            int index = fields.indexOf(orderColumns.get(i));
+            if (index == -1) {
+                throw new IllegalArgumentException(
+                        "Can't find column: "
+                                + orderColumns.get(i)
+                                + " in row type fields: "
+                                + fields);
+            }
+            fieldsIndex[i] = index;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+        this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length;
+    }
+
+    public void open() {
+        this.reuse = ByteBuffer.allocate(totalBytes);
+        functionSet.forEach(ZorderBaseFunction::open);
+    }
+
+    public int size() {
+        return totalBytes;
+    }
+
+    public byte[] index(InternalRow row) {
+        byte[][] columnBytes = new byte[fieldsIndex.length][];
+
+        int index = 0;
+        for (ZorderBaseFunction f : functionSet) {
+            columnBytes[index++] = f.apply(row);
+        }
+
+        return ZOrderByteUtils.interleaveBits(columnBytes, totalBytes, reuse);
+    }
+
+    public Set<ZorderBaseFunction> constructFunctionMap(List<DataField> fields) {
+        Set<ZorderBaseFunction> zorderFunctionSet = new LinkedHashSet<>();
+        // Build one z-order function per order column, in the order the columns were given
+        for (int fieldIndex = 0; fieldIndex < fieldsIndex.length; fieldIndex++) {
+            int index = fieldsIndex[fieldIndex];
+            DataField field = fields.get(index);
+            zorderFunctionSet.add(zmapColumnToCalculator(field).setPosition(index));

Review Comment:
   remove `setPosition`.
   You should pass index from constructor.
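`ZIndexer.index` above hands the per-column bytes to `ZOrderByteUtils.interleaveBits`, whose body is not part of this diff. As a minimal sketch of what that step does, assuming every column has already been encoded into a byte array of the same length, Morton-style bit interleaving looks like this (the class name `InterleaveSketch` is hypothetical, and this is not the PR's implementation):

```java
/** Hypothetical sketch of z-order bit interleaving; not the PR's ZOrderByteUtils. */
final class InterleaveSketch {

    private InterleaveSketch() {}

    /**
     * Interleaves the bits of the given columns, most significant bit first: bit 0 of column 0,
     * bit 0 of column 1, ..., then bit 1 of column 0, and so on. Assumes at least one column and
     * that all columns have the same length.
     */
    static byte[] interleaveBits(byte[][] columns) {
        int bytesPerColumn = columns[0].length;
        byte[] out = new byte[columns.length * bytesPerColumn];
        int outBit = 0;
        for (int bit = 0; bit < bytesPerColumn * 8; bit++) {
            for (byte[] column : columns) {
                // Read bit `bit` of this column, counting from the most significant bit.
                int value = (column[bit / 8] >>> (7 - bit % 8)) & 1;
                if (value == 1) {
                    // Set the next output bit, again counting from the most significant bit.
                    out[outBit / 8] |= (byte) (1 << (7 - outBit % 8));
                }
                outBit++;
            }
        }
        return out;
    }
}
```

For two one-byte columns `0xFF` and `0x00`, the result is `{0xAA, 0xAA}`: the output bits alternate between the columns, so no single column dominates the sort order. The output length is the sum of the column lengths, which matches `totalBytes` being `PRIMITIVE_BUFFER_SIZE` times the number of order columns when each column is encoded into `PRIMITIVE_BUFFER_SIZE` bytes.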



##########
docs/layouts/shortcodes/generated/order-rewrite.html:
##########
@@ -0,0 +1,67 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--table</h5></td>
+        <td>The Paimon table name.</td>
+    </tr>
+    <tr>
+        <td><h5>--select</h5></td>
+        <td>A SELECT statement that Flink can compile. All rows returned by the select clause are sorted. For example: --select "select * from source where id > 10"</td>
+    </tr>
+    <tr>
+        <td><h5>--order-by</h5></td>
+        <td>The sort order. Two strategies are currently supported: "zorder" sorts the data by z-order, and "order" performs a regular sort on the listed columns. For example: --order-by zorder(col0, col1) or --order-by order(a,b,c)</td>

Review Comment:
   Add a new option, `order-strategy`?
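One reading of the `order-strategy` suggestion is to split the strategy out of `--order-by`, so the column list no longer has to be extracted from `zorder(...)` or `order(...)`. A minimal sketch of validating the two values separately; the enum, the method names, and the comma-separated column format are assumptions for illustration, not an agreed CLI:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical parsing of separate order-strategy and order-by values. */
final class OrderArgsSketch {

    /** Hypothetical set of strategies, mirroring "order" and "zorder" from the option docs. */
    enum OrderStrategy {
        ORDER,
        ZORDER
    }

    private OrderArgsSketch() {}

    /** Case-insensitive lookup that fails fast on unknown strategies. */
    static OrderStrategy parseStrategy(String value) {
        try {
            return OrderStrategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown order strategy: " + value, e);
        }
    }

    /** Splits "col0, col1" into ["col0", "col1"], rejecting empty column names. */
    static List<String> parseColumns(String value) {
        List<String> columns = new ArrayList<>();
        for (String part : value.split(",")) {
            String column = part.trim();
            if (column.isEmpty()) {
                throw new IllegalArgumentException("Empty column name in: " + value);
            }
            columns.add(column);
        }
        return columns;
    }
}
```

Under this assumption, a hypothetical invocation such as `--order-strategy zorder --order-by col0,col1` would replace `--order-by zorder(col0, col1)`, and an unknown strategy is reported with an explicit message.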



##########
paimon-core/src/main/java/org/apache/paimon/utils/ByteBuffers.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Most of the content of this class is referenced from Iceberg's ByteBuffers.
+ *
+ * <p>This class is a util for processing {@link ByteBuffer}.
+ */
+public class ByteBuffers {

Review Comment:
   Remove this class, and create `Bytes` class.



##########
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Within Z-ordering, the byte representations of the objects being compared must be ordered; this
+ * requires several types to be transformed when converted to bytes. The goal is to map objects
+ * whose byte representations are not lexicographically ordered into representations that are
+ * lexicographically ordered. Bytes produced should be compared lexicographically as unsigned bytes,
+ * big-endian.
+ *
+ * <p>All types except for String are stored within an 8 Byte Buffer
+ *
+ * <p>Most of these techniques are derived from
+ * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/
+ *
+ * <p>Most of the content of this class is referenced from Iceberg's ZOrderByteUtils.

Review Comment:
   remove this line
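The class comment above states the requirement: each value must map to bytes that compare in value order when read as unsigned, big-endian bytes. A minimal sketch of two common transformations, assuming the 8-byte encodings the comment describes; the names are hypothetical and this is not the PR's `ZOrderByteUtils`:

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of order-preserving 8-byte encodings for z-ordering. */
final class OrderedBytesSketch {

    private OrderedBytesSketch() {}

    /** Flipping the sign bit makes two's-complement longs sort correctly as unsigned bytes. */
    static byte[] orderedLong(long value) {
        // ByteBuffer is big-endian by default, so the most significant byte comes first.
        return ByteBuffer.allocate(8).putLong(value ^ Long.MIN_VALUE).array();
    }

    /**
     * Flips every bit of negative doubles and only the sign bit of non-negative ones, so the
     * IEEE 754 bit pattern sorts in numeric order.
     */
    static byte[] orderedDouble(double value) {
        long bits = Double.doubleToLongBits(value);
        // bits >> 63 is all ones for negative values and zero otherwise.
        bits ^= (bits >> 63) | Long.MIN_VALUE;
        return ByteBuffer.allocate(8).putLong(bits).array();
    }

    /** Lexicographic comparison that treats each byte as unsigned. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Flipping the sign bit moves negative longs below positive ones under unsigned comparison. For doubles, negative values additionally have their remaining bits flipped, so a larger magnitude yields a smaller encoding.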



##########
paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ByteBuffers;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+
+/** Z-indexer responsible for generating the z-index of a row. */
+public class ZIndexer implements Serializable {
+
+    private final Set<ZorderBaseFunction> functionSet;
+    private final int[] fieldsIndex;
+    private final int totalBytes;
+    private transient ByteBuffer reuse;
+
+    public ZIndexer(RowType rowType, List<String> orderColumns) {
+        List<String> fields = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        for (int i = 0; i < fieldsIndex.length; i++) {
+            int index = fields.indexOf(orderColumns.get(i));
+            if (index == -1) {
+                throw new IllegalArgumentException(
+                        "Can't find column: "
+                                + orderColumns.get(i)
+                                + " in row type fields: "
+                                + fields);
+            }
+            fieldsIndex[i] = index;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+        this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length;
+    }
+
+    public void open() {
+        this.reuse = ByteBuffer.allocate(totalBytes);
+        functionSet.forEach(ZorderBaseFunction::open);
+    }
+
+    public int size() {
+        return totalBytes;
+    }
+
+    public byte[] index(InternalRow row) {
+        byte[][] columnBytes = new byte[fieldsIndex.length][];
+
+        int index = 0;
+        for (ZorderBaseFunction f : functionSet) {
+            columnBytes[index++] = f.apply(row);
+        }
+
+        return ZOrderByteUtils.interleaveBits(columnBytes, totalBytes, reuse);
+    }
+
+    public Set<ZorderBaseFunction> constructFunctionMap(List<DataField> fields) {
+        Set<ZorderBaseFunction> zorderFunctionSet = new LinkedHashSet<>();
+        // Build one z-order function per order column, in the order the columns were given
+        for (int fieldIndex = 0; fieldIndex < fieldsIndex.length; fieldIndex++) {
+            int index = fieldsIndex[fieldIndex];
+            DataField field = fields.get(index);
+            zorderFunctionSet.add(zmapColumnToCalculator(field).setPosition(index));
+        }
+        return zorderFunctionSet;
+    }
+
+    public static ZorderBaseFunction zmapColumnToCalculator(DataField field) {
+        DataType type = field.type();
+        return type.accept(new TypeVisitor());
+    }
+
+    /** Type Visitor to generate function map from row column to z-index. */
+    public static class TypeVisitor implements DataTypeVisitor<ZorderBaseFunction> {
+
+        @Override
+        public ZorderBaseFunction visit(CharType charType) {
+            return new ZorderStringFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(VarCharType varCharType) {
+            return new ZorderStringFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(BooleanType booleanType) {
+            return new ZorderBooleanFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(BinaryType binaryType) {
+            return new ZorderBytesFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(VarBinaryType varBinaryType) {
+            return new ZorderBytesFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(DecimalType decimalType) {
+            return new ZorderDecimalFunction(decimalType.getPrecision(), decimalType.getScale());
+        }
+
+        @Override
+        public ZorderBaseFunction visit(TinyIntType tinyIntType) {
+            return new ZorderTinyIntFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(SmallIntType smallIntType) {
+            return new ZorderShortFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(IntType intType) {
+            return new ZorderIntFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(BigIntType bigIntType) {
+            return new ZorderLongFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(FloatType floatType) {
+            return new ZorderFloatFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(DoubleType doubleType) {
+            return new ZorderDoubleFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(DateType dateType) {
+            return new ZorderDateFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(TimeType timeType) {
+            return new ZorderTimeFunction();
+        }
+
+        @Override
+        public ZorderBaseFunction visit(TimestampType timestampType) {
+            return new ZorderTimestampFunction(timestampType.getPrecision());
+        }
+
+        @Override
+        public ZorderBaseFunction visit(LocalZonedTimestampType localZonedTimestampType) {
+            return new ZorderTimestampFunction(localZonedTimestampType.getPrecision());
+        }
+
+        @Override
+        public ZorderBaseFunction visit(ArrayType arrayType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public ZorderBaseFunction visit(MultisetType multisetType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public ZorderBaseFunction visit(MapType mapType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public ZorderBaseFunction visit(RowType rowType) {
+            throw new RuntimeException("Unsupported type");
+        }
+    }
+
+    /** Base function that converts a row field into its z-order bytes. */
+    public abstract static class ZorderBaseFunction

Review Comment:
   You can refer to `InternalRow.createFieldGetter`
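Paimon's `InternalRow.createFieldGetter` builds a getter from a field type and position once, up front. A self-contained sketch of applying that pattern here, assuming a plain `Object[]` stands in for `InternalRow` and with all names hypothetical: the position is bound in the constructor, so no `setPosition` mutator is needed, and each z-order function is simply a getter composed with a byte encoder.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/** Hypothetical sketch: per-column z-order functions built from a (position, encoder) pair. */
final class ZorderColumnSketch {

    private ZorderColumnSketch() {}

    /** Stand-in for a row field getter; the position is bound when the getter is created. */
    interface FieldGetter {
        Object getFieldOrNull(Object[] row);
    }

    static FieldGetter createFieldGetter(int position) {
        return row -> row[position];
    }

    /** Immutable: both the getter and the encoder are fixed in the constructor. */
    static final class ZorderFunction implements Function<Object[], byte[]> {
        private final FieldGetter getter;
        private final Function<Object, byte[]> encoder;

        ZorderFunction(int position, Function<Object, byte[]> encoder) {
            this.getter = createFieldGetter(position);
            this.encoder = encoder;
        }

        @Override
        public byte[] apply(Object[] row) {
            Object value = getter.getFieldOrNull(row);
            // Assumption for this sketch only: nulls map to all-zero bytes, so they sort first.
            return value == null ? new byte[8] : encoder.apply(value);
        }
    }

    /** Example encoder for INT columns: widened to 8 bytes with the sign bit flipped. */
    static byte[] encodeInt(Object value) {
        long widened = ((Integer) value).longValue();
        return ByteBuffer.allocate(8).putLong(widened ^ Long.MIN_VALUE).array();
    }
}
```

A function for the second column would be created as `new ZorderFunction(1, ZorderColumnSketch::encodeInt)`; swapping the encoder per data type would take the place of the subclass per type.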



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffleUtil.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.shuffle;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Comparator;
+import java.util.List;
+
+/** Topo. */
+public class RangeShuffleUtil {

Review Comment:
   rename this to `RangeShuffle`, put all classes into this class.
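The body of `RangeShuffleUtil` is not shown here, though its imports (`Tuple2`, `ListTypeInfo`, `BroadcastPartitioner`, `CustomPartitionerWrapper`) suggest the usual range-partitioning scheme: sample keys, derive range boundaries, broadcast them, and route each record by boundary lookup. A minimal, Flink-free sketch of the boundary and routing steps, with hypothetical names and `long` keys for brevity:

```java
import java.util.Arrays;

/** Hypothetical sketch of range partitioning: boundaries from a sample, then binary search. */
final class RangeBoundariesSketch {

    private RangeBoundariesSketch() {}

    /** Picks numPartitions - 1 boundaries at evenly spaced ranks of the sorted sample. */
    static long[] boundaries(long[] sample, int numPartitions) {
        if (sample.length == 0 || numPartitions < 1) {
            throw new IllegalArgumentException("Need a non-empty sample and at least one partition");
        }
        long[] sorted = sample.clone();
        Arrays.sort(sorted);
        long[] bounds = new long[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            // Rank of the i-th quantile within the sorted sample.
            int rank = (int) ((long) i * sorted.length / numPartitions);
            bounds[i - 1] = sorted[Math.min(rank, sorted.length - 1)];
        }
        return bounds;
    }

    /** Keys <= bounds[0] go to partition 0, keys in (bounds[0], bounds[1]] to partition 1, etc. */
    static int partition(long key, long[] bounds) {
        int pos = Arrays.binarySearch(bounds, key);
        // binarySearch returns -(insertionPoint) - 1 when the key is absent.
        return pos >= 0 ? pos : -pos - 1;
    }
}
```

For the sample `1..10` and two partitions, the single boundary is `6`, so keys up to `6` go to partition 0 and larger keys to partition 1. In a Flink job the boundaries would typically be computed once from the sample and broadcast to every task that routes records.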



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffleUtil.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.shuffle;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Comparator;
+import java.util.List;
+
+/** Topo. */

Review Comment:
   ?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SorterFactory.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Factory that creates sorters for z-order, plain order-by, and similar strategies. */
+public class SorterFactory {

Review Comment:
   Put this into `TableSorter`.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
+/** SortOperator to sort the `InternalRow`s by the `KeyType`. */
+public class SortOperator extends TableStreamOperator<InternalRow>
+        implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
+
+    private final RowType keyRowType;
+    private final RowType valueRowType;
+    private transient BinaryExternalSortBuffer buffer;
+    private transient BinaryRow reuseBinaryRow;
+
+    public SortOperator(RowType keyType, RowType valueRowType) {
+        this.keyRowType = keyType;
+        this.valueRowType = valueRowType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        List<DataField> keyFields = keyRowType.getFields();
+        List<DataField> dataFields = valueRowType.getFields();
+
+        List<DataField> fields = new ArrayList<>();
+        fields.addAll(keyFields);
+        fields.addAll(dataFields);
+
+        RowType rowType = new RowType(fields);
+        reuseBinaryRow = new BinaryRow(rowType.getFieldCount());
+
+        InternalRowSerializer serializer = InternalSerializers.create(rowType);
+        NormalizedKeyComputer normalizedKeyComputer =
+                CodeGenUtils.newNormalizedKeyComputer(
+                        rowType.getFieldTypes(), "MemTableKeyComputer");
+        RecordComparator keyComparator =
+                CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
+
+        MemorySegmentPool memoryPool =
+                new HeapMemorySegmentPool(
+                        MemorySize.parse("512 mb").getBytes(),

Review Comment:
   Configure by table options.
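A minimal sketch of reading the sort buffer size from table options instead of the hard-coded `"512 mb"`, keeping that literal as the default. The option key `sort.buffer-size` is hypothetical, and the small parser (integer values with units b, kb, mb, gb only) stands in for Paimon's `MemorySize.parse`, which the operator already uses:

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: resolve the sort buffer size from table options instead of a literal. */
final class SortMemorySketch {

    /** Hypothetical option key, for illustration only. */
    static final String SORT_BUFFER_SIZE_KEY = "sort.buffer-size";

    /** Keeps the value currently hard-coded in SortOperator as the default. */
    static final String DEFAULT_SORT_BUFFER_SIZE = "512 mb";

    private SortMemorySketch() {}

    static long sortBufferBytes(Map<String, String> tableOptions) {
        String value = tableOptions.getOrDefault(SORT_BUFFER_SIZE_KEY, DEFAULT_SORT_BUFFER_SIZE);
        return parseBytes(value);
    }

    /** Parses "<integer> <unit>" where unit is b, kb, mb or gb (powers of 1024). */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("Missing number in: " + text);
        }
        long number = Long.parseLong(s.substring(0, i));
        switch (s.substring(i).trim()) {
            case "":
            case "b":
                return number;
            case "kb":
                return number << 10;
            case "mb":
                return number << 20;
            case "gb":
                return number << 30;
            default:
                throw new IllegalArgumentException("Unknown unit in: " + text);
        }
    }
}
```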



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org