You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "yutao (Jira)" <ji...@apache.org> on 2022/06/17 13:23:00 UTC
[jira] (FLINK-28111) flinksql use hivecatalog cause union all operation lost 'eventTime attribute'
[ https://issues.apache.org/jira/browse/FLINK-28111 ]
yutao deleted comment on FLINK-28111:
-------------------------------
was (Author: chaojipaopao):
[~martijnvisser] 1.12.4, but I tried Flink 1.13.5 and Flink 1.14.4 and they also have this problem.
> flinksql use hivecatalog cause union all operation lost 'eventTime attribute'
> -------------------------------------------------------------------------------
>
> Key: FLINK-28111
> URL: https://issues.apache.org/jira/browse/FLINK-28111
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 1.12.4
> Environment: flink 1.12.4
> hadoop 2.6.5
> hive 1.1.0
> Reporter: yutao
> Priority: Major
>
> In my scenario, I have 2 topics with the same schema; I register them as tables and define an event time on each.
> Then I create a view that combines the 2 tables with UNION ALL, and use the view in a GROUP BY with a tumble window.
> But when the HiveCatalog is set, the SQL cannot run; it fails like this:
> Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
>
> *The complete code is as follows*
> {code:java}
> package com.unicom.test;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> /**
> *
> * @author yt
> */
> public class DataGenAndPrintSink {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings envSetting = EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
> String defaultDatabase = "dc_dw" ;
> String catalogName = "dc_catalog";
> HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, "hdfs://beh/flink/hive/conf","1.1.0");
> tableEnv.registerCatalog(catalogName, hive);
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
> " -- 维度数据\n" +
> " order_id STRING,\n" +
> " -- 用户 id\n" +
> " user_id BIGINT,\n" +
> " -- 用户\n" +
> " price BIGINT,\n" +
> " -- 事件时间戳\n" +
> " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
> " -- watermark 设置\n" +
> " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> " 'connector' = 'datagen',\n" +
> " 'rows-per-second' = '10',\n" +
> " 'fields.order_id.length' = '1',\n" +
> " 'fields.user_id.min' = '1',\n" +
> " 'fields.user_id.max' = '100000',\n" +
> " 'fields.price.min' = '1',\n" +
> " 'fields.price.max' = '100000'\n" +
> ")";
> String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
> " -- 维度数据\n" +
> " order_id STRING,\n" +
> " -- 用户 id\n" +
> " user_id BIGINT,\n" +
> " -- 用户\n" +
> " price BIGINT,\n" +
> " -- 事件时间戳\n" +
> " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
> " -- watermark 设置\n" +
> " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> " 'connector' = 'datagen',\n" +
> " 'rows-per-second' = '10',\n" +
> " 'fields.order_id.length' = '1',\n" +
> " 'fields.user_id.min' = '1',\n" +
> " 'fields.user_id.max' = '100000',\n" +
> " 'fields.price.min' = '1',\n" +
> " 'fields.price.max' = '100000'\n" +
> ")";
> tableEnv.executeSql(sourceDDL);
> tableEnv.executeSql(sourceDDL_2);
> String view = "create view IF NOT EXISTS test_view as select * from (select * from source_table union all select * from source_table_2) tb1";
> tableEnv.executeSql(view);
>
> String sqlGroup = "select count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start from test_view group by order_id,tumble(row_time, interval '1' minute)";
> tableEnv.executeSql(sqlGroup).print();
> }
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)