You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ju...@apache.org on 2024/04/02 02:29:00 UTC

(paimon) branch master updated: [flink] Add local sample magnification option for sort compaction to prevent sampling from taking a long time. (#3081)

This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6921d41c9 [flink] Add local sample magnification option for sort compaction to prevent sampling from taking a long time. (#3081)
6921d41c9 is described below

commit 6921d41c9e6ada9c133d42f5662389ac5526402a
Author: wgcn <10...@qq.com>
AuthorDate: Tue Apr 2 10:28:55 2024 +0800

    [flink] Add local sample magnification option for sort compaction to prevent sampling from taking a long time. (#3081)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 ++++
 .../apache/paimon/flink/shuffle/RangeShuffle.java  |  7 ++-
 .../org/apache/paimon/flink/sorter/SortUtils.java  | 15 ++++-
 .../SortCompactActionForUnawareBucketITCase.java   | 65 +++++++++++++++++-----
 5 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7d6db0ce6..5959b6b4f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -569,6 +569,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Duration</td>
             <td>In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.</td>
         </tr>
+        <tr>
+            <td><h5>sort-compaction.local-sample.magnification</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>The magnification of local sample for sort-compaction. The size of local sample is sink parallelism * magnification. It must be greater than or equal to 20.</td>
+        </tr>
         <tr>
             <td><h5>sort-compaction.range-strategy</h5></td>
             <td style="word-wrap: break-word;">QUANTITY</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c6fde0f3e..cbe5f45c5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,6 +1076,12 @@ public class CoreOptions implements Serializable {
                                     + "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
                                     + "the config can be set to size.");
 
+    public static final ConfigOption<Integer> SORT_COMPACTION_SAMPLE_MAGNIFICATION =
+            key("sort-compaction.local-sample.magnification")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The magnification of local sample for sort-compaction. The size of local sample is sink parallelism * magnification. It must be greater than or equal to 20.");
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1146,6 +1152,10 @@ public class CoreOptions implements Serializable {
         return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
     }
 
+    public int getLocalSampleMagnification() {
+        return options.get(SORT_COMPACTION_SAMPLE_MAGNIFICATION);
+    }
+
     public static FileFormat createFileFormat(
             Options options, ConfigOption<FileFormatType> formatOption) {
         String formatIdentifier = options.get(formatOption).toString();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 9c67e8855..1883890ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -96,7 +96,8 @@ public class RangeShuffle {
             DataStream<Tuple2<T, RowData>> inputDataStream,
             SerializableSupplier<Comparator<T>> keyComparator,
             TypeInformation<T> keyTypeInformation,
-            int sampleSize,
+            int localSampleSize, // size passed to LocalSampleOperator ("LOCAL SAMPLE")
+            int globalSampleSize, // size passed to GlobalSampleOperator ("GLOBAL SAMPLE")
             int rangeNum,
             int outParallelism,
             RowType valueRowType,
@@ -116,7 +117,7 @@ public class RangeShuffle {
                 new OneInputTransformation<>(
                         keyInput,
                         "LOCAL SAMPLE",
-                        new LocalSampleOperator<>(sampleSize),
+                        new LocalSampleOperator<>(localSampleSize),
                         new TupleTypeInfo<>(
                                 BasicTypeInfo.DOUBLE_TYPE_INFO,
                                 keyTypeInformation,
@@ -128,7 +129,7 @@ public class RangeShuffle {
                 new OneInputTransformation<>(
                         localSample,
                         "GLOBAL SAMPLE",
-                        new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum),
+                        new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum),
                         new ListTypeInfo<>(keyTypeInformation),
                         1);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 00a1475af..9a1dbb729 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -99,9 +99,17 @@ public class SortUtils {
                     "The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
                             + FlinkConnectorOptions.SINK_PARALLELISM.key());
         }
-        final int sampleSize = sinkParallelism * 1000;
+        int localSampleMagnification = options.getLocalSampleMagnification();
+        if (localSampleMagnification < 20) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The config '%s=%d' is too small; it must be greater than or equal to 20.",
+                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+                            localSampleMagnification));
+        }
+        final int localSampleSize = Math.multiplyExact(sinkParallelism, localSampleMagnification);
+        final int globalSampleSize = sinkParallelism * 1000;
         final int rangeNum = sinkParallelism * 10;
-
         int keyFieldCount = sortKeyType.getFieldCount();
         int valueFieldCount = valueRowType.getFieldCount();
         final int[] valueProjectionMap = new int[valueFieldCount];
@@ -144,7 +152,8 @@ public class SortUtils {
                         inputWithKey,
                         shuffleKeyComparator,
                         keyTypeInformation,
-                        sampleSize,
+                        localSampleSize,
+                        globalSampleSize,
                         rangeNum,
                         sinkParallelism,
                         valueRowType,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 317661feb..b41da234e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -39,6 +39,8 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -347,22 +349,55 @@ public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
 
     private SortCompactAction createAction(
             String orderStrategy, String rangeStrategy, List<String> columns) {
+        return createAction(orderStrategy, rangeStrategy, columns, Lists.newArrayList());
+    }
+
+    private SortCompactAction createAction(
+            String orderStrategy,
+            String rangeStrategy,
+            List<String> columns,
+            List<String> extraConfigs) {
+        ArrayList<String> args =
+                Lists.newArrayList(
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--order_strategy",
+                        orderStrategy,
+                        "--order_by",
+                        String.join(",", columns),
+                        "--table_conf",
+                        "sort-compaction.range-strategy=" + rangeStrategy);
+        args.addAll(extraConfigs);
+        return createAction(SortCompactAction.class, args.toArray(new String[0]));
+    }
 
-        return createAction(
-                SortCompactAction.class,
-                "compact",
-                "--warehouse",
-                warehouse,
-                "--database",
-                database,
-                "--table",
-                tableName,
-                "--order_strategy",
-                orderStrategy,
-                "--order_by",
-                String.join(",", columns),
-                "--table_conf sort-compaction.range-strategy=" + rangeStrategy,
-                rangeStrategy);
+    @Test
+    public void testInvalidSampleConfig() throws Exception {
+        prepareData(300, 1);
+        {
+            ArrayList<String> extraCompactionConfig =
+                    Lists.newArrayList(
+                            "--table_conf", "sort-compaction.local-sample.magnification=1");
+            Assertions.assertThatCode(
+                            () -> {
+                                createAction(
+                                                "order",
+                                                "size",
+                                                Arrays.asList(
+                                                        "f0", "f1", "f2", "f3", "f4", "f5", "f6",
+                                                        "f7", "f8", "f9", "f10", "f11", "f12",
+                                                        "f13", "f14", "f15"),
+                                                extraCompactionConfig)
+                                        .run();
+                            })
+                    .hasMessage(
+                            "The config 'sort-compaction.local-sample.magnification=1' is too small; it must be greater than or equal to 20.");
+        }
     }
 
     private void createTable() throws Exception {