You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by drewfranklin <dr...@126.com> on 2023/02/07 11:55:14 UTC

回复: Kafka 数据源无法实现基于事件时间的窗口聚合

Hi ,应该是Kafka 可能存在空闲分区,如果只是partition 数量少于并发数的话,并不会影响水位推进,只是会浪费资源。默认程序不指定并行度,使用电脑cpu 核数。


如果是table api 的话,可以添加如下参数解决,table.exec.source.idle-timeout


| |
飞雨
|
|
bigdata
drewfranklin@126.com
|


---- 回复的原邮件 ----
| 发件人 | Weihua Hu<hu...@gmail.com> |
| 发送日期 | 2023年02月7日 18:48 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re: Kafka 数据源无法实现基于事件时间的窗口聚合 |
Hi,

问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task
的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。
可以尝试通过以下办法解决:
1. 将 source 并发控制为 1
2. 为 watermark 策略开始 idleness 处理,参考 [#1]

fromElement 数据源会强制指定并发为 1

[#1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources


Best,
Weihua


On Tue, Feb 7, 2023 at 1:31 PM wei_yuze <we...@qq.com.invalid> wrote:

您好!




我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
和 kafkaSource
。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。




public class WindowReduceTest2 {&nbsp; &nbsp; public static void
main(String[] args) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();


&nbsp; &nbsp; &nbsp; &nbsp; // 使用fromElement数据源
&nbsp; &nbsp; &nbsp; &nbsp; DataStreamSource<Event2&gt; streamSource =
env.fromElements(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new
Event2("Alice", "./home", "2023-02-04 17:10:11"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Bob",
"./cart", "2023-02-04 17:10:12"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new
Event2("Alice", "./home", "2023-02-04 17:10:13"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new
Event2("Alice", "./home", "2023-02-04 17:10:15"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Cary",
"./home", "2023-02-04 17:10:16"),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Event2("Cary",
"./home", "2023-02-04 17:10:16")
&nbsp; &nbsp; &nbsp; &nbsp; );


&nbsp; &nbsp; &nbsp; &nbsp; // 使用Kafka数据源
&nbsp; &nbsp; &nbsp; &nbsp; JsonDeserializationSchema<Event2&gt;
jsonFormat = new JsonDeserializationSchema<&gt;(Event2.class);
&nbsp; &nbsp; &nbsp; &nbsp; KafkaSource<Event2&gt; source =
KafkaSource.<Event2&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.setBootstrapServers(Config.KAFKA_BROKERS)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.setTopics(Config.KAFKA_TOPIC)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.setGroupId("my-group")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.setStartingOffsets(OffsetsInitializer.earliest())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.setValueOnlyDeserializer(jsonFormat)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();
&nbsp; &nbsp; &nbsp; &nbsp; DataStreamSource<Event2&gt; kafkaSource =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
&nbsp; &nbsp; &nbsp; &nbsp; kafkaSource.print();


&nbsp; &nbsp; &nbsp; &nbsp; // 生成watermark,从数据中提取时间作为事件时间
&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<Event2&gt;
watermarkedStream =
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event2&gt;forBoundedOutOfOrderness(Duration.ZERO)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.withTimestampAssigner(new SerializableTimestampAssigner<Event2&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
@Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
public long extractTimestamp(Event2 element, long recordTimestamp) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; SimpleDateFormat simpleDateFormat = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; Date date = null;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; date =
simpleDateFormat.parse(element.getTime());
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; } catch (ParseException e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; throw new RuntimeException(e);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; long time = date.getTime();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; System.out.println(time);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; return time;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));


&nbsp; &nbsp; &nbsp; &nbsp; // 窗口聚合
&nbsp; &nbsp; &nbsp; &nbsp; watermarkedStream.map(new MapFunction<Event2,
Tuple2<String, Long&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
@Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
public Tuple2<String, Long&gt; map(Event2 value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; // 将数据转换成二元组,方便计算
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; return Tuple2.of(value.getUser(), 1L);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyBy(r -&gt;
r.f0)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 设置滚动事件时间窗口
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .reduce(new
ReduceFunction<Tuple2<String, Long&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
@Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
public Tuple2<String, Long&gt; reduce(Tuple2<String, Long&gt; value1,
Tuple2<String, Long&gt; value2) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; // 定义累加规则,窗口闭合时,向下游发送累加结果
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; return Tuple2.of(value1.f0, value1.f1 + value2.f1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .print("Aggregated
stream");


&nbsp; &nbsp; &nbsp; &nbsp; env.execute();
&nbsp; &nbsp; }
}






值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。



感谢您花时间查看这个问题!
Lucas