Posted to dev@flink.apache.org by "shen (Jira)" <ji...@apache.org> on 2023/02/09 12:00:00 UTC
[jira] [Created] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
shen created FLINK-30989:
----------------------------
Summary: Configuration table.exec.spill-compression.block-size not take effect in batch job
Key: FLINK-30989
URL: https://issues.apache.org/jira/browse/FLINK-30989
Project: Flink
Issue Type: Bug
Components: API / DataStream, Runtime / Configuration
Affects Versions: 1.16.1
Reporter: shen
Attachments: image-2023-02-09-19-37-44-927.png
h1. Description
I tried to configure table.exec.spill-compression.block-size through the TableEnvironment in my job, but the setting had no effect. After attaching a debugger to the TaskManager, I found that the configuration passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty:
!image-2023-02-09-19-37-44-927.png|width=306,height=185!
h1. How to reproduce
A minimal program that reproduces the problem:
{code:java}
// App.java
package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
        config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
        config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- has no effect
        config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0.5f);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- has no effect

        final DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));

        final Table tableA = tableEnv.fromDataStream(orderA);
        final Table result = tableEnv.sqlQuery("SELECT * FROM " + tableA + " ORDER BY user");
        tableEnv.toDataStream(result, Order.class).print();
        env.execute();
    }
}
// ---------------------------------------------------------------
// Order.java
package test.flink403;

public class Order {
    public Long user;
    public String product;
    public int amount;

    // for POJO detection in DataStream API
    public Order() {}

    // for structured type detection in Table API
    public Order(Long user, String product, int amount) {
        this.user = user;
        this.product = product;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{"
                + "user="
                + user
                + ", product='"
                + product
                + '\''
                + ", amount="
                + amount
                + '}';
    }
}
{code}
I think this is because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] tries to read its configuration from the JobConfiguration, which has to be set in the JobGraph. The options configured above apparently never end up there, so the operator only sees an empty configuration.
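To make the suspected mechanism concrete, here is a minimal, self-contained Java sketch. It does not use Flink at all: plain maps stand in for the TableEnvironment's config and for the job configuration the operator receives, and the class and method names are hypothetical. The "64 kb" fallback is an assumed default used only for illustration; check ExecutionConfigOptions for the actual value.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the two configuration scopes described above.
 * Plain maps stand in for the table config and the job configuration;
 * nothing here is a Flink API.
 */
public class ConfigScopeSketch {

    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Assumed default, for illustration only.
    static final String ASSUMED_DEFAULT = "64 kb";

    /** Mimics an operator that, like SortOperator, only looks at the job configuration. */
    static String blockSizeSeenByOperator(Map<String, String> jobConfiguration) {
        return jobConfiguration.getOrDefault(BLOCK_SIZE_KEY, ASSUMED_DEFAULT);
    }

    public static void main(String[] args) {
        // What the user sets via tableEnv.getConfig().set(...).
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put(BLOCK_SIZE_KEY, "32 m");

        // The job configuration handed to the operator: empty, as observed in the debugger.
        Map<String, String> jobConfiguration = new HashMap<>();
        System.out.println("before: " + blockSizeSeenByOperator(jobConfiguration));

        // If the table options were copied into the job configuration when the
        // JobGraph is built, the operator would see the configured value.
        jobConfiguration.putAll(tableConfig);
        System.out.println("after: " + blockSizeSeenByOperator(jobConfiguration));
    }
}
{code}

Running it prints "before: 64 kb" followed by "after: 32 m": the value set on the table config is only visible to a reader of the job configuration once something copies it across.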
The following classes read their options from the JobConfiguration in the same way as SortOperator:
* BinaryExternalSorter
** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* BinaryHashTable, BaseHybridHashTable
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* SortDataInput
** AlgorithmOptions.SORT_SPILLING_THRESHOLD
** AlgorithmOptions.SPILLING_MAX_FAN
** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER
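As a cross-check, the sketch below encodes the list above and intersects it with the options set in App.java. It is a hypothetical helper (class and method names are not from Flink) that works on the constant names as plain strings and uses List.of/Set.of, so it needs Java 9 or later. Three of the four options set in the reproduction pass through a component on the list; AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS does not appear in it, so the list alone does not say whether that option is affected.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: which configured options are read from the JobConfiguration, per the list above. */
public class AffectedOptions {

    static final Map<String, List<String>> READS_JOB_CONFIGURATION = new LinkedHashMap<>();

    static {
        READS_JOB_CONFIGURATION.put("BinaryExternalSorter", List.of(
                "ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED",
                "ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES",
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED",
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE"));
        READS_JOB_CONFIGURATION.put("BinaryHashTable, BaseHybridHashTable", List.of(
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED",
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE"));
        READS_JOB_CONFIGURATION.put("SortDataInput", List.of(
                "AlgorithmOptions.SORT_SPILLING_THRESHOLD",
                "AlgorithmOptions.SPILLING_MAX_FAN",
                "AlgorithmOptions.USE_LARGE_RECORDS_HANDLER"));
    }

    /** Returns, sorted, the configured options that some listed component reads from the JobConfiguration. */
    static Set<String> affected(Set<String> configured) {
        Set<String> result = new TreeSet<>();
        for (List<String> options : READS_JOB_CONFIGURATION.values()) {
            for (String option : options) {
                if (configured.contains(option)) {
                    result.add(option);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The options set on the Configuration in App.java.
        Set<String> configured = Set.of(
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED",
                "AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS",
                "ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE",
                "AlgorithmOptions.SORT_SPILLING_THRESHOLD");
        affected(configured).forEach(System.out::println);
    }
}
{code}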