Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2022/05/03 21:10:33 UTC

Setting boundedness for legacy Hadoop sequence file sources

Hi all,

I’m converting several batch Flink workflows to streaming, with bounded sources.

Some of our sources are reading Hadoop sequence files via StreamExecutionEnvironment.createInput(HadoopInputFormat).
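For context, the source is wired up roughly along these lines (the input path, key/value types, and class names in the sketch are placeholders, not our actual job code):

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceFileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // mapred-style sequence file input; path and key/value types are placeholders.
        JobConf jobConf = new JobConf();
        org.apache.hadoop.mapred.FileInputFormat.addInputPath(jobConf, new Path("hdfs:///data/events"));

        HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<>(
                new SequenceFileInputFormat<LongWritable, Text>(),
                LongWritable.class, Text.class, jobConf);

        // This ends up as a LegacySourceTransformation marked CONTINUOUS_UNBOUNDED,
        // even though the set of input files is finite.
        DataStream<Tuple2<LongWritable, Text>> events = env.createInput(hadoopIF);

        events.print();
        env.execute("sequence file sketch");
    }
}
```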

The problem is that StreamGraphGenerator.existsUnboundedSource() returns true, because the LegacySourceTransformation for this source reports CONTINUOUS_UNBOUNDED. Since I’ve set the execution mode to BATCH, the workflow then fails to run.

The root cause is that StreamExecutionEnvironment.createInput() checks whether the input format is an instance of Flink’s FileInputFormat, and only sets up a bounded source when it is. HadoopInputFormat doesn’t extend FileInputFormat, so its boundedness gets set to CONTINUOUS_UNBOUNDED, which is wrong for a finite set of sequence files.
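To make the dispatch concrete, here’s a stripped-down, self-contained simulation of that check (the class names are simplified stand-ins for the Flink types, not the real API):

```java
// Stand-ins for the real Flink types, to illustrate the instanceof dispatch.
interface InputFormat {}                        // stand-in for org.apache.flink.api.common.io.InputFormat
class FileInputFormat implements InputFormat {} // stand-in for org.apache.flink.api.common.io.FileInputFormat
class TextInputFormat extends FileInputFormat {}
class HadoopInputFormat implements InputFormat {} // implements InputFormat, does NOT extend FileInputFormat

enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

public class CreateInputDispatch {
    // Simplified version of the branch inside createInput(): only
    // FileInputFormat subclasses take the bounded path.
    static Boundedness boundednessFor(InputFormat format) {
        return (format instanceof FileInputFormat)
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public static void main(String[] args) {
        System.out.println(boundednessFor(new TextInputFormat()));   // BOUNDED
        System.out.println(boundednessFor(new HadoopInputFormat())); // CONTINUOUS_UNBOUNDED
    }
}
```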

This looks like a bug in StreamExecutionEnvironment.createInput(), though I’m not sure how best to fix it. Relying on class checks to infer boundedness feels brittle.
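As a stopgap (not a proper fix), one thing that might work is flipping the boundedness on the returned source’s transformation before the stream graph is generated. This is a sketch, and it assumes the Flink version in use exposes LegacySourceTransformation.setBoundedness():

```java
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;

// Assumes 'env' and 'hadoopIF' are set up as in the job described above.
DataStreamSource<?> source = env.createInput(hadoopIF);

// createInput() produces a LegacySourceTransformation; if setBoundedness()
// is available in this Flink version, mark the source as BOUNDED so that
// BATCH execution mode accepts it.
Transformation<?> t = source.getTransformation();
if (t instanceof LegacySourceTransformation) {
    ((LegacySourceTransformation<?>) t).setBoundedness(Boundedness.BOUNDED);
}
```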

Regards,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch