Posted to issues@flink.apache.org by "yanbiao (Jira)" <ji...@apache.org> on 2022/04/11 06:19:00 UTC

[jira] [Updated] (FLINK-27138) flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE

     [ https://issues.apache.org/jira/browse/FLINK-27138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanbiao updated FLINK-27138:
----------------------------
    Description: 
I am working with a standalone deployment of flink-1.14.0-bin-scala_2.11.

The package was downloaded from the official website: [https://flink.apache.org/downloads.html]

I then submit the job to the standalone cluster through the REST interface [ /jars/<jarName>/run].

In most conditions it works well (when the where clause is simple or has no *or* operator).

But when I add conditions with the or operator to the where clause, the submission fails with an exception. Even more surprising, [where A or B] and [where A or B or C] are OK,

but with more than three or'ed conditions, like [{*}where A or B or C or D{*}], it *is not OK;*
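
Concretely, with the predicates from the query below, the behavior is (a sketch; the only difference between the two lines is the fourth or'ed condition):

where logType='operation' and (componentId='a' or componentId='b' or componentId='c')                      -- submits fine
where logType='operation' and (componentId='a' or componentId='b' or componentId='c' or componentId='d')   -- fails with the exception below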

I have tried more situations and they behaved differently;

it also works well to add an always-true condition in front of the or conditions, for example:

[where 1=1 and A or B or C]
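
Applied to the failing query below, that workaround would look like this (a sketch; the added 1=1 is always true, so the parenthesized predicate keeps its meaning):

where 1=1 and logType='operation' and (componentId='a' or componentId='b' or componentId='c' or componentId='d')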

*I asked for advice in the official community DingTalk group and WeChat group; the admins told me this should be a BUG and suggested opening an issue.*

The following message is the stack trace:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[10 min]).
 Please use window table-valued function with the following computations:
1. aggregate using window_start and window_end as group keys.
2. topN using window_start and window_end as partition key.
3. join with join condition contains window starts equality of input tables and window ends equality of input tables.
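
Note that the INSERT statement below already follows computation 1 from this list, aggregating with window_start and window_end as group keys, which is why the error is surprising. For reference, a minimal sketch of that supported shape:

SELECT window_start, window_end, count(*) AS metric_count
FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
GROUP BY window_start, window_end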

The SQL content is:
CREATE TABLE source22 (
    `timestamp` VARCHAR,
    `logLevel` VARCHAR,
    `threadName` VARCHAR,
    `componentId` VARCHAR,
    `stackTrace` VARCHAR,
    `logType` VARCHAR,
    `eventType` VARCHAR,
    `subType` VARCHAR,
    `operateType` VARCHAR,
    `operateTag` VARCHAR,
    `weight` INT,
    `operator` VARCHAR,
    `authRoles` VARCHAR,
    `sourceHost` VARCHAR,
    `restUri` VARCHAR,
    `restMethod` VARCHAR,
    `operateObj` VARCHAR,
    `operateResult` VARCHAR,
    `requestParams` VARCHAR,
    `triggerCondition` VARCHAR,
    `authType` VARCHAR,
    `dataSize` INT,
    `exceptionMsg` VARCHAR,
    ts as TO_TIMESTAMP(`timestamp`,'yyyy-MM-dd HH:mm:ss.SSS'),
    WATERMARK FOR  ts AS  ts - INTERVAL '10'second
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'properties.bootstrap.servers' = '10.192.78.27:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'topic22',
    'properties.group.id' = 'groupId_22'
)
CREATE TABLE sink22 (
    `id` VARCHAR,
    `rule_key` VARCHAR,
    `rule_name` VARCHAR,
    `metric_threshold` INT,
    `audit_status` INT,
    `audit_comment_num` INT,
    `window_start` TIMESTAMP(3),
    `window_end` TIMESTAMP(3),
    `metric_count` BIGINT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://10.192.78.27:39200',
    'index' = 'index22'
)
INSERT INTO sink22
SELECT uuid() as id ,'22' as rule_key ,'4 or condition test' as rule_name ,2 as metric_threshold ,0 as audit_status ,0 as audit_comment_num ,window_start,window_end ,count(*) as metric_count
FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
WHERE logType='operation' and (componentId='a' or componentId='b' or componentId='c'  or componentId='d' )
GROUP BY window_start,window_end
HAVING count(*) >2
 
The main class in the jar is:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AuditRuleJob {

    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 300000));
        env.enableCheckpointing(60000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        if (!params.has("source") || !params.has("sink") || !params.has("transform")) {
            throw new RuntimeException("source or sink or transform sql parameter missing");
        }

        String sourceBase64 = params.get("source");
        String source = new String(Base64.getDecoder().decode(sourceBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        String sinkBase64 = params.get("sink");
        String sink = new String(Base64.getDecoder().decode(sinkBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        String transformBase64 = params.get("transform");
        String transform = new String(Base64.getDecoder().decode(transformBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        tableEnv.executeSql(source);
        tableEnv.executeSql(sink);
        tableEnv.executeSql(transform);
    }

}

  was:
Version 1.14.0 downloaded from the official website, deployed as a single-node standalone cluster; the job is submitted to the web UI through the REST interface.

REST interface:

/jars/<jarName>/run

Problem:

When the WHERE clause of the processing SQL contains more than three consecutive OR conditions (three is fine), the submission fails with an error.

The error message is as follows:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[10 min]).
 Please use window table-valued function with the following computations:
1. aggregate using window_start and window_end as group keys.
2. topN using window_start and window_end as partition key.
3. join with join condition contains window starts equality of input tables and window ends equality of input tables.
The submitted SQL is as follows:
CREATE TABLE source22 (
    `timestamp` VARCHAR,
    `logLevel` VARCHAR,
    `threadName` VARCHAR,
    `componentId` VARCHAR,
    `stackTrace` VARCHAR,
    `logType` VARCHAR,
    `eventType` VARCHAR,
    `subType` VARCHAR,
    `operateType` VARCHAR,
    `operateTag` VARCHAR,
    `weight` INT,
    `operator` VARCHAR,
    `authRoles` VARCHAR,
    `sourceHost` VARCHAR,
    `restUri` VARCHAR,
    `restMethod` VARCHAR,
    `operateObj` VARCHAR,
    `operateResult` VARCHAR,
    `requestParams` VARCHAR,
    `triggerCondition` VARCHAR,
    `authType` VARCHAR,
    `dataSize` INT,
    `exceptionMsg` VARCHAR,
    ts as TO_TIMESTAMP(`timestamp`,'yyyy-MM-dd HH:mm:ss.SSS'),
    WATERMARK FOR  ts AS  ts - INTERVAL '10'second
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'properties.bootstrap.servers' = '10.192.78.27:9092',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'logaudit_yf20220304',
    'properties.group.id' = 'groupId_22'
)
CREATE TABLE sink22 (
    `id` VARCHAR,
    `rule_key` VARCHAR,
    `rule_name` VARCHAR,
    `metric_threshold` INT,
    `audit_status` INT,
    `audit_comment_num` INT,
    `window_start` TIMESTAMP(3),
    `window_end` TIMESTAMP(3),
    `metric_count` BIGINT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://10.192.78.27:39200',
    'index' = 'logaudit_rule_22'
)
INSERT INTO sink22
SELECT uuid() as id ,'22' as rule_key ,'4个or测试' as rule_name ,2 as metric_threshold ,0 as audit_status ,0 as audit_comment_num ,window_start,window_end ,count(*) as metric_count
FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
WHERE logType='operation' and (componentId='a' or componentId='b' or componentId='c'  or componentId='d' )
GROUP BY window_start,window_end
HAVING count(*) >2
 
The core code in the actual jar is as follows:
public class AuditRuleJob {

    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 300000));
        env.enableCheckpointing(60000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is retained after cancel, so the job can later be restored from a chosen checkpoint
        //CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted after cancel; checkpoints are kept only if execution fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        if (!params.has("source") || !params.has("sink") || !params.has("transform")) {
            throw new RuntimeException("source or sink or transform sql parameter missing");
        }
        String sourceBase64 = params.get("source");
        String source = new String(Base64.getDecoder().decode(sourceBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        String sinkBase64 = params.get("sink");
        String sink = new String(Base64.getDecoder().decode(sinkBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        String transformBase64 = params.get("transform");
        String transform = new String(Base64.getDecoder().decode(transformBase64.getBytes(StandardCharsets.UTF_8)),StandardCharsets.UTF_8);
        tableEnv.executeSql(source);
        tableEnv.executeSql(sink);
        tableEnv.executeSql(transform);
    }

}

       Priority: Critical  (was: Major)
        Summary: flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE  (was: flink1.14.0 standalone deployment - job submitted via SQL - submission fails with "not supported" error)

> flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27138
>                 URL: https://issues.apache.org/jira/browse/FLINK-27138
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Table SQL / Client
>    Affects Versions: 1.14.0
>         Environment: CentOS-7
> flink 1.14.0 Release
>            Reporter: yanbiao
>            Priority: Critical
>         Attachments: 3个or正常.png, webUI报错信息.png, 部署目录.png
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)