Posted to issues@flink.apache.org by "Weijie Guo (Jira)" <ji...@apache.org> on 2023/04/04 10:00:00 UTC

[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

    [ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708341#comment-17708341 ] 

Weijie Guo commented on FLINK-30989:
------------------------------------

[~lsy] Can you confirm whether the changes to the Table part need to be backported to the release-1.16 branch?

> Configuration table.exec.spill-compression.block-size not take effect in batch job
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-30989
>                 URL: https://issues.apache.org/jira/browse/FLINK-30989
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Table SQL / Runtime
>    Affects Versions: 1.16.1
>            Reporter: shen
>            Assignee: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>         Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size through the TableEnvironment in my job, but the setting did not take effect. I attached a debugger to the TaskManager and found that the conf passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
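The consequence of an empty conf can be modeled in plain Java. An option lookup that misses its key silently returns the option's default, much as Flink's `Configuration#get(ConfigOption)` does. So the sorter would see the default block size rather than the user's "32 m". The following is a minimal sketch under stated assumptions. `EmptyJobConfSketch`, `lookup`, and `parseBytes` are hypothetical helpers, not Flink API. The "64 kb" default is assumed for illustration. The unit handling (k = 1024 bytes, m = 1024 * 1024 bytes) is intended to mirror how we understand Flink's memory-size strings are read.

```java
import java.util.Locale;
import java.util.Map;

public class EmptyJobConfSketch {
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Assumed default, for illustration only.
    static final String BLOCK_SIZE_DEFAULT = "64 kb";

    // Hypothetical option lookup: an absent key silently yields the default.
    static String lookup(Map<String, String> conf, String key, String defaultValue) {
        return conf.getOrDefault(key, defaultValue);
    }

    // Hypothetical parser for "<number> <unit>" using binary multiples.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("no number in: " + text);
        }
        long number = Long.parseLong(s.substring(0, i));
        switch (s.substring(i).trim()) {
            case "": case "b": return number;
            case "k": case "kb": return number << 10;
            case "m": case "mb": return number << 20;
            case "g": case "gb": return number << 30;
            default: throw new IllegalArgumentException("unknown unit in: " + text);
        }
    }

    public static void main(String[] args) {
        // What the user configured through the TableEnvironment.
        Map<String, String> tableConf = Map.of(BLOCK_SIZE_KEY, "32 m");
        // What the operator saw at runtime: an empty configuration.
        Map<String, String> jobConf = Map.of();

        long configured = parseBytes(lookup(tableConf, BLOCK_SIZE_KEY, BLOCK_SIZE_DEFAULT));
        long used = parseBytes(lookup(jobConf, BLOCK_SIZE_KEY, BLOCK_SIZE_DEFAULT));
        System.out.println("configured: " + configured + " bytes, used at runtime: " + used + " bytes");
    }
}
```

Under these assumptions, "32 m" is 32 * 1024 * 1024 = 33554432 bytes, while the empty conf falls back to 64 * 1024 = 65536 bytes.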
> h1. How to reproduce
> A simple program to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays;
>
> public class App {
>   public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
>     config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
>     config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
>     config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- cannot take effect
>     config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
>     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>     tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- cannot take effect
>     final DataStream<Order> orderA =
>         env.fromCollection(
>             Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "rubber", 2)));
>     final Table tableA = tableEnv.fromDataStream(orderA);
>     final Table result =
>         tableEnv.sqlQuery(
>             "SELECT * FROM "
>                 + tableA
>                 + " "
>                 + " order by user");
>     tableEnv.toDataStream(result, Order.class).print();
>     env.execute();
>   }
> }
> // ---------------------------------------------------------------
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
>     this.user = user;
>     this.product = product;
>     this.amount = amount;
>   }
>   @Override
>   public String toString() {
>     return "Order{"
>         + "user="
>         + user
>         + ", product='"
>         + product
>         + '\''
>         + ", amount="
>         + amount
>         + '}';
>   }
> }{code}
>  
> I think this is because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] tries to get its conf from the JobConfiguration, which has to be set in the JobGraph. The options configured in the example above never reach it.
> The following classes use the same method to get their conf from the JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * BinaryHashTable, BaseHybridHashTable
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * SortDataInput
>  ** AlgorithmOptions.SORT_SPILLING_THRESHOLD
>  ** AlgorithmOptions.SPILLING_MAX_FAN
>  ** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER
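One way to stop depending on the JobConfiguration for the classes listed above can be sketched under assumptions. The idea is to resolve the options once on the client, while the plan is translated and the user's TableConfig is still available, then pass the plain values into the runtime components' constructors. Because the sorter and the hash tables read the same spill-compression options, one resolved holder can serve both. This is not necessarily how the fix for this issue was implemented. `SpillOptions`, `resolve`, `SorterSketch`, and `HashTableSketch` are hypothetical names, and the defaults ("true", "64 kb") are assumptions for illustration.

```java
import java.io.Serializable;
import java.util.Map;

public class PlanTimeOptionsSketch {
    static final String COMPRESSION_ENABLED_KEY = "table.exec.spill-compression.enabled";
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";

    // Hypothetical holder for resolved values. Operators are serialized and
    // shipped to TaskManagers, so the holder is Serializable.
    record SpillOptions(boolean compressionEnabled, String blockSize) implements Serializable {}

    // Runs while the plan is translated, where the user's table config is
    // available. Defaults are assumed for illustration.
    static SpillOptions resolve(Map<String, String> tableConf) {
        boolean enabled = Boolean.parseBoolean(tableConf.getOrDefault(COMPRESSION_ENABLED_KEY, "true"));
        String blockSize = tableConf.getOrDefault(BLOCK_SIZE_KEY, "64 kb");
        return new SpillOptions(enabled, blockSize);
    }

    // Hypothetical stand-ins for the sorter and hash table: they keep the values
    // they were constructed with instead of reading the job configuration.
    static final class SorterSketch {
        final SpillOptions options;
        SorterSketch(SpillOptions options) { this.options = options; }
    }

    static final class HashTableSketch {
        final SpillOptions options;
        HashTableSketch(SpillOptions options) { this.options = options; }
    }

    public static void main(String[] args) {
        // Mirrors tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m").
        SpillOptions resolved = resolve(Map.of(BLOCK_SIZE_KEY, "32 m"));
        SorterSketch sorter = new SorterSketch(resolved);
        HashTableSketch hashTable = new HashTableSketch(resolved);
        System.out.println("sorter:     " + sorter.options);
        System.out.println("hash table: " + hashTable.options);
    }
}
```

The design choice here is to move the lookup to a place where the user's settings are known to exist. The runtime classes then no longer need to guess which configuration object to read.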



--
This message was sent by Atlassian Jira
(v8.20.10#820010)