Posted to user@flink.apache.org by Reme Ajayi <re...@gmail.com> on 2023/02/16 14:19:56 UTC

DataStream Join produces no output and causes program crash

Hi,
I am trying to join two Kafka data streams and write the result to another
Kafka topic; however, my joined stream does not output any data. After some
time, my program runs out of memory and crashes, which I think is a result of
the join not working. My code doesn't throw any errors, but the join doesn't
produce any output. My join logic is below; please suggest possible
solutions.

Things I have tried so far:

   1. Increased task slots on the task manager
   2. Added Watermarks to my Kafka sources

DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
        // Key of the first stream (EntriesStream): the "la_id" field.
        .where(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("la_id").toString();
            }
        })
        // Key of the second stream (historyStream): the "id" field.
        .equalTo(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("id").toString();
            }
        })
        // Only records that fall into the same 30-second event-time window are paired.
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
            @Override
            public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
                return new Enhanced(
                        Long.parseLong(first.get("c_at").toString()),
                        first.get("c_type").toString(),
                        first.get("id").toString(),
                        Integer.parseInt(first.get("d_cts").toString()),
                        Integer.parseInt(first.get("c_cts").toString()),
                        second.get("prov").toString(),
                        second.get("bb_S_T").toString(),
                        second.get("p_id").toString(),
                        second.get("s_ccurr").toString()
                );
            }
        });

Re: DataStream Join produces no output and causes program crash

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Reme,

The code you provided looks fine to me. You could add some logging to the
getKey() and join() functions for debugging, to see whether any records are
actually being joined. The metrics in the Web UI dashboard may also help.
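
If the logs show getKey() being called on both sides but join() never
firing, one thing that could be checked offline is whether any pair of
sampled records shares both a key and a 30-second window. Below is a minimal
plain-Java sketch of that check. It does not use the Flink API; the Event
class and the method names are hypothetical, and it assumes non-negative
epoch-millisecond timestamps and tumbling windows with no offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in (no Flink dependency) for checking whether sampled records could join. */
public class JoinDebugSketch {

    /** Hypothetical minimal event: the extracted join key plus an event-time timestamp. */
    public static final class Event {
        final String key;
        final long timestampMs;

        public Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the tumbling window containing a non-negative timestamp (no window offset). */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Counts (left, right) pairs that have equal keys and fall into the same tumbling window. */
    public static long countJoinablePairs(List<Event> left, List<Event> right, long windowSizeMs) {
        // Count right-side events per (key, window) bucket.
        Map<String, Integer> rightCounts = new HashMap<>();
        for (Event e : right) {
            rightCounts.merge(bucket(e, windowSizeMs), 1, Integer::sum);
        }
        // Each left-side event pairs with every right-side event in the same bucket.
        long pairs = 0;
        for (Event e : left) {
            pairs += rightCounts.getOrDefault(bucket(e, windowSizeMs), 0);
        }
        return pairs;
    }

    private static String bucket(Event e, long windowSizeMs) {
        return e.key + "@" + windowStart(e.timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long windowSizeMs = 30_000L; // same size as the 30-second window in the question

        // Made-up sample data standing in for keys and timestamps copied from the logs.
        List<Event> entries = List.of(
                new Event("42", 1_000L),
                new Event("42", 31_000L),
                new Event("7", 5_000L));
        List<Event> history = List.of(
                new Event("42", 29_999L),
                new Event("42", 65_000L),
                new Event("8", 5_000L));

        System.out.println("joinable pairs: "
                + countJoinablePairs(entries, history, windowSizeMs));
    }
}
```

With the made-up sample above, only the first "42" entry and the first "42"
history record share a window, so the count is 1. A count of 0 on real
samples would suggest that the keys never match or that matching records
arrive more than one window apart.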

Best,
Shuiqiang
