Posted to user@flink.apache.org by cy <ca...@126.com> on 2021/12/16 08:25:54 UTC

Unexpected result when using the Flink SQL interval join

Hi
Flink 1.14.0 Scala 2.12


I'm using the Flink SQL interval join. Here are my table schema and SQL:


create table `queue_3_ads_ccops_perf_o_ebs_volume_capacity` (
  `dtEventTime` timestamp(3),
  `dtEventTimeStamp` bigint,
  `sourceid` string,
  `cluster_name` string,
  `poolname` string,
  `storage_poolname` string,
  `usage` decimal(10, 4),
  `provisioned_size` decimal(10, 4),
  `startat` timestamp(3),
  `endat` timestamp(3),
  `vrespool_id` int,
  `uuid` string,
  `version` string,
  `localTime` timestamp(3),
  `cluster_id` int,
  `extend1` string,
  `extend2` string,
  `extend3` string,
  `mon_ip` string,
  `bussiness_ip` string,
  `datasource` string,
  `thedate` int,
  `name` string,
  `used_size` int,
  watermark for `startat` as `startat` - interval '60' minutes
) with (
  'connector' = 'kafka',
  'topic' = 'queue_3_ads_ccops_perf_o_ebs_volume_capacity',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '???????????',
  'properties.group.id' = 'layer-vdisk',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="????????" password="?????????";'
);


SELECT 
source.sourceid AS sourceid, 
cast(source.startat AS timestamp) AS source_startat,
cast(target.startat AS timestamp) AS target_startat, 
source.used_size AS source_used_size, 
target.used_size AS target_used_size, 
source.usage AS source_usage,
target.usage AS target_usage 
FROM queue_3_ads_ccops_perf_o_ebs_volume_capacity source, queue_3_ads_ccops_perf_o_ebs_volume_capacity target
WHERE source.sourceid = target.sourceid
AND source.sourceid in (
'volume-9dfed0d9-28b2-418a-9215-ce762ef80920', 
'volume-9ece34f1-f4bb-475a-8e64-a2e37711b4fc', 
'volume-9f0ec4cc-5cc4-49a8-b715-a91a25df3793', 
'volume-9f38e0b3-2324-4505-a8ad-9b1ccb72181f', 
'volume-9f3ec256-10fb-4d8b-a8cb-8498324cf309'
)
AND source.startat >= FLOOR(target.startat TO HOUR) + INTERVAL '1' HOUR AND source.startat < FLOOR(target.startat TO HOUR) + INTERVAL '2' HOUR;
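To cross-check the streaming output, the join condition can be modelled in plain Python and applied to a small batch of rows. This is a minimal sketch, not Flink code: it assumes FLOOR(... TO HOUR) simply truncates a TIMESTAMP(3) to the start of its hour, it leaves out the sourceid IN (...) filter, and the helper names (floor_to_hour, in_time_range, expected_pairs) and the (sourceid, startat) row tuples are hypothetical.

```python
from datetime import datetime, timedelta


def floor_to_hour(ts: datetime) -> datetime:
    """Assumed model of FLOOR(ts TO HOUR): drop minutes, seconds, microseconds."""
    return ts.replace(minute=0, second=0, microsecond=0)


def in_time_range(source_startat: datetime, target_startat: datetime) -> bool:
    """Time predicate from the WHERE clause:
    source.startat >= FLOOR(target.startat TO HOUR) + INTERVAL '1' HOUR
    AND source.startat < FLOOR(target.startat TO HOUR) + INTERVAL '2' HOUR
    """
    base = floor_to_hour(target_startat)
    return base + timedelta(hours=1) <= source_startat < base + timedelta(hours=2)


def expected_pairs(rows):
    """Batch reference for the self-join over hypothetical (sourceid, startat) tuples.

    Returns (sourceid, source_startat, target_startat) for every pair with the
    same sourceid that satisfies the time predicate.
    """
    return [
        (s_id, s_ts, t_ts)
        for s_id, s_ts in rows
        for t_id, t_ts in rows
        if s_id == t_id and in_time_range(s_ts, t_ts)
    ]
```

For example, with rows at 12:05:08, 13:30:00 and 14:05:06 on 2021-12-13 for one volume, only the (source, target) pairs (13:30:00, 12:05:08) and (14:05:06, 13:30:00) satisfy the condition.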


And here is the result:


I'm confused by the first row: its source_startat and target_startat do not satisfy the time condition.
I also tried executing the SQL below:


SELECT
TO_TIMESTAMP('2021-12-13 14:05:06') >= FLOOR(TO_TIMESTAMP('2021-12-13 12:05:08') TO HOUR) + INTERVAL '1' HOUR
AND TO_TIMESTAMP('2021-12-13 14:05:06') < FLOOR(TO_TIMESTAMP('2021-12-13 12:05:08') TO HOUR) + INTERVAL '2' HOUR;


It returns false, which is the correct result.
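The same check for these two timestamps can be reproduced outside Flink to see exactly where the window lies. This sketch assumes FLOOR(... TO HOUR) truncates to the start of the hour; it shows that for a target_startat of 12:05:08 the valid source_startat window is [13:00:00, 14:00:00), so 14:05:06 falls just past its exclusive upper bound.

```python
from datetime import datetime, timedelta

# Timestamps taken from the TO_TIMESTAMP(...) query above.
source_startat = datetime(2021, 12, 13, 14, 5, 6)
target_startat = datetime(2021, 12, 13, 12, 5, 8)

# Assumed model of FLOOR(target_startat TO HOUR): truncate to the hour.
hour_floor = target_startat.replace(minute=0, second=0, microsecond=0)
lower = hour_floor + timedelta(hours=1)  # 2021-12-13 13:00:00, inclusive
upper = hour_floor + timedelta(hours=2)  # 2021-12-13 14:00:00, exclusive

matches = lower <= source_startat < upper
print(lower, upper, matches)
# prints: 2021-12-13 13:00:00 2021-12-13 14:00:00 False
```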


So is something wrong with the Flink SQL interval join?


I'd appreciate your help. Thank you.