Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2020/07/29 08:35:30 UTC

Using the datagen connector to generate unbounded data: a one-second time-window aggregation never prints any output

When the following program runs, the console never prints any data.

1. Program:

package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

    public static void main(String[] args) {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String sourceTableDDL = "CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='20', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='10', " +
                " 'fields.f_random_str.length'='10' " +
                ")";

        bsTableEnv.executeSql(sourceTableDDL);

        bsTableEnv.executeSql("SELECT f_random, count(1) " +
                "FROM datagen " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

    }

}

2. Console output:

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+-------------+----------------------+
|    f_random |               EXPR$1 |
+-------------+----------------------+

Re: Using the datagen connector to generate unbounded data: a one-second time-window aggregation never prints any output

Posted by hailongwang <18...@163.com>.
Hi Asahi Lee:
I created a class in the flink-sql-client module on master and copied your code; the console does produce output. Which Flink version are you using?
Best,
Hailong Wang




At 2020-07-29 15:35:30, "Asahi Lee" <97...@qq.com> wrote:
>[original program and console output quoted in full above]

Re: Using the datagen connector to generate unbounded data: a one-second time-window aggregation never prints any output

Posted by Leonard Xu <xb...@gmail.com>.
Hi

> 
>        bsTableEnv.executeSql("SELECT f_random, count(1) " +
>                "FROM datagen " +
>                "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

TableResult.print() currently only supports exactly-once semantics, so in streaming mode checkpointing must be enabled for it to work.
Please configure checkpointing and try again. At-least-once support for this method should land in 1.12; once it is available, enabling checkpointing will no longer be necessary.
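
For reference, a minimal sketch of the original program with checkpointing enabled; the class name DataGenCheckpointTest and the 1000 ms checkpoint interval are illustrative choices, not from this thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenCheckpointTest {

    public static void main(String[] args) {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing so that TableResult.print(), which currently requires
        // exactly-once semantics, can emit results in streaming mode.
        // 1000 ms is an arbitrary example interval.
        bsEnv.enableCheckpointing(1000);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Same datagen source and windowed aggregation as in the original program.
        bsTableEnv.executeSql("CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='20', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='10', " +
                " 'fields.f_random_str.length'='10' " +
                ")");

        bsTableEnv.executeSql("SELECT f_random, count(1) " +
                "FROM datagen " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();
    }
}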

Best,
Leonard