You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by wei_yuze <we...@qq.com> on 2023/02/07 05:43:59 UTC

Unable to do event time window aggregation with Kafka source

Hello!




I was unable to do event-time window aggregation with a Kafka source, but had no problem with a "fromElements" source. The code is attached below. It has two data sources, named "streamSource" and "kafkaSource" respectively. The program works well with "streamSource", but not with "watermarkedStream" (the stream derived from "kafkaSource").




public class WindowReduceTest2 {&nbsp; &nbsp; public static void main(String[] args) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


&nbsp; &nbsp; &nbsp; &nbsp; // 使用fromElement数据源
&nbsp; &nbsp; &nbsp; &nbsp; DataStreamSource<Event2&gt; streamSource = env.fromElements(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Alice", "./home", "2023-02-04 17:10:11"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Alice", "./home", "2023-02-04 17:10:13"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Alice", "./home", "2023-02-04 17:10:15"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Cary", "./home", "2023-02-04 17:10:16"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Cary", "./home", "2023-02-04 17:10:16")
&nbsp; &nbsp; &nbsp; &nbsp; );


&nbsp; &nbsp; &nbsp; &nbsp; // 使用Kafka数据源
&nbsp; &nbsp; &nbsp; &nbsp; JsonDeserializationSchema<Event2&gt; jsonFormat = new JsonDeserializationSchema<&gt;(Event2.class);
&nbsp; &nbsp; &nbsp; &nbsp; KafkaSource<Event2&gt; source = KafkaSource.<Event2&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setBootstrapServers(Config.KAFKA_BROKERS)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setTopics(Config.KAFKA_TOPIC)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setGroupId("my-group")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setStartingOffsets(OffsetsInitializer.earliest())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setValueOnlyDeserializer(jsonFormat)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();
&nbsp; &nbsp; &nbsp; &nbsp; DataStreamSource<Event2&gt; kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
&nbsp; &nbsp; &nbsp; &nbsp; kafkaSource.print();


&nbsp; &nbsp; &nbsp; &nbsp; // 生成watermark,从数据中提取时间作为事件时间
&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<Event2&gt; watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event2&gt;forBoundedOutOfOrderness(Duration.ZERO)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .withTimestampAssigner(new SerializableTimestampAssigner<Event2&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long extractTimestamp(Event2 element, long recordTimestamp) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Date date = null;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; date = simpleDateFormat.parse(element.getTime());
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (ParseException e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new RuntimeException(e);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long time = date.getTime();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(time);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return time;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));


&nbsp; &nbsp; &nbsp; &nbsp; // 窗口聚合
&nbsp; &nbsp; &nbsp; &nbsp; watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple2<String, Long&gt; map(Event2 value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 将数据转换成二元组,方便计算
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return Tuple2.of(value.getUser(), 1L);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyBy(r -&gt; r.f0)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 设置滚动事件时间窗口
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .window(TumblingEventTimeWindows.of(Time.seconds(5)))
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .reduce(new ReduceFunction<Tuple2<String, Long&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple2<String, Long&gt; reduce(Tuple2<String, Long&gt; value1, Tuple2<String, Long&gt; value2) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 定义累加规则,窗口闭合时,向下游发送累加结果
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return Tuple2.of(value1.f0, value1.f1 + value2.f1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .print("Aggregated stream");


&nbsp; &nbsp; &nbsp; &nbsp; env.execute();
&nbsp; &nbsp; }
}






Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the program works well even with "watermarkedStream".



Thanks for your time!

Lucas

Re: Unable to do event time window aggregation with Kafka source

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Lucas. 
What do you mean by "unable to do event time window aggregation with watermarkedStream"?
What exception does it throw?

Best regards, 
Yuxia 


发件人: "wei_yuze" <we...@qq.com> 
收件人: "User" <us...@flink.apache.org> 
发送时间: 星期二, 2023年 2 月 07日 下午 1:43:59 
主题: Unable to do event time window aggregation with Kafka source 



Hello! 




I was unable to do event-time window aggregation with a Kafka source, but had no problem with a "fromElements" source. The code is attached below. It has two data sources, named "streamSource" and "kafkaSource" respectively. The program works well with "streamSource", but not with "watermarkedStream" (the stream derived from "kafkaSource").


/**
 * Demo job that counts events per user in 5-second tumbling event-time windows.
 *
 * <p>Two sources are created: an in-memory {@code fromElements} source ("streamSource") and a
 * Kafka source ("kafkaSource"). The windowed aggregation is wired to the Kafka-backed
 * "watermarkedStream".
 *
 * <p>An event-time window only fires once the watermark passes the end of the window. A bounded
 * {@code fromElements} source emits a final watermark at end of input, which flushes all windows.
 * An unbounded Kafka source never does, so the watermark must be driven forward by the data
 * itself. If any parallel source subtask (or Kafka partition) receives no records, it holds the
 * overall watermark back indefinitely unless it is declared idle; hence the {@code withIdleness}
 * call below. Note that the last open window still only fires after a later event advances the
 * watermark past its end.
 */
public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In-memory source built with fromElements. Kept for comparison only: it is not
        // connected to the windowed pipeline below.
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16")
        );

        // Kafka source: values are JSON documents deserialized into Event2.
        JsonDeserializationSchema<Event2> jsonFormat = new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, using the time carried in each record as its event time.
        SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                            @Override
                            public long extractTimestamp(Event2 element, long recordTimestamp) {
                                // A fresh formatter per call: SimpleDateFormat is not thread-safe.
                                // NOTE(review): parsing uses the JVM default time zone - confirm
                                // this matches the zone the producer used when writing the field.
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    long time = simpleDateFormat.parse(element.getTime()).getTime();
                                    System.out.println(time);
                                    return time;
                                } catch (ParseException e) {
                                    throw new RuntimeException("Unparseable event time: " + element.getTime(), e);
                                }
                            }
                        })
                        // Mark a subtask idle after 5 s without records so that subtasks or
                        // partitions with no data do not hold the watermark back forever.
                        .withIdleness(Duration.ofSeconds(5)));

        // Windowed aggregation.
        watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert each event into a (user, 1) pair so it can be summed.
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Tumbling event-time windows of 5 seconds.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // Accumulate the counts; the result is emitted downstream when the window closes.
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}



Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the program works well even with "watermarkedStream".

Thanks for your time! 
Lucas