Posted to user@flink.apache.org by Reme Ajayi <re...@gmail.com> on 2023/02/16 14:19:56 UTC
DataStream Join produces no output and causes program crash
Hi,
I am trying to join two Kafka data streams and write the result to another
Kafka topic, but the joined stream does not output any data. After some time,
my program runs out of memory and crashes, which I think is a result of the
join not working. My code doesn't throw any errors, but the join doesn't
produce any output. My join logic is below; please suggest possible
solutions.
P.S. Things I have tried so far:
1. Increased task slots on the task manager
2. Added Watermarks to my Kafka sources
DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
    .where(new KeySelector<GenericRecord, String>() {
        @Override
        public String getKey(GenericRecord value) throws Exception {
            return value.get("la_id").toString();
        }
    })
    .equalTo(new KeySelector<GenericRecord, String>() {
        @Override
        public String getKey(GenericRecord value) throws Exception {
            return value.get("id").toString();
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
        @Override
        public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
            return new Enhanced(
                Long.parseLong(first.get("c_at").toString()),
                first.get("c_type").toString(),
                first.get("id").toString(),
                Integer.parseInt(first.get("d_cts").toString()),
                Integer.parseInt(first.get("c_cts").toString()),
                second.get("prov").toString(),
                second.get("bb_S_T").toString(),
                second.get("p_id").toString(),
                second.get("s_ccurr").toString()
            );
        }
    });
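
For reference, here is a minimal plain-Java sketch of the conditions a tumbling event-time window join depends on. It is a simplified model, not Flink code: the class and method names are made up for illustration, timestamps are assumed to be non-negative epoch milliseconds, and the window offset is assumed to be 0. It shows that two records can only be paired when they fall into the same 30-second window, and that a window is only emitted once the slower of the two inputs' watermarks has passed the end of that window.

```java
/** Simplified model of tumbling event-time windows; hypothetical names, not Flink API. */
final class TumblingWindowSketch {

    /** Start of the tumbling window containing a non-negative timestamp (offset 0 assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Two records can only be paired if they land in the same window. */
    static boolean sameWindow(long tsA, long tsB, long sizeMs) {
        return windowStart(tsA, sizeMs) == windowStart(tsB, sizeMs);
    }

    /** A two-input operator's event-time clock follows the slower input. */
    static long combinedWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    /** An event-time window fires once the watermark reaches its last timestamp. */
    static boolean windowFires(long windowStartMs, long sizeMs, long watermark) {
        long maxTimestamp = windowStartMs + sizeMs - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows, as in the join above
        long start = windowStart(65_000L, size);
        System.out.println("window start = " + start);

        // One input has progressed; the other has not produced a watermark yet.
        long watermark = combinedWatermark(120_000L, Long.MIN_VALUE);
        System.out.println("fires = " + windowFires(start, size, watermark));
    }
}
```

If this model matches the job, a watermark that never advances on either input (for example because of an idle Kafka partition or a missing timestamp assigner) would keep windows from ever firing while buffered records keep accumulating. That would be consistent with seeing no output followed by an out-of-memory crash, though it is only one possible cause.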
Re: DataStream Join produces no output and causes program crash
Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Reme,
The code you provided looks fine to me. You could add some logging to the
getKey() and join() functions to check whether any records are actually
being joined. The metrics in the web UI dashboard may also help.
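
One way to act on the logging suggestion is to first check, on a small sample, whether the two key fields ever produce the same string. The sketch below is plain Java and only illustrative: it uses Map<String, Object> as a stand-in for GenericRecord, and the class and method names are hypothetical. The field names "la_id" and "id" are taken from the join above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for checking whether two sample sets share any join key. */
final class JoinKeyOverlapSketch {

    /** Collects the string form of one field, skipping records that lack it. */
    static Set<String> keysOf(List<Map<String, Object>> records, String field) {
        Set<String> keys = new HashSet<>();
        for (Map<String, Object> record : records) {
            Object value = record.get(field);
            if (value != null) {
                keys.add(value.toString());
            }
        }
        return keys;
    }

    /** Keys present on both sides; an empty result means no record could ever join. */
    static Set<String> sharedKeys(List<Map<String, Object>> left, String leftField,
                                  List<Map<String, Object>> right, String rightField) {
        Set<String> shared = new HashSet<>(keysOf(left, leftField));
        shared.retainAll(keysOf(right, rightField));
        return shared;
    }

    public static void main(String[] args) {
        // Made-up sample records standing in for the two Kafka streams.
        List<Map<String, Object>> entries = List.of(
                Map.of("la_id", 7, "c_type", "x"),
                Map.of("la_id", 9, "c_type", "y"));
        List<Map<String, Object>> history = List.of(
                Map.of("id", "7", "prov", "a"));

        System.out.println("shared keys: " + sharedKeys(entries, "la_id", history, "id"));
    }
}
```

If the shared set is empty for a representative sample, the key selectors never match and the windowing is not the problem. If it is non-empty, the next thing to log would be the event timestamps of the matching records.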
Best,
Shuiqiang