You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 猫猫 <16...@qq.com> on 2019/12/06 09:52:02 UTC
[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?
我使用tableEnv.sqlUpdate(ddl);方式创建表
但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?
我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。
sql创表语句如下:
CREATE TABLE T_UserBehavior(
userId BIGINT,
itemId BIGINT,
categoryId BIGINT,
behavior VARCHAR,
optime BIGINT
) WITH (
'connector.type' = 'filesystem', -- required: specify to connector type
'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory
'format.type' = 'csv',
'format.fields.0.name' = 'userId', -- required: define the schema either by using type information
'format.fields.0.type' = 'BIGINT',
'format.fields.1.name' = 'itemId',
'format.fields.1.type' = 'BIGINT',
'format.fields.2.name' = 'categoryId',
'format.fields.2.type' = 'BIGINT',
'format.fields.3.name' = 'behavior',
'format.fields.3.type' = 'VARCHAR',
'format.fields.4.name' = 'optime',
'format.fields.4.type' = 'BIGINT'
);
Re: [flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?
Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi 猫猫:
在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1]
你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。
[2] 中有使用的完整例子,FYI。
[1] https://issues.apache.org/jira/browse/FLINK-14320
[2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala
Best,
Jingsong Lee
------------------------------------------------------------------
From:猫猫 <16...@qq.com>
Send Time:2019年12月6日(星期五) 17:52
To:user-zh <us...@flink.apache.org>
Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?
我使用tableEnv.sqlUpdate(ddl);方式创建表
但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?
我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。
sql创表语句如下:
CREATE TABLE T_UserBehavior(
userId BIGINT,
itemId BIGINT,
categoryId BIGINT,
behavior VARCHAR,
optime BIGINT
) WITH (
'connector.type' = 'filesystem', -- required: specify to connector type
'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory
'format.type' = 'csv',
'format.fields.0.name' = 'userId', -- required: define the schema either by using type information
'format.fields.0.type' = 'BIGINT',
'format.fields.1.name' = 'itemId',
'format.fields.1.type' = 'BIGINT',
'format.fields.2.name' = 'categoryId',
'format.fields.2.type' = 'BIGINT',
'format.fields.3.name' = 'behavior',
'format.fields.3.type' = 'VARCHAR',
'format.fields.4.name' = 'optime',
'format.fields.4.type' = 'BIGINT'
);