You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "yutao (Jira)" <ji...@apache.org> on 2022/06/17 13:23:00 UTC

[jira] (FLINK-28111) flinksql use hivecatalog cause union all operation lost 'eventTime attribute'

    [ https://issues.apache.org/jira/browse/FLINK-28111 ]


    yutao deleted comment on FLINK-28111:
    -------------------------------

was (Author: chaojipaopao):
[~martijnvisser] 1.12.4, but I also tried Flink 1.13.5 and Flink 1.14.4, and they have this problem as well

> flinksql use hivecatalog cause union all  operation lost  'eventTime attribute'
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-28111
>                 URL: https://issues.apache.org/jira/browse/FLINK-28111
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.4
>         Environment: flink 1.12.4
> hadoop 2.6.5
> hive 1.1.0
>            Reporter: yutao
>            Priority: Major
>
>  In my scenario, I have 2 topics with the same schema; I register them as tables and define an event time on each.
> Then I create a view that combines the 2 tables with UNION ALL, and query the view with a GROUP BY over a tumble window;
> but when a HiveCatalog is set, the SQL cannot run, failing like this:
> Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
>  
>  *The complete code is as follows*
> {code:java}
> package com.unicom.test;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> /**
>  *
>  * @author yt
>  */
> public class DataGenAndPrintSink {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         EnvironmentSettings envSetting = EnvironmentSettings
>                 .newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, envSetting);
>         String defaultDatabase = "dc_dw" ;
>         String catalogName = "dc_catalog";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, "hdfs://beh/flink/hive/conf","1.1.0");
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         tableEnv.executeSql(sourceDDL);
>         tableEnv.executeSql(sourceDDL_2);
>         String view = "create view IF NOT EXISTS test_view as select * from (select * from source_table union all select * from source_table_2) tb1";
>         tableEnv.executeSql(view);
>         
>         String sqlGroup = "select count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start from test_view group by order_id,tumble(row_time, interval '1' minute)";
>         tableEnv.executeSql(sqlGroup).print();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)