Posted to issues@flink.apache.org by "Weijie Guo (Jira)" <ji...@apache.org> on 2023/04/04 10:00:01 UTC
[jira] [Comment Edited] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
[ https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708341#comment-17708341 ]
Weijie Guo edited comment on FLINK-30989 at 4/4/23 9:59 AM:
------------------------------------------------------------
[~lsy] Can you confirm whether the changes related to the Table part need to be backported to the release-1.16 branch?
was (Author: weijie guo):
[~lsy] Can you confirm whether the changes in the Table part need backport to release-1.16 branch.
> Configuration table.exec.spill-compression.block-size not take effect in batch job
> ----------------------------------------------------------------------------------
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Configuration, Table SQL / Runtime
> Affects Versions: 1.16.1
> Reporter: shen
> Assignee: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to set table.exec.spill-compression.block-size through the TableEnv in my job, but the setting did not take effect. I attached to the TaskManager and found that the conf passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A minimal program that reproduces the problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays;
>
> public class App {
>     public static void main(String[] args) throws Exception {
>         Configuration config = new Configuration();
>         config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
>         config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
>         config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
>         // Attempt 1: set the block size on the environment configuration.
>         config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- does not take effect
>         config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         // Attempt 2: set the block size on the table configuration.
>         tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- does not take effect
>         final DataStream<Order> orderA =
>                 env.fromCollection(
>                         Arrays.asList(
>                                 new Order(1L, "beer", 3),
>                                 new Order(1L, "diaper", 4),
>                                 new Order(3L, "rubber", 2)));
>         final Table tableA = tableEnv.fromDataStream(orderA);
>         // The ORDER BY makes the batch job use a sort operator.
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM "
>                                 + tableA
>                                 + " "
>                                 + " order by user");
>         tableEnv.toDataStream(result, Order.class).print();
>         env.execute();
>     }
> }
> // ---------------------------------------------------------------
> // Order.java
> package test.flink403;
> public class Order {
>     public Long user;
>     public String product;
>     public int amount;
>     // for POJO detection in DataStream API
>     public Order() {}
>     // for structured type detection in Table API
>     public Order(Long user, String product, int amount) {
>         this.user = user;
>         this.product = product;
>         this.amount = amount;
>     }
>     @Override
>     public String toString() {
>         return "Order{"
>                 + "user="
>                 + user
>                 + ", product='"
>                 + product
>                 + '\''
>                 + ", amount="
>                 + amount
>                 + '}';
>     }
> }
> {code}
>
> I think this is because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] tries to read the conf from the job configuration, which would have to be set in the JobGraph.
> The following classes read the options listed under them from the job configuration in the same way:
> * BinaryExternalSorter
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
> * BinaryHashTable, BaseHybridHashTable
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
> ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
> * SortDataInput
> ** AlgorithmOptions.SORT_SPILLING_THRESHOLD
> ** AlgorithmOptions.SPILLING_MAX_FAN
> ** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER
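
The suspected mechanism described above can be sketched with a simplified model in plain Java. This is not Flink code: the class name, both helper methods, and the "64 kb" fallback are hypothetical placeholders chosen for illustration. Only the option key comes from the report. The sketch assumes that the operator consults only the job configuration, so a value stored in the separate table configuration is never seen unless something copies it over before submission.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model (NOT Flink code) of an option that never reaches its reader. */
public class ConfigPropagationSketch {

    // Option key taken from the report.
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Placeholder fallback for this model only (an assumption, not read from Flink).
    static final String FALLBACK_BLOCK_SIZE = "64 kb";

    /** Hypothetical stand-in for an operator that reads only the job configuration. */
    static String blockSizeSeenByOperator(Map<String, String> jobConf) {
        return jobConf.getOrDefault(BLOCK_SIZE_KEY, FALLBACK_BLOCK_SIZE);
    }

    /** Hypothetical remedy: copy table-level options into the job configuration. */
    static Map<String, String> mergeInto(
            Map<String, String> jobConf, Map<String, String> tableConf) {
        Map<String, String> merged = new HashMap<>(jobConf);
        merged.putAll(tableConf); // table-level values override job-level ones
        return merged;
    }

    public static void main(String[] args) {
        // The user sets the option on the table configuration only.
        Map<String, String> tableConf = new HashMap<>();
        tableConf.put(BLOCK_SIZE_KEY, "32 m");

        // The job configuration stays empty, as observed in the report.
        Map<String, String> jobConf = new HashMap<>();

        // Without propagation the operator falls back to the placeholder value.
        System.out.println(blockSizeSeenByOperator(jobConf));
        // After merging, the user's value becomes visible to the operator.
        System.out.println(blockSizeSeenByOperator(mergeInto(jobConf, tableConf)));
    }
}
```

In this model the first lookup returns the fallback and the second returns "32 m". Whether the actual fix merges the configurations or hands the table configuration to the operators in some other way is not stated in this thread.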
--
This message was sent by Atlassian Jira
(v8.20.10#820010)