You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "DavidLiu (Jira)" <ji...@apache.org> on 2022/07/30 13:59:00 UTC

[jira] [Commented] (FLINK-28111) flinksql use hivecatalog cause union all operation lost 'eventTime attribute'

    [ https://issues.apache.org/jira/browse/FLINK-28111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573268#comment-17573268 ] 

DavidLiu commented on FLINK-28111:
----------------------------------

This is not an issue, but rather, by design.

The timestamp sequence cannot be guaranteed when there are two timestamps in a "union all", which causes the time attribute to be lost.

Joining two stream tables with time attributes will have similar results.

> flinksql use hivecatalog cause union all  operation lost  'eventTime attribute'
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-28111
>                 URL: https://issues.apache.org/jira/browse/FLINK-28111
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.4, 1.13.5, 1.14.4
>         Environment: flink 1.12.4
> hadoop 2.6.5
> hive 1.1.0
>            Reporter: yutao
>            Priority: Major
>
>  In my scenario , i have 2 topics  have same schema ; i register them  to  table and define eventtime.
> then create view use union all  2 table ,and use view  group by  tumble window ;
> but when set hivecatalog ,sql can not run ;just like this:
> Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
>  
>  *The complete code is as follows*
> {code:java}
> package com.unicom.test;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> /**
>  *
>  * @author yt
>  */
> public class DataGenAndPrintSink {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         EnvironmentSettings envSetting = EnvironmentSettings
>                 .newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, envSetting);
>         String defaultDatabase = "dc_dw" ;
>         String catalogName = "dc_catalog";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, "hdfs://beh/flink/hive/conf","1.1.0");
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         tableEnv.executeSql(sourceDDL);
>         tableEnv.executeSql(sourceDDL_2);
>         String view = "create view IF NOT EXISTS test_view as select * from (select * from source_table union all select * from source_table_2) tb1";
>         tableEnv.executeSql(view);
>         
>         String sqlGroup = "select count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start from test_view group by order_id,tumble(row_time, interval '1' minute)";
>         tableEnv.executeSql(sqlGroup).print();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)