You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 赵旭晨 <jj...@163.com> on 2021/07/06 07:24:31 UTC
flinksql 流维join 问题
流表:
CREATE TABLE flink_doris_source (
reporttime STRING,
tenantcode STRING,
visitid STRING,
orderid STRING,
orderdetailid STRING,
paymentdetailid STRING,
etltype STRING,
ptime as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'demo',
'properties.bootstrap.servers' = '10.26.255.82:9092',
'properties.group.id' = 'consumer-55',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
维表:
CREATE TABLE people (
`Id` int,
`Name` String,
`Sex` tinyint,
`Birth` timestamp,
`Etltype` String,
PRIMARY KEY (Id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
'table-name' = 'people',
'username'='root',
'password'='*******',
'lookup.cache.ttl'='10s',
'lookup.cache.max-rows'='20'
);
mysql ddl:
CREATE TABLE `people` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`Name` varchar(40) NOT NULL,
`Sex` tinyint(3) unsigned NOT NULL,
`Birth` datetime DEFAULT NULL,
`Etltype` varchar(40) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4
--流维join
INSERT INTO flink_doris_sink select a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b on a.tenantcode = b.Name;
维表数据没有带出来,设置的维表ttl也没效果,是语句问题么?
flink版本:flink-1.12.0-bin-scala_2.11.tgz mysql:5.7.32 mysql驱动:8.0.22
Re: flinksql 流维join 问题
Posted by Terry Wang <zj...@gmail.com>.
语句看起来是没有问题的,可以检查下数据是否能关联上
Best,
Terry Wang
> 2021年7月6日 下午3:24,赵旭晨 <jj...@163.com> 写道:
>
> 流表:
> CREATE TABLE flink_doris_source (
> reporttime STRING,
> tenantcode STRING,
> visitid STRING,
> orderid STRING,
> orderdetailid STRING,
> paymentdetailid STRING,
> etltype STRING,
> ptime as proctime()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'demo',
> 'properties.bootstrap.servers' = '10.26.255.82:9092',
> 'properties.group.id' = 'consumer-55',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> );
>
> 维表:
> CREATE TABLE people (
> `Id` int,
> `Name` String,
> `Sex` tinyint,
> `Birth` timestamp,
> `Etltype` String,
> PRIMARY KEY (Id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
> 'table-name' = 'people',
> 'username'='root',
> 'password'='*******',
> 'lookup.cache.ttl'='10s',
> 'lookup.cache.max-rows'='20'
> );
>
> mysql ddl:
> CREATE TABLE `people` (
> `Id` int(11) NOT NULL AUTO_INCREMENT,
> `Name` varchar(40) NOT NULL,
> `Sex` tinyint(3) unsigned NOT NULL,
> `Birth` datetime DEFAULT NULL,
> `Etltype` varchar(40) NOT NULL,
> PRIMARY KEY (`Id`)
> ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4
>
>
> --流维join
> INSERT INTO flink_doris_sink select a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b on a.tenantcode = b.Name;
>
>
>
> 维表数据没有带出来,设置的维表ttl也没效果,是语句问题么?
> flink版本:flink-1.12.0-bin-scala_2.11.tgz mysql:5.7.32 mysql驱动:8.0.22
>
>
>
>