Posted to user-zh@flink.apache.org by kcz <57...@qq.com.INVALID> on 2021/10/12 03:26:52 UTC

flink-1.14: question about watermark generation with KafkaSource

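The "times" field in my data never changes, yet the window computation is still triggered. Shouldn't it only fire once the time has advanced by 20 seconds?

import java.time.Duration;

import com.alibaba.fastjson.JSONObject; // assumed: JSONObject appears to be Alibaba fastjson's
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("127.0.0.1:9092")
        .setTopics("user_behavior")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// MyTimeAssigner is the poster's own timestamp assigner (not shown); presumably it reads "times".
DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner(new MyTimeAssigner("times")), "Kafka Source")
        .map(JSONObject::parseObject);
ds.print();
env.execute(); // the job is only submitted when execute() is called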
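One point that may explain the "time + 20 seconds" expectation: the watermark does not advance with wall-clock time. Flink's built-in bounded-out-of-orderness generator tracks the largest event timestamp seen so far and periodically emits that maximum minus the allowed out-of-orderness minus 1 ms. The sketch below is a plain-Java model of that rule, not Flink code; the class name and sample timestamps are made up for illustration. It shows that when every record carries the same "times" value, the watermark stays fixed about 20 s behind it. Note also that with no window or timer downstream, nothing in this job waits for the watermark in the first place.

```java
import java.util.List;

/**
 * Plain-Java model (not Flink code) of the watermark produced by
 * WatermarkStrategy.forBoundedOutOfOrderness(...): the largest event
 * timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms.
 */
public class BoundedOutOfOrdernessSketch {

    /** Watermark after the given event timestamps (epoch millis) have been seen. */
    public static long watermarkAfter(List<Long> eventTimestamps, long outOfOrdernessMillis) {
        // Start value chosen, as in Flink's generator, so that the formula
        // below yields Long.MIN_VALUE before any event arrives (no underflow).
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        for (long ts : eventTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        long times = 1_634_000_000_000L; // hypothetical constant value of the "times" field
        long outOfOrderness = 20_000L;   // Duration.ofSeconds(20)

        // Repeating the same "times" never moves the watermark: both print times - 20001.
        System.out.println(watermarkAfter(List.of(times), outOfOrderness));
        System.out.println(watermarkAfter(List.of(times, times, times), outOfOrderness));

        // Only a record more than 20 s later pushes the watermark up to "times".
        System.out.println(watermarkAfter(List.of(times, times + 20_001L), outOfOrderness) >= times);
    }
}
```

In other words, an event-time computation keyed on a constant "times" would wait forever, because the watermark only moves when a larger timestamp arrives.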

Re: flink-1.14: question about watermark generation with KafkaSource

Posted by kcz <57...@qq.com.INVALID>.
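If I use a GlobalWindow with my own trigger, does that also make the watermark ineffective? I tested it and the watermark had no effect. The "times" field in the test data stays the same throughout.

import java.time.Duration;

import com.alibaba.fastjson.JSONObject; // assumed: JSONObject appears to be Alibaba fastjson's
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class PathMonitorJob {
    private static final String PATH = "path";
    private static double THRESHOLD;

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        THRESHOLD = parameterTool.getDouble("threshold", 1000d);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("user_behavior")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // MyTimeAssigner is the poster's own timestamp assigner (not shown).
        DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner(new MyTimeAssigner("times")), "Kafka Source")
                .map(JSONObject::parseObject);

        // One global window per "vin"; it never ends, so only the trigger decides when it fires.
        WindowedStream<JSONObject, String, GlobalWindow> windowedStream =
                ds.keyBy(value -> value.getString("vin")).window(GlobalWindows.create());
        // Fire and purge once "path" exceeds the last firing element's "path" by more than THRESHOLD.
        windowedStream.trigger(PurgingTrigger.of(DeltaTrigger.of(THRESHOLD,
                        (oldDataPoint, newDataPoint) -> newDataPoint.getDoubleValue(PATH) - oldDataPoint.getDoubleValue(PATH),
                        TypeInformation.of(JSONObject.class).createSerializer(env.getConfig()))))
                .process(new CountPathProcess()) // the poster's own window function (not shown)
                .print();
        env.execute("PathMonitorJob");
    }
}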
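On the follow-up question: as far as I can tell from Flink's source, GlobalWindows puts every element of a key into a single window that never ends, and DeltaTrigger makes its decision only in onElement, by comparing the new element with the last element that caused a firing (or the first element); its onEventTime callback just returns CONTINUE. So in this setup neither the watermark nor the "times" field influences when results are emitted. The sketch below is a plain-Java model of one key under PurgingTrigger.of(DeltaTrigger.of(...)), not Flink code; the class name and the "path" values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (not Flink code) of one key under
 * GlobalWindows + PurgingTrigger.of(DeltaTrigger.of(threshold, ...)).
 * Firing depends only on the "path" values; timestamps and watermarks are never read.
 */
public class DeltaTriggerSketch {

    /** Returns the window contents emitted at each firing, in order. */
    public static List<List<Double>> emissions(double[] paths, double threshold) {
        List<List<Double>> fired = new ArrayList<>();
        List<Double> window = new ArrayList<>(); // elements buffered in the global window
        Double last = null;                      // DeltaTrigger's "last element" state
        for (double path : paths) {
            window.add(path);                    // the element enters the window before the trigger runs
            if (last == null) {
                last = path;                     // first element only initialises the state (CONTINUE)
            } else if (path - last > threshold) {
                last = path;                     // FIRE and remember this element as the new reference
                fired.add(new ArrayList<>(window));
                window.clear();                  // PurgingTrigger turns FIRE into FIRE_AND_PURGE
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical "path" readings for one "vin"; the "times" field plays no part.
        double[] paths = {0, 400, 900, 1500, 1600, 2600};
        System.out.println(emissions(paths, 1000d));
    }
}
```

If emission should depend on event time, an event-time window assigner such as TumblingEventTimeWindows, or a custom Trigger that registers timers via ctx.registerEventTimeTimer(...), would be needed; a delta-based trigger on its own never waits for the watermark.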




------------------ Original message ------------------
From: "user-zh" <17610775726@163.com>;
Sent: Tuesday, October 12, 2021, 12:32 PM
To: "user-zh@flink.apache.org" <user-zh@flink.apache.org>;

Subject: Re: flink-1.14: question about watermark generation with KafkaSource



Hi


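What you describe applies when there is a window computation: there, a window fires when wm > window.end_time. Your code does not use a window at all, so the watermark has nothing to do with it; every record that arrives is simply printed.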


Best
JasonLee
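To make the window firing condition concrete: in Flink's event-time windows a window covers [start, end), and the default EventTimeTrigger fires once the watermark reaches window.maxTimestamp(), which is end - 1 ms (so, strictly speaking, wm >= end - 1). The plain-Java model below is not Flink code; the class name, the one-minute window size, and the timestamps are hypothetical. It computes the tumbling window for a constant "times" value and checks whether a 20 s bounded-out-of-orderness watermark would ever fire it.

```java
/**
 * Plain-Java model (not Flink code) of when an event-time tumbling window fires.
 * A window covers [start, end); Flink's EventTimeTrigger fires it once the
 * watermark reaches window.maxTimestamp(), which is end - 1.
 */
public class EventTimeWindowSketch {

    /** Start of the tumbling window that contains the timestamp (offset 0, timestamp >= 0). */
    public static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** True once the watermark has reached the last millisecond of the window. */
    public static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;             // hypothetical 1-minute tumbling window
        long outOfOrderness = 20_000L;   // Duration.ofSeconds(20)
        long times = 1_634_000_030_000L; // hypothetical constant "times" value
        long end = windowStart(times, size) + size;

        // Watermark while every record carries the same "times": the window never fires.
        long stuckWatermark = times - outOfOrderness - 1;
        System.out.println(fires(end, stuckWatermark));

        // A record timestamped 20 s after the window end lifts the watermark to end - 1.
        long laterRecord = end + outOfOrderness;
        System.out.println(fires(end, laterRecord - outOfOrderness - 1));
    }
}
```

With a constant "times", the watermark stays at times - 20 s - 1 ms, so such a window would only fire once a record timestamped at least 20 s past the window end arrives.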

