You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Marios Trivyzas (Jira)" <ji...@apache.org> on 2022/04/07 12:06:00 UTC

[jira] [Commented] (FLINK-26098) TableAPI does not forward idleness configuration from DataStream

    [ https://issues.apache.org/jira/browse/FLINK-26098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518838#comment-17518838 ] 

Marios Trivyzas commented on FLINK-26098:
-----------------------------------------

Using the following code:

 
{noformat}
Configuration configuration = new Configuration();
configuration.set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofSeconds(100));
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);

String srcTableDdl =
        "CREATE TABLE MyTable (\n"
                + " a INT,\n"
                + " b BIGINT,\n"
                + " c VARCHAR,\n"
                + " `rowtime` AS TO_TIMESTAMP(c),\n"
                + " proctime as PROCTIME(),\n"
                + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'number-of-rows' = '10'"
                + ")\n";
tEnv.executeSql(srcTableDdl);
String sinkTableDdl =
        "CREATE TABLE MySink (\n"
                + " b BIGINT,\n"
                + " sum_a INT\n"
                + ") WITH (\n"
                + " 'connector' = 'blackhole')\n";
tEnv.executeSql(sinkTableDdl);

{noformat}
and with debugging, we can see that the {{table.exec.source.idle-timeout}} is there (100 seconds) and used correctly in the pipeline:

!Screenshot_20220407_150020.png!

 

[~twalthr] Do you think we can close this issue?

> TableAPI does not forward idleness configuration from DataStream
> ----------------------------------------------------------------
>
>                 Key: FLINK-26098
>                 URL: https://issues.apache.org/jira/browse/FLINK-26098
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Till Rohrmann
>            Assignee: Marios Trivyzas
>            Priority: Major
>         Attachments: Screenshot_20220407_150020.png
>
>
> The TableAPI does not forward the idleness configuration from a DataStream source. That can lead to the halt of processing if all sources are idle because {{WatermarkAssignerOperator}} [1] will never set a channel to active again. The only way to mitigate the problem is to explicitly configure the idleness for table sources via {{table.exec.source.idle-timeout}}. Configuring this value is actually not easy because creating a {{StreamExecutionEnvironment}} via {{create(StreamExecutionEnvironment, TableConfig)}} is deprecated.
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java#L103



--
This message was sent by Atlassian Jira
(v8.20.1#820001)