Posted to dev@flink.apache.org by "shen (Jira)" <ji...@apache.org> on 2023/02/09 12:00:00 UTC

[jira] [Created] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

shen created FLINK-30989:
----------------------------

             Summary: Configuration table.exec.spill-compression.block-size not take effect in batch job
                 Key: FLINK-30989
                 URL: https://issues.apache.org/jira/browse/FLINK-30989
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Configuration
    Affects Versions: 1.16.1
            Reporter: shen
         Attachments: image-2023-02-09-19-37-44-927.png

h1. Description

I tried to configure table.exec.spill-compression.block-size through the TableEnvironment in my job, but the setting had no effect. After attaching a debugger to the TaskManager, I found that the conf passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty:

!image-2023-02-09-19-37-44-927.png|width=306,height=185!
h1. How to reproduce

A minimal program that reproduces the problem:
{code:java}
// App.java

package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {

  public static void main(String[] args) throws Exception {

    Configuration config = new Configuration();
    config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
    config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
    config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
    config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- cannot take effect
    config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0.5f);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- cannot take effect
    final DataStream<Order> orderA =
        env.fromCollection(
            Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

    final Table tableA = tableEnv.fromDataStream(orderA);

    final Table result =
        tableEnv.sqlQuery(
            "SELECT * FROM "
                + tableA
                + " ORDER BY `user`"); // "user" is a reserved keyword in Flink SQL, so quote it

    tableEnv.toDataStream(result, Order.class).print();
    env.execute();
  }
}

// ---------------------------------------------------------------
// Order.java
package test.flink403;

public class Order {
  public Long user;
  public String product;
  public int amount;

  // for POJO detection in DataStream API
  public Order() {}

  // for structured type detection in Table API
  public Order(Long user, String product, int amount) {
    this.user = user;
    this.product = product;
    this.amount = amount;
  }

  @Override
  public String toString() {
    return "Order{"
        + "user="
        + user
        + ", product='"
        + product
        + '\''
        + ", amount="
        + amount
        + '}';
  }
}{code}
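The option value "32 m" used in the reproduction is a Flink memory-size string meaning 32 mebibytes, so a configuration that actually carried the option would resolve it to 32 × 1024 × 1024 = 33554432 bytes instead of the default. The snippet below is a simplified, hypothetical parser written with the JDK only (MemorySizeSketch and parseBytes are illustrative names, not Flink's own MemorySize class); it handles just plain byte counts and the binary k/m/g/t suffixes, which is enough to check what value to expect when inspecting the sorter's conf.

{code:java}
import java.util.Locale;

/**
 * Hypothetical, simplified parser for Flink-style memory-size strings such as "32 m".
 * Not Flink's MemorySize: it only understands plain byte counts and the binary
 * units k/kb, m/mb, g/gb, t/tb (powers of 1024).
 */
public class MemorySizeSketch {

  static long parseBytes(String text) {
    String s = text.trim().toLowerCase(Locale.ROOT);

    // Read the leading decimal number.
    int i = 0;
    while (i < s.length() && Character.isDigit(s.charAt(i))) {
      i++;
    }
    if (i == 0) {
      throw new IllegalArgumentException("no number in: " + text);
    }
    long value = Long.parseLong(s.substring(0, i));

    // Whatever follows (after optional whitespace) is the unit.
    String unit = s.substring(i).trim();
    long multiplier;
    switch (unit) {
      case "": case "b": case "bytes": multiplier = 1L; break;
      case "k": case "kb": multiplier = 1L << 10; break;
      case "m": case "mb": multiplier = 1L << 20; break;
      case "g": case "gb": multiplier = 1L << 30; break;
      case "t": case "tb": multiplier = 1L << 40; break;
      default: throw new IllegalArgumentException("unknown unit: " + unit);
    }
    // Fail loudly instead of silently overflowing.
    return Math.multiplyExact(value, multiplier);
  }

  public static void main(String[] args) {
    // The value the reproduction sets for table.exec.spill-compression.block-size.
    System.out.println(parseBytes("32 m")); // 33554432
  }
}
{code}

Seeing 33554432 (or a MemorySize of 32 mb) in the sorter's conf would indicate the option was honored; an empty conf means the default is used.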
 

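I think this happens because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] reads its configuration from the JobConfiguration, which has to be set on the JobGraph. The options configured on the StreamExecutionEnvironment or the TableConfig apparently never reach it, so BinaryExternalSorter receives an empty Configuration and falls back to the defaults.
The following classes read options from the JobConfiguration in the same way, so these options are probably ignored as well: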
 * BinaryExternalSorter
 ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
 * BinaryHashTable, BaseHybridHashTable
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
 * SortDataInput
 ** AlgorithmOptions.SORT_SPILLING_THRESHOLD
 ** AlgorithmOptions.SPILLING_MAX_FAN
 ** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER
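A plain-Java model of the behaviour suspected above, under the assumption that the user's options and the operator's lookup live in two separate configuration objects that are never merged. HashMaps stand in for Flink's Configuration, and ConfigPropagationSketch, resolve and the "<default>" placeholder are hypothetical names, not Flink APIs; the point is only that a lookup against the wrong (empty) configuration silently yields the default instead of failing.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the suspected problem: the option is set in one
 * configuration (the table config), while the operator reads another one
 * (the job configuration), so the operator silently uses its default.
 * Plain maps stand in for Flink's Configuration; none of these are Flink APIs.
 */
public class ConfigPropagationSketch {

  static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";

  /** Stand-in for an operator looking up an option in the job configuration. */
  static String resolve(Map<String, String> jobConfiguration, String key, String defaultValue) {
    return jobConfiguration.getOrDefault(key, defaultValue);
  }

  public static void main(String[] args) {
    // What the user configured through the TableEnvironment.
    Map<String, String> tableConfig = new HashMap<>();
    tableConfig.put(BLOCK_SIZE_KEY, "32 m");

    // Assumption: nothing copies tableConfig into the job configuration.
    Map<String, String> jobConfiguration = new HashMap<>();
    System.out.println(resolve(jobConfiguration, BLOCK_SIZE_KEY, "<default>")); // <default>

    // Once the entries are propagated, the operator sees the user's value.
    jobConfiguration.putAll(tableConfig);
    System.out.println(resolve(jobConfiguration, BLOCK_SIZE_KEY, "<default>")); // 32 m
  }
}
{code}

In this model the operator only sees the user's value once the entries are copied into the job-level map.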


