You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "yanbiao (Jira)" <ji...@apache.org> on 2022/04/08 10:06:00 UTC
[jira] [Created] (FLINK-27138) flink1.14.0-standalone部署-sql方式提交job-失败报错-提示不支持
yanbiao created FLINK-27138:
-------------------------------
Summary: flink1.14.0-standalone部署-sql方式提交job-失败报错-提示不支持
Key: FLINK-27138
URL: https://issues.apache.org/jira/browse/FLINK-27138
Project: Flink
Issue Type: Bug
Components: Client / Job Submission, Table SQL / Client
Affects Versions: 1.14.0
Environment: CentOS-7
flink 1.14.0 Release
Reporter: yanbiao
Attachments: 3个or正常.png, webUI报错信息.png, 部署目录.png
使用官网下载的 Flink 1.14.0 版本,以 standalone 模式单机部署,通过 REST 接口向 WebUI 提交作业
rest接口:
/jars/<jarName>/run
问题:
当 process 的 SQL 中 WHERE 后面连续的 OR 条件多于 3 个时(3 个时可以正常提交),提交作业报错
报错信息如下:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[10 min]).
Please use window table-valued function with the following computations:
1. aggregate using window_start and window_end as group keys.
2. topN using window_start and window_end as partition key.
3. join with join condition contains window starts equality of input tables and window ends equality of input tables.
提交的sql如下:
-- Source table for the reproduction: reads JSON records from a Kafka topic
-- (see the WITH options below for connector, format, brokers, topic and group id).
CREATE TABLE source22 (
`timestamp` VARCHAR,
`logLevel` VARCHAR,
`threadName` VARCHAR,
`componentId` VARCHAR,
`stackTrace` VARCHAR,
`logType` VARCHAR,
`eventType` VARCHAR,
`subType` VARCHAR,
`operateType` VARCHAR,
`operateTag` VARCHAR,
`weight` INT,
`operator` VARCHAR,
`authRoles` VARCHAR,
`sourceHost` VARCHAR,
`restUri` VARCHAR,
`restMethod` VARCHAR,
`operateObj` VARCHAR,
`operateResult` VARCHAR,
`requestParams` VARCHAR,
`triggerCondition` VARCHAR,
`authType` VARCHAR,
`dataSize` INT,
`exceptionMsg` VARCHAR,
-- Computed column: the string `timestamp` field parsed with the pattern 'yyyy-MM-dd HH:mm:ss.SSS'.
ts as TO_TIMESTAMP(`timestamp`,'yyyy-MM-dd HH:mm:ss.SSS'),
-- Watermark declared on ts, lagging ts by a 10-second interval.
WATERMARK FOR ts AS ts - INTERVAL '10'second
) WITH (
'connector' = 'kafka',
'format' = 'json',
'properties.bootstrap.servers' = '10.192.78.27:9092',
'scan.startup.mode' = 'latest-offset',
'topic' = 'logaudit_yf20220304',
'properties.group.id' = 'groupId_22'
)
-- Sink table for the reproduction: one row per matching window, written through the
-- 'elasticsearch-7' connector to the index named in the WITH options below.
CREATE TABLE sink22 (
`id` VARCHAR,
`rule_key` VARCHAR,
`rule_name` VARCHAR,
`metric_threshold` INT,
`audit_status` INT,
`audit_comment_num` INT,
`window_start` TIMESTAMP(3),
`window_end` TIMESTAMP(3),
`metric_count` BIGINT,
-- `id` is declared as the primary key, marked NOT ENFORCED.
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://10.192.78.27:39200',
'index' = 'logaudit_rule_22'
)
-- Transform statement that fails on submission: counts rows of source22 per TUMBLE window
-- on ts, grouped by window_start/window_end, and inserts windows with count(*) > 2 into sink22.
-- NOTE(review): the error message quoted earlier in this report shows size=[10 min], while
-- this statement uses INTERVAL '10' Second — confirm which variant was actually submitted.
INSERT INTO sink22
SELECT uuid() as id ,'22' as rule_key ,'4个or测试' as rule_name ,2 as metric_threshold ,0 as audit_status ,0 as audit_comment_num ,window_start,window_end ,count(*) as metric_count
FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
-- Filter with four OR-ed componentId equality predicates; per the report this is the form
-- that triggers the failure, so the predicate is left exactly as submitted.
WHERE logType='operation' and (componentId='a' or componentId='b' or componentId='c' or componentId='d' )
GROUP BY window_start,window_end
HAVING count(*) >2
实际的jar包核心代码如下:
/**
 * Flink job entry point that executes three SQL statements passed as Base64-encoded program
 * arguments: {@code source} (source table DDL), {@code sink} (sink table DDL) and
 * {@code transform} (the statement that moves data from source to sink).
 */
public class AuditRuleJob {

    /** Names of the required program arguments; each carries one Base64-encoded SQL statement. */
    private static final String[] REQUIRED_SQL_PARAMS = {"source", "sink", "transform"};

    /**
     * Parses the program arguments, configures the streaming environment and executes the
     * source, sink and transform statements in that order.
     *
     * @param args program arguments in the form understood by {@code ParameterTool.fromArgs};
     *     must contain {@code source}, {@code sink} and {@code transform}
     * @throws IllegalArgumentException if a required argument is missing or is not valid Base64
     */
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Validate and decode every SQL argument up front so that a malformed submission fails
        // before any statement has been executed.
        requireSqlParams(params);
        final String source = decodeSqlParam(params, "source");
        final String sink = decodeSqlParam(params, "sink");
        final String transform = decodeSqlParam(params, "transform");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        // Fixed-delay restart strategy with arguments (4, 300000); per the Flink API these are
        // the number of restart attempts and the delay between attempts in milliseconds.
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 300000));
        // Checkpointing enabled with an interval argument of 60000 (milliseconds per the Flink API).
        env.enableCheckpointing(60000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // RETAIN_ON_CANCELLATION: checkpoint data is kept after the job is cancelled, so the job
        //   can later be restored from a chosen checkpoint.
        // DELETE_ON_CANCELLATION: checkpoint data is deleted on cancel; checkpoints are only
        //   kept when the job fails.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(source);
        tableEnv.executeSql(sink);
        tableEnv.executeSql(transform);
    }

    /** Fails with a message naming every required SQL argument that is absent from {@code params}. */
    private static void requireSqlParams(ParameterTool params) {
        StringBuilder missing = new StringBuilder();
        for (String name : REQUIRED_SQL_PARAMS) {
            if (!params.has(name)) {
                if (missing.length() > 0) {
                    missing.append(", ");
                }
                missing.append(name);
            }
        }
        if (missing.length() > 0) {
            throw new IllegalArgumentException(
                    "source or sink or transform sql parameter missing: " + missing);
        }
    }

    /** Returns the argument {@code name} Base64-decoded and interpreted as UTF-8 text. */
    private static String decodeSqlParam(ParameterTool params, String name) {
        final byte[] decoded;
        try {
            decoded = Base64.getDecoder().decode(params.get(name));
        } catch (IllegalArgumentException e) {
            // Re-throw with the argument name; the decoder's own message does not identify it.
            throw new IllegalArgumentException(
                    "sql parameter '" + name + "' is not valid Base64", e);
        }
        return new String(decoded, StandardCharsets.UTF_8);
    }
}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)