Posted to user-zh@flink.apache.org by kcz <57...@qq.com.INVALID> on 2021/10/12 03:26:52 UTC
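flink-1.14: question about watermark generation with KafkaSource
My time field "times" never changes, yet the window computation is still triggered. Shouldn't the computation run only after the time + 20 seconds?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();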
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("127.0.0.1:9092")
    .setTopics("user_behavior")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner(new MyTimeAssigner("times")), "Kafka Source")
    .map(JSONObject::parseObject);
ds.print();
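Re: flink-1.14: question about watermark generation with KafkaSource
Posted by kcz <57...@qq.com.INVALID>.
If I use a GlobalWindow and define my own trigger, does the watermark stop having any effect as well? I tested it, and the watermark did not take effect. The "times" field in the test data stays the same throughout.
public class PathMonitorJob {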
    private static final String PATH = "path";
    private static double THRESHOLD;

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        THRESHOLD = parameterTool.getDouble("threshold", 1000d);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("127.0.0.1:9092")
            .setTopics("user_behavior")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
        DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner(new MyTimeAssigner("times")), "Kafka Source")
            .map(JSONObject::parseObject);
        WindowedStream<JSONObject, String, GlobalWindow> windowedStream =
            ds.keyBy(value -> value.getString("vin")).window(GlobalWindows.create());
        windowedStream.trigger(PurgingTrigger.of(DeltaTrigger.of(THRESHOLD,
                (oldDataPoint, newDataPoint) ->
                    newDataPoint.getDoubleValue(PATH) - oldDataPoint.getDoubleValue(PATH),
                TypeInformation.of(JSONObject.class).createSerializer(env.getConfig()))))
            .process(new CountPathProcess()).print();
        env.execute("PathMonitorJob");
    }
}
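On the GlobalWindow question: as far as I understand Flink's behavior, a GlobalWindow never ends on its own (its maximum timestamp is Long.MAX_VALUE), and DeltaTrigger decides on each incoming element while ignoring event-time callbacks, so the watermark would not drive firing in a job like this one. Below is a minimal plain-Java sketch of that decision logic, with no Flink dependency. The class DeltaTriggerSketch and its method onWatermark are hypothetical stand-ins for illustration, not Flink API, and the values are made up.

```java
import java.util.function.ToDoubleBiFunction;

/** Plain-Java sketch of delta-based firing; hypothetical class, not Flink API. */
public class DeltaTriggerSketch<T> {
    private final double threshold;
    private final ToDoubleBiFunction<T, T> deltaFunction;
    // Reference element: the first element seen, or the element of the last firing.
    private T lastElement;

    public DeltaTriggerSketch(double threshold, ToDoubleBiFunction<T, T> deltaFunction) {
        this.threshold = threshold;
        this.deltaFunction = deltaFunction;
    }

    /** Returns true when the delta to the reference element exceeds the threshold. */
    public boolean onElement(T element) {
        if (lastElement == null) {
            lastElement = element; // the first element only sets the reference
            return false;
        }
        if (deltaFunction.applyAsDouble(lastElement, element) > threshold) {
            lastElement = element; // firing resets the reference
            return true;
        }
        return false;
    }

    /** Watermark progress is ignored: this sketch never fires on event time. */
    public boolean onWatermark(long watermark) {
        return false;
    }

    public static void main(String[] args) {
        DeltaTriggerSketch<Double> trigger =
            new DeltaTriggerSketch<Double>(1000d, (oldV, newV) -> newV - oldV);
        System.out.println(trigger.onElement(0d));              // false
        System.out.println(trigger.onElement(500d));            // false
        System.out.println(trigger.onElement(1200d));           // true
        System.out.println(trigger.onWatermark(Long.MAX_VALUE)); // false
    }
}
```

In this sketch only the "path" delta between records matters; a constant "times" field changes nothing, because the timestamp is never consulted.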
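------------------ Original message ------------------
From: "user-zh" <17610775726@163.com>;
Sent: Tuesday, October 12, 2021, 12:32 PM
To: "user-zh@flink.apache.org"<user-zh@flink.apache.org>;
Subject: Re: flink-1.14: question about watermark generation with KafkaSource
Hi
What you describe applies when there is a window computation: a window fires when wm > window.end_time. Your code does not use a window, though, so the watermark is not really involved here, which means every incoming record gets printed.
Best
JasonLee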
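Re: flink-1.14: question about watermark generation with KafkaSource
Posted by JasonLee <17...@163.com>.
Hi
What you describe applies when there is a window computation: a window fires when wm > window.end_time. Your code does not use a window, though, so the watermark is not really involved here, which means every incoming record gets printed.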
Best
JasonLee
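
To make the firing condition above concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes the bounded-out-of-orderness rule as I understand it (watermark = largest timestamp seen so far, minus the allowed lateness, minus 1 ms) and an event-time window [start, end) that fires once the watermark reaches end - 1. The class WatermarkSketch, the 10-second window, and the timestamps are hypothetical, chosen only for illustration.

```java
import java.time.Duration;

/** Plain-Java sketch of bounded-out-of-orderness watermarks; not Flink API. */
public class WatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public WatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low, but high enough that the subtraction below cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Track the largest event timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark trails the largest timestamp by the allowed lateness. */
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** A window [start, end) fires once the watermark reaches end - 1. */
    public static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofSeconds(20));
        long windowEnd = 10_000L; // hypothetical window [0, 10000)

        // The same timestamp arriving repeatedly does not advance the watermark.
        for (int i = 0; i < 3; i++) {
            wm.onEvent(5_000L);
        }
        System.out.println(windowFires(windowEnd, wm.currentWatermark())); // false

        // Only a later event timestamp moves the watermark past the window end.
        wm.onEvent(30_000L);
        System.out.println(windowFires(windowEnd, wm.currentWatermark())); // true
    }
}
```

Under these assumptions, a "times" field that never changes keeps the watermark fixed, so an event-time window would not fire; output that still appears for every record comes from a plain print() on the stream, not from a window.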