Posted to issues@flink.apache.org by "yutao (Jira)" <ji...@apache.org> on 2022/06/17 10:00:01 UTC

[jira] [Created] (FLINK-28111) Flink SQL with HiveCatalog causes UNION ALL to lose the event-time attribute

yutao created FLINK-28111:
-----------------------------

             Summary: Flink SQL with HiveCatalog causes UNION ALL to lose the event-time attribute
                 Key: FLINK-28111
                 URL: https://issues.apache.org/jira/browse/FLINK-28111
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
    Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.13.5, 1.12.7, 1.13.3, 1.12.5, 1.12.4
         Environment: flink 1.12.4
                      hadoop 2.6.5
                      hive 1.1.0
            Reporter: yutao


In my scenario, I have two topics with the same schema. I register them as tables and define an event-time attribute on each.

Then I create a view that UNION ALLs the two tables, and aggregate over the view with a GROUP BY on a tumbling window.

But when a HiveCatalog is set, the SQL cannot run; it fails like this:

Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

 

 *The complete code is as follows*
{code:java}
package com.unicom.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 *
 * @author yt
 */
public class DataGenAndPrintSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, envSetting);

        String defaultDatabase = "dc_dw" ;
        String catalogName = "dc_catalog";

        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, "hdfs://beh/flink/hive/conf","1.1.0");

        tableEnv.registerCatalog(catalogName, hive);

        tableEnv.useCatalog(catalogName);

        tableEnv.useDatabase(defaultDatabase);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
                "    -- 维度数据\n" +
                "    order_id  STRING,\n" +
                "    -- 用户 id\n" +
                "    user_id BIGINT,\n" +
                "    -- 用户\n" +
                "    price BIGINT,\n" +
                "    -- 事件时间戳\n" +
                "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
                "    -- watermark 设置\n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.order_id.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";

        String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
                "    -- 维度数据\n" +
                "    order_id  STRING,\n" +
                "    -- 用户 id\n" +
                "    user_id BIGINT,\n" +
                "    -- 用户\n" +
                "    price BIGINT,\n" +
                "    -- 事件时间戳\n" +
                "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
                "    -- watermark 设置\n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.order_id.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sourceDDL_2);

        String view = "create view IF NOT EXISTS test_view as select * from (select * from source_table union all select * from source_table_2) tb1";

        tableEnv.executeSql(view);
        

        // Tumbling-window aggregation over the unioned view; this is the query that
        // fails with "Window aggregate can only be defined over a time attribute column".
        String sqlGroup = "select count(*),\n" +
                "       UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start\n" +
                "from test_view\n" +
                "group by order_id, tumble(row_time, interval '1' minute)";


        tableEnv.executeSql(sqlGroup).print();
    }

}
{code}
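A small diagnostic sketch (assuming the two tables and the view above have already been created) to check whether row_time keeps its rowtime attribute once the view is resolved through the HiveCatalog; with a working time attribute the column is reported as a rowtime (e.g. TIMESTAMP(3) *ROWTIME*), while a plain TIMESTAMP(3) matches the exception above:
{code:java}
// Diagnostic only: inspect how row_time is typed after the view is read
// back from the HiveCatalog.
tableEnv.executeSql("DESCRIBE test_view").print();

// printSchema() marks a proper event-time column as a rowtime attribute;
// a plain TIMESTAMP(3) means the time attribute was lost.
tableEnv.from("test_view").printSchema();
{code}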
 

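A hypothetical variant for comparison (whether it avoids the exception is not verified here): run the same tumbling aggregation directly over the inlined UNION ALL instead of the view stored in the HiveCatalog, to see whether the time attribute is only lost on the catalog round trip.
{code:java}
// Hypothetical comparison: aggregate over the inlined UNION ALL rather than
// the catalog-stored view; source_table and source_table_2 are the tables
// defined by the DDLs above.
String sqlGroupInline =
        "select count(*), tumble_start(row_time, interval '1' minute) as window_start\n" +
        "from (select * from source_table union all select * from source_table_2) t\n" +
        "group by order_id, tumble(row_time, interval '1' minute)";
tableEnv.executeSql(sqlGroupInline).print();
{code}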


--
This message was sent by Atlassian Jira
(v8.20.7#820007)