Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2020/07/29 08:35:30 UTC
Using the datagen connector to generate unbounded data with a one-second window aggregation: no output is ever printed
When I run the following program, the console never prints any data.

1. Program

package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

    public static void main(String[] args) {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String sourceTableDDL = "CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='20', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='10', " +
                " 'fields.f_random_str.length'='10' " +
                ")";

        bsTableEnv.executeSql(sourceTableDDL);

        bsTableEnv.executeSql("SELECT f_random, count(1) " +
                "FROM datagen " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

    }
}

2. Console output

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+-------------+----------------------+
|    f_random |               EXPR$1 |
+-------------+----------------------+

Re: Using the datagen connector to generate unbounded data with a one-second window aggregation: no output is ever printed
Posted by hailongwang <18...@163.com>.
Hi Asahi Lee:
I created a class in the flink-sql-client module on master and copied your code into it, and the console does print output. Which version are you using?
Best,
Hailong Wang
Re: Using the datagen connector to generate unbounded data with a one-second window aggregation: no output is ever printed
Posted by Leonard Xu <xb...@gmail.com>.
Hi
>
> bsTableEnv.executeSql("SELECT f_random, count(1) " +
> "FROM datagen " +
> "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();
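TableResult.print() currently supports only exactly-once semantics, so in streaming mode checkpointing must be enabled for it to work.
Configure checkpointing and try again. Support for at-least-once should arrive in 1.12; once it does, setting a checkpoint will no longer be required.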
Best regards,
Leonard
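
The reply above does not show the change itself. In the program from the original post, checkpointing is typically switched on with a call such as bsEnv.enableCheckpointing(1000) right after bsEnv is created (the 1000 ms interval is an arbitrary value chosen here, not one from the thread). As for why nothing is printed without it, the toy model below may help. It is not Flink code: the class CheckpointGatedSink and its methods are hypothetical names, and it assumes that an exactly-once print path only hands rows to the client once a checkpoint has completed, which is one reading of the explanation above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model only (hypothetical, not part of Flink): a sink that keeps
 * result rows hidden from the client until a checkpoint completes.
 */
public class CheckpointGatedSink {

    // Rows produced by the job since the last completed checkpoint.
    private final List<String> uncommitted = new ArrayList<>();

    // Rows that a print()-style client is allowed to show.
    private final List<String> visible = new ArrayList<>();

    /** A new result row arrives from the running job. */
    public void write(String row) {
        uncommitted.add(row);
    }

    /** A checkpoint completes: everything buffered so far becomes visible. */
    public void checkpointCompleted() {
        visible.addAll(uncommitted);
        uncommitted.clear();
    }

    /** Snapshot of the rows the client could print right now. */
    public List<String> visibleRows() {
        return new ArrayList<>(visible);
    }

    public static void main(String[] args) {
        CheckpointGatedSink sink = new CheckpointGatedSink();
        sink.write("f_random=3 count=2");
        sink.write("f_random=7 count=1");

        // No checkpoint has completed yet, so the client sees nothing.
        System.out.println("before checkpoint: " + sink.visibleRows());

        sink.checkpointCompleted();

        // After the checkpoint both rows are released.
        System.out.println("after checkpoint: " + sink.visibleRows());
    }
}
```

In this model, a job that never checkpoints behaves like a sink on which checkpointCompleted() is never called: rows keep arriving but the visible list stays empty, which matches the header-only table in the original console output.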