Posted to issues@flink.apache.org by "yanbiao (Jira)" <ji...@apache.org> on 2022/04/11 06:24:00 UTC
[jira] [Commented] (FLINK-27138) flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE
[ https://issues.apache.org/jira/browse/FLINK-27138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520323#comment-17520323 ]
yanbiao commented on FLINK-27138:
---------------------------------
Hi [~martijnvisser], I have translated it. Other developers are also waiting for the outcome of this issue. Could you check it and re-open the issue?
> flink1.14.0-sql-job submit failed:Flink doesn't support individual window table-valued function TUMBLE
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27138
> URL: https://issues.apache.org/jira/browse/FLINK-27138
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission, Table SQL / Client
> Affects Versions: 1.14.0
> Environment: CentOS-7
> flink 1.14.0 Release
> Reporter: yanbiao
> Priority: Critical
> Attachments: 3个or正常.png (three ORs work normally), webUI报错信息.png (web UI error message), 部署目录.png (deployment directory)
>
>
> I am working with a standalone deployment of flink-1.14.0-bin-scala_2.11.
> The package was downloaded from the official website: [https://flink.apache.org/downloads.html]
> I then submit the job to the standalone cluster through the REST endpoint [/jars/<jarName>/run].
> In most cases it works well (when the WHERE clause is simple or contains no *or* operator).
> But when I add conditions joined by the *or* operator to the WHERE clause, the submission fails with an exception. Even more surprising,
> [where A or B] and [where A or B or C] work,
> but with more than three OR-ed conditions, such as [where A or B or C or D], it *fails*.
> I have tried other variations and they behave differently.
> Adding an always-true condition before the OR conditions makes it work, for example:
> [where 1=1 and A or B or C]
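In SQL, as in Java, AND binds more tightly than OR, so `1=1 and A or B or C` is parsed as `(1=1 and A) or B or C`. Because `1=1` is always true, the workaround should not change which rows match; it only changes the shape of the expression handed to the planner. The minimal sketch below checks that equivalence over every truth assignment, modelling the SQL predicates as plain Java booleans (an assumption that ignores SQL NULL handling; the class name is hypothetical).

```java
// Exhaustively compares "1=1 and A or B or C" with "A or B or C",
// modelling the SQL predicates as plain Java booleans (NULLs ignored).
public class OrPrecedenceCheck {

    // && binds tighter than ||, mirroring SQL's AND/OR precedence,
    // so this is evaluated as (alwaysTrue && a) || b || c.
    static boolean withTruePrefix(boolean a, boolean b, boolean c) {
        boolean alwaysTrue = true; // stands in for 1=1
        return alwaysTrue && a || b || c;
    }

    static boolean plainOrChain(boolean a, boolean b, boolean c) {
        return a || b || c;
    }

    // Counts the truth assignments on which the two forms disagree.
    static int countMismatches() {
        int mismatches = 0;
        for (int mask = 0; mask < 8; mask++) {
            boolean a = (mask & 1) != 0;
            boolean b = (mask & 2) != 0;
            boolean c = (mask & 4) != 0;
            if (withTruePrefix(a, b, c) != plainOrChain(a, b, c)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches()); // prints "mismatches: 0"
    }
}
```

Since both forms agree on all eight assignments, the workaround plausibly only affects how the planner treats the filter, not the query result.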
> *I asked for advice in the official community DingDing and WeChat groups;*
> *the admin told me that it should be a bug and suggested opening an issue.*
> The stack trace is as follows:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[10 min]).
> Please use window table-valued function with the following computations:
> 1. aggregate using window_start and window_end as group keys.
> 2. topN using window_start and window_end as partition key.
> 3. join with join condition contains window starts equality of input tables and window ends equality of input tables.
> The SQL is:
> CREATE TABLE source22 (
> `timestamp` VARCHAR,
> `logLevel` VARCHAR,
> `threadName` VARCHAR,
> `componentId` VARCHAR,
> `stackTrace` VARCHAR,
> `logType` VARCHAR,
> `eventType` VARCHAR,
> `subType` VARCHAR,
> `operateType` VARCHAR,
> `operateTag` VARCHAR,
> `weight` INT,
> `operator` VARCHAR,
> `authRoles` VARCHAR,
> `sourceHost` VARCHAR,
> `restUri` VARCHAR,
> `restMethod` VARCHAR,
> `operateObj` VARCHAR,
> `operateResult` VARCHAR,
> `requestParams` VARCHAR,
> `triggerCondition` VARCHAR,
> `authType` VARCHAR,
> `dataSize` INT,
> `exceptionMsg` VARCHAR,
> ts as TO_TIMESTAMP(`timestamp`,'yyyy-MM-dd HH:mm:ss.SSS'),
> WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'format' = 'json',
> 'properties.bootstrap.servers' = '10.192.78.27:9092',
> 'scan.startup.mode' = 'latest-offset',
> 'topic' = 'topic22',
> 'properties.group.id' = 'groupId_22'
> )
> CREATE TABLE sink22 (
> `id` VARCHAR,
> `rule_key` VARCHAR,
> `rule_name` VARCHAR,
> `metric_threshold` INT,
> `audit_status` INT,
> `audit_comment_num` INT,
> `window_start` TIMESTAMP(3),
> `window_end` TIMESTAMP(3),
> `metric_count` BIGINT,
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://10.192.78.27:39200',
> 'index' = 'index22'
> )
> INSERT INTO sink22
> SELECT uuid() as id ,'22' as rule_key ,'4 or condition test' as rule_name ,2 as metric_threshold ,0 as audit_status ,0 as audit_comment_num ,window_start,window_end ,count(*) as metric_count
> FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
> WHERE logType='operation' and (componentId='a' or componentId='b' or componentId='c' or componentId='d' )
> GROUP BY window_start,window_end
> HAVING count(*) >2
>
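To narrow down the threshold between two, three, and four OR-ed conditions, the INSERT statement above can be regenerated with a configurable number of `componentId` conditions. This is a hypothetical helper, not part of the report; placing `1=1 and` inside the parentheses is an assumption about where the reporter added the always-true condition.

```java
import java.util.StringJoiner;

// Hypothetical helper: rebuilds the reported INSERT statement with n OR-ed
// componentId conditions, optionally prefixed by the always-true "1=1 and".
public class TransformSqlVariants {

    // Produces "componentId='a' or componentId='b' or ..." for n conditions.
    static String orChain(int n) {
        if (n < 1 || n > 26) {
            throw new IllegalArgumentException("n must be between 1 and 26");
        }
        StringJoiner joiner = new StringJoiner(" or ");
        for (int i = 0; i < n; i++) {
            // component ids 'a', 'b', 'c', ... as in the original query
            joiner.add("componentId='" + (char) ('a' + i) + "'");
        }
        return joiner.toString();
    }

    // Same statement as in the report, with only the OR chain varied.
    static String buildTransform(int n, boolean truePrefix) {
        String condition = (truePrefix ? "1=1 and " : "") + orChain(n);
        return "INSERT INTO sink22\n"
                + "SELECT uuid() as id ,'22' as rule_key ,'" + n + " or condition test' as rule_name ,"
                + "2 as metric_threshold ,0 as audit_status ,0 as audit_comment_num ,"
                + "window_start,window_end ,count(*) as metric_count\n"
                + "FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))\n"
                + "WHERE logType='operation' and (" + condition + ")\n"
                + "GROUP BY window_start,window_end\n"
                + "HAVING count(*) >2";
    }

    public static void main(String[] args) {
        // Print the 2-, 3- and 4-condition variants, one after another.
        for (int n = 2; n <= 4; n++) {
            System.out.println(buildTransform(n, false));
            System.out.println();
        }
    }
}
```

Each generated statement would be Base64-encoded and passed as the `transform` parameter, so only the WHERE clause differs between submissions.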
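> The main class in the jar is:
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.environment.CheckpointConfig;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class AuditRuleJob {
>     public static void main(String[] args) {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().setGlobalJobParameters(params);
>         env.setParallelism(1);
>         // Restart up to 4 times, waiting 300000 ms (5 min) between attempts.
>         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 300000));
>         // Checkpoint every 60 s and retain externalized checkpoints when the job is cancelled.
>         env.enableCheckpointing(60000);
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         // The three SQL statements arrive as Base64-encoded program arguments.
>         if (!params.has("source") || !params.has("sink") || !params.has("transform")) {
>             throw new RuntimeException("source or sink or transform sql parameter missing");
>         }
>         String sourceBase64 = params.get("source");
>         String source = new String(Base64.getDecoder().decode(sourceBase64.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
>         String sinkBase64 = params.get("sink");
>         String sink = new String(Base64.getDecoder().decode(sinkBase64.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
>         String transformBase64 = params.get("transform");
>         String transform = new String(Base64.getDecoder().decode(transformBase64.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
>         // Register the source and sink tables, then submit the INSERT job.
>         tableEnv.executeSql(source);
>         tableEnv.executeSql(sink);
>         tableEnv.executeSql(transform);
>     }
> }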
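The main class reads three Base64-encoded parameters (`source`, `sink`, `transform`) through `ParameterTool`. A minimal client-side sketch for producing those arguments and calling the `/jars/<jarName>/run` endpoint follows, assuming Java 11+ (`java.net.http`, `Files.readString`) and the `programArgsList` field of the run request body. The class name, the command-line layout, and reading the SQL from files are hypothetical, not taken from the report.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Client-side sketch: encodes the three SQL statements the way AuditRuleJob decodes
// them and submits the job via POST /jars/<jarid>/run (Java 11+ HttpClient).
public class SubmitAuditRuleJob {

    // Base64-encodes a SQL statement as UTF-8 bytes.
    static String encode(String sql) {
        return Base64.getEncoder().encodeToString(sql.getBytes(StandardCharsets.UTF_8));
    }

    // Inverse of encode(); mirrors the decoding in AuditRuleJob.main.
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64.getBytes(StandardCharsets.UTF_8)),
                StandardCharsets.UTF_8);
    }

    // Builds the JSON request body. Standard Base64 output contains only
    // [A-Za-z0-9+/=], so the values need no JSON escaping.
    static String runRequestBody(String source, String sink, String transform) {
        return "{\"programArgsList\":["
                + "\"--source\",\"" + encode(source) + "\","
                + "\"--sink\",\"" + encode(sink) + "\","
                + "\"--transform\",\"" + encode(transform) + "\"]}";
    }

    // Hypothetical usage: SubmitAuditRuleJob <restBaseUrl> <jarId> <source.sql> <sink.sql> <transform.sql>
    public static void main(String[] args) throws Exception {
        if (args.length != 5) {
            System.err.println("usage: SubmitAuditRuleJob <restBaseUrl> <jarId> <source.sql> <sink.sql> <transform.sql>");
            return;
        }
        String body = runRequestBody(
                Files.readString(Path.of(args[2])),
                Files.readString(Path.of(args[3])),
                Files.readString(Path.of(args[4])));
        HttpRequest request = HttpRequest.newBuilder(URI.create(args[0] + "/jars/" + args[1] + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status and the raw response body (job id on success, error details otherwise).
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Passing the arguments as a list rather than a single `programArgs` string avoids relying on how the server splits the string into tokens.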
--
This message was sent by Atlassian Jira
(v8.20.1#820001)