You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "yutao (Jira)" <ji...@apache.org> on 2022/06/17 10:00:01 UTC
[jira] [Created] (FLINK-28111) flinksql use hivecatalog cause union all operation lost 'eventTime attribute'
yutao created FLINK-28111:
-----------------------------
Summary: flinksql use hivecatalog cause union all operation lost 'eventTime attribute'
Key: FLINK-28111
URL: https://issues.apache.org/jira/browse/FLINK-28111
Project: Flink
Issue Type: Bug
Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.13.5, 1.12.7, 1.13.3, 1.12.5, 1.12.4
Environment: flink 1.12.4
hadoop 2.6.5
hive 1.1.0
Reporter: yutao
In my scenario, I have two topics with the same schema; I register them as tables and define an event-time attribute on each.
Then I create a view with UNION ALL over the two tables, and run a GROUP BY tumble-window query on the view.
But when a HiveCatalog is set, the SQL cannot run; it fails like this:
Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
*The complete code is as follows*
{code:java}
package com.unicom.test;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
*
* @author yt
*/
public class DataGenAndPrintSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings envSetting = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
String defaultDatabase = "dc_dw" ;
String catalogName = "dc_catalog";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, "hdfs://beh/flink/hive/conf","1.1.0");
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
" -- 维度数据\n" +
" order_id STRING,\n" +
" -- 用户 id\n" +
" user_id BIGINT,\n" +
" -- 用户\n" +
" price BIGINT,\n" +
" -- 事件时间戳\n" +
" row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
" -- watermark 设置\n" +
" WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '10',\n" +
" 'fields.order_id.length' = '1',\n" +
" 'fields.user_id.min' = '1',\n" +
" 'fields.user_id.max' = '100000',\n" +
" 'fields.price.min' = '1',\n" +
" 'fields.price.max' = '100000'\n" +
")";
String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
" -- 维度数据\n" +
" order_id STRING,\n" +
" -- 用户 id\n" +
" user_id BIGINT,\n" +
" -- 用户\n" +
" price BIGINT,\n" +
" -- 事件时间戳\n" +
" row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
" -- watermark 设置\n" +
" WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '10',\n" +
" 'fields.order_id.length' = '1',\n" +
" 'fields.user_id.min' = '1',\n" +
" 'fields.user_id.max' = '100000',\n" +
" 'fields.price.min' = '1',\n" +
" 'fields.price.max' = '100000'\n" +
")";
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sourceDDL_2);
String view = "create view IF NOT EXISTS test_view as select * from (select * from source_table union all select * from source_table_2) tb1";
tableEnv.executeSql(view);
String sqlGroup = "select count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start from test_view group by order_id,tumble(row_time, interval '1' minute)";
tableEnv.executeSql(sqlGroup).print();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)