You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 猫猫 <16...@qq.com> on 2019/12/06 09:52:02 UTC

[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

我使用tableEnv.sqlUpdate(ddl);方式创建表


但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?


我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。


sql创表语句如下:
CREATE TABLE T_UserBehavior(
   userId BIGINT,
   itemId BIGINT,
   categoryId BIGINT,
   behavior VARCHAR,
   optime BIGINT
) WITH (
  'connector.type' = 'filesystem',               -- required: specify to connector type
  'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId',         -- required: define the schema either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);

Re: [flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi 猫猫:

在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1]
你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。

[2] 中有使用的完整例子,FYI。

[1] https://issues.apache.org/jira/browse/FLINK-14320
[2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala

Best,
Jingsong Lee


------------------------------------------------------------------
From:猫猫 <16...@qq.com>
Send Time:2019年12月6日(星期五) 17:52
To:user-zh <us...@flink.apache.org>
Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

我使用tableEnv.sqlUpdate(ddl);方式创建表


但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?


我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。


sql创表语句如下:
CREATE TABLE T_UserBehavior(
   userId BIGINT,
   itemId BIGINT,
   categoryId BIGINT,
   behavior VARCHAR,
   optime BIGINT
) WITH (
  'connector.type' = 'filesystem',               -- required: specify to connector type
  'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId',         -- required: define the schema either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);