You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by ZhaoShuKang <ch...@163.com> on 2023/05/25 00:51:27 UTC

关于Table API 或 SQL 如何设置水印的疑问?

各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL 设置事件时间有三种方式:
1、在 DDL 中定义
2、在 DataStream 到 Table 转换时定义
3、使用 TableSource 定义
而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
以下是我的第一版代码
// flink 集成 hive
System.out.println("初始化Flink环境");
String hiveVersion = "3.1.2";
String catalogName = "myhive";
String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

System.out.println("定义hive环境");
// 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);

// 将 HiveCatalog 设置为 session 的当前 catalog
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
// 设置 hive 并行度
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setInteger("table.exec.hive.infer-source-parallelism.max", sourceParallelism); // Default 1000

// 使用 HiveTableSource
System.out.println("定义查询条件");
// 定义查询条件
Table table = tableEnv
.from(catalogName + "." + databaseName + "." + tableName)
        .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY + "," + HOUR)
        .filter($(YEAR).isEqual(year))
        .filter($(MONTH).isEqual(startMonth))
        .filter($(DAY).isGreaterOrEqual(startDay))
        .filter($(HOUR).isGreaterOrEqual(startHour))
        .filter($(DAY).isLessOrEqual(endDay))
        .filter($(HOUR).isLessOrEqual(endHour));
tableEnv.createTemporaryView("myTable", table);

// Table 转 Stream,非常耗时
System.out.println("Table to Stream");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
// 水印及窗口设置
System.out.println("水印及窗口");
resultStream
.assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((SerializableTimestampAssigner<Row>) (element, recordTimestamp) -> {
long datetime = 0;
try {
                        datetime = new SimpleDateFormat(DATEFORMAT)
                                .parse(element.getFieldAs(DATETIME).toString())
                                .getTime();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
return datetime;
                }))
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
| |
ZhaoShuKang
|
|
chuckzhao95@163.com
|

Re: 关于Table API 或 SQL 如何设置水印的疑问?

Posted by yidan zhao <hi...@gmail.com>.
你在hive的catalog中定义表的时候就可以定义好event time,以及watermark呀。

ZhaoShuKang <ch...@163.com> 于2023年5月25日周四 08:53写道:
>
> 各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL 设置事件时间有三种方式:
> 1、在 DDL 中定义
> 2、在 DataStream 到 Table 转换时定义
> 3、使用 TableSource 定义
> 而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
> 另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
> 以下是我的第一版代码
> // flink 集成 hive
> System.out.println("初始化Flink环境");
> String hiveVersion = "3.1.2";
> String catalogName = "myhive";
> String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
> String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
> EnvironmentSettings settings = EnvironmentSettings
> .newInstance()
>         .inStreamingMode()
>         .build();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
> System.out.println("定义hive环境");
> // 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
> HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
> tableEnv.registerCatalog(catalogName, hive);
>
> // 将 HiveCatalog 设置为 session 的当前 catalog
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> // 设置 hive 并行度
> Configuration configuration = tableEnv.getConfig().getConfiguration();
> configuration.setInteger("table.exec.hive.infer-source-parallelism.max", sourceParallelism); // Default 1000
>
> // 使用 HiveTableSource
> System.out.println("定义查询条件");
> // 定义查询条件
> Table table = tableEnv
> .from(catalogName + "." + databaseName + "." + tableName)
>         .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY + "," + HOUR)
>         .filter($(YEAR).isEqual(year))
>         .filter($(MONTH).isEqual(startMonth))
>         .filter($(DAY).isGreaterOrEqual(startDay))
>         .filter($(HOUR).isGreaterOrEqual(startHour))
>         .filter($(DAY).isLessOrEqual(endDay))
>         .filter($(HOUR).isLessOrEqual(endHour));
> tableEnv.createTemporaryView("myTable", table);
>
> // Table 转 Stream,非常耗时
> System.out.println("Table to Stream");
> DataStream<Row> resultStream = tableEnv.toDataStream(table);
> // 水印及窗口设置
> System.out.println("水印及窗口");
> resultStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
>                 .withTimestampAssigner((SerializableTimestampAssigner<Row>) (element, recordTimestamp) -> {
> long datetime = 0;
> try {
>                         datetime = new SimpleDateFormat(DATEFORMAT)
>                                 .parse(element.getFieldAs(DATETIME).toString())
>                                 .getTime();
>                     } catch (ParseException e) {
>                         e.printStackTrace();
>                     }
> return datetime;
>                 }))
>         .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
> | |
> ZhaoShuKang
> |
> |
> chuckzhao95@163.com
> |