You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 赵旭晨 <jj...@163.com> on 2021/07/06 07:24:31 UTC

flinksql 流维join 问题

流表:
CREATE TABLE flink_doris_source (
    reporttime STRING,
    tenantcode STRING,
    visitid STRING,
    orderid STRING,
orderdetailid STRING,
paymentdetailid STRING,
etltype STRING,
ptime as proctime()
) WITH (
 'connector' = 'kafka',  
 'topic' = 'demo', 
 'properties.bootstrap.servers' = '10.26.255.82:9092', 
 'properties.group.id' = 'consumer-55',  
 'format' = 'json',  
 'scan.startup.mode' = 'latest-offset'  
);


维表:
CREATE TABLE people (
  `Id` int,
  `Name` String,
  `Sex` tinyint,
  `Birth` timestamp,
  `Etltype` String,
  PRIMARY KEY (Id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
   'table-name' = 'people',
   'username'='root',
   'password'='*******',
   'lookup.cache.ttl'='10s',
   'lookup.cache.max-rows'='20'
);


mysql ddl:
CREATE TABLE `people` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `Name` varchar(40) NOT NULL,
  `Sex` tinyint(3) unsigned NOT NULL,
  `Birth` datetime DEFAULT NULL,
  `Etltype` varchar(40) NOT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4




--流维join
INSERT INTO flink_doris_sink select a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b on a.tenantcode = b.Name;




维表数据没有带出来,设置的维表ttl也没效果,是语句问题么?
flink版本:flink-1.12.0-bin-scala_2.11.tgz   mysql:5.7.32   mysql驱动:8.0.22


Re: flinksql 流维join 问题

Posted by Terry Wang <zj...@gmail.com>.
语句看起来是没有问题的,可以检查下数据是否能关联上 

Best,
Terry Wang



> 2021年7月6日 下午3:24,赵旭晨 <jj...@163.com> 写道:
> 
> 流表:
> CREATE TABLE flink_doris_source (
>     reporttime STRING,
>     tenantcode STRING,
>     visitid STRING,
>     orderid STRING,
> 	orderdetailid STRING,
> 	paymentdetailid STRING,
> 	etltype STRING,
> 	ptime as proctime()
> ) WITH (
>  'connector' = 'kafka',  
>  'topic' = 'demo', 
>  'properties.bootstrap.servers' = '10.26.255.82:9092', 
>  'properties.group.id' = 'consumer-55',  
>  'format' = 'json',  
>  'scan.startup.mode' = 'latest-offset'  
> );
> 
> 维表:
> CREATE TABLE people (
>   `Id` int,
>   `Name` String,
>   `Sex` tinyint,
>   `Birth` timestamp,
>   `Etltype` String,
>   PRIMARY KEY (Id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
>    'table-name' = 'people',
>    'username'='root',
>    'password'='*******',
>    'lookup.cache.ttl'='10s',
>    'lookup.cache.max-rows'='20'
> );
> 
> mysql ddl:
> CREATE TABLE `people` (
>   `Id` int(11) NOT NULL AUTO_INCREMENT,
>   `Name` varchar(40) NOT NULL,
>   `Sex` tinyint(3) unsigned NOT NULL,
>   `Birth` datetime DEFAULT NULL,
>   `Etltype` varchar(40) NOT NULL,
>   PRIMARY KEY (`Id`)
> ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4
> 
> 
> --流维join
> INSERT INTO flink_doris_sink select a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b on a.tenantcode = b.Name;
> 
> 
> 
> 维表数据没有带出来,设置的维表ttl也没效果,是语句问题么?
> flink版本:flink-1.12.0-bin-scala_2.11.tgz   mysql:5.7.32   mysql驱动:8.0.22
> 
> 
> 
>