Posted to user@flink.apache.org by Darin Amos <da...@instacart.com> on 2022/06/22 20:06:03 UTC

ContinuousFileReaderOperator from Kafka Source (Scala, Flink 1.13.1)

Hi There!

I have a question about ingesting files from S3 whose discovery is not
driven by the ContinuousFileMonitoringFunction. For a variety of reasons
I cannot use the ContinuousFileMonitoringFunction, so I'd like to use a
Kafka source where each message contains both the S3 path and the metadata
required to consume the S3 files.

It's easy enough to create a KafkaSource and an operator that generates the
FileSplits, but reusing the ContinuousFileReaderOperator seems tricky.
This operator requires a reference to the MailboxExecutor, which isn't
available in the Scala DataStream API.
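For illustration, the split-generation step could look roughly like the
plain-Java sketch below (no Flink dependencies). It assumes each Kafka
message has already been parsed into a path, a file length and a
modification time. FileMessage, ByteRangeSplit and planSplits are
hypothetical names; ByteRangeSplit only loosely mirrors the fields of
Flink's TimestampedFileInputSplit and is not that class.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPlanner {

    // Hypothetical parsed Kafka message: the S3 object to read plus the
    // metadata needed to read it (here just its size and modification time).
    record FileMessage(String path, long length, long modificationTime) {}

    // Hypothetical split: one byte range of one file. Loosely modeled on the
    // fields of Flink's TimestampedFileInputSplit, but NOT the Flink class.
    record ByteRangeSplit(int splitNumber, String path, long start,
                          long length, long modificationTime) {}

    // Cut a file into consecutive splits of at most maxSplitBytes each.
    // In this sketch an empty file produces no splits.
    static List<ByteRangeSplit> planSplits(FileMessage msg, long maxSplitBytes) {
        if (maxSplitBytes <= 0) {
            throw new IllegalArgumentException("maxSplitBytes must be positive");
        }
        List<ByteRangeSplit> splits = new ArrayList<>();
        int n = 0;
        for (long start = 0; start < msg.length(); start += maxSplitBytes) {
            long len = Math.min(maxSplitBytes, msg.length() - start);
            splits.add(new ByteRangeSplit(n++, msg.path(), start, len,
                    msg.modificationTime()));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical bucket and key, for illustration only.
        FileMessage msg = new FileMessage(
                "s3://example-bucket/data/part-0000.csv", 250L, 1655928363000L);
        for (ByteRangeSplit s : planSplits(msg, 100L)) {
            System.out.println(s);
        }
    }
}
```

Fixed-size byte ranges are only one option; for line-oriented formats the
reader also has to handle records that straddle a split boundary.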

I guess my question is: is it safe to re-create my own version of
the ContinuousFileReaderOperator in Scala without the MailboxExecutor?
That is, is that pattern even necessary anymore? Or is it safe to switch to
the Java API, whose transform function can provide a reference to the
MailboxExecutor?
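To make the question concrete: my understanding (an assumption, not checked
against the 1.13 sources) is that the reader processes a split in small
steps scheduled on the task's mailbox, so a checkpoint can run between two
records without a separate reader thread or a checkpoint lock. The toy model
below illustrates only that idea in plain Java. ToyMailbox, readOneRecord and
snapshotState are hypothetical names; this is not Flink's MailboxExecutor API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ToyMailbox {

    // Single-threaded queue of actions. Everything that touches operator
    // state runs as an action on this one thread, so no lock is needed.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();

    // Toy operator state: the split's records and how far we have read.
    private final List<String> records;
    private int offset = 0;
    final List<String> emitted = new ArrayList<>();
    final List<Integer> snapshots = new ArrayList<>();

    ToyMailbox(List<String> records) {
        this.records = records;
    }

    void execute(Runnable action) {
        mailbox.add(action);
    }

    // Run queued actions until the mailbox is empty.
    void runLoop() {
        Runnable next;
        while ((next = mailbox.poll()) != null) {
            next.run();
        }
    }

    // Emit one record, then re-enqueue the next read instead of looping, so
    // actions queued meanwhile (e.g. a checkpoint) run in between.
    void readOneRecord() {
        if (offset < records.size()) {
            emitted.add(records.get(offset));
            offset++;
            execute(this::readOneRecord);
        }
    }

    // A snapshot always sees an offset that sits exactly between records.
    void snapshotState() {
        snapshots.add(offset);
    }

    public static void main(String[] args) {
        ToyMailbox op = new ToyMailbox(List.of("a", "b", "c", "d"));
        op.execute(op::readOneRecord);
        op.execute(op::snapshotState); // "checkpoint" arrives while reading
        op.runLoop();
        // prints emitted=[a, b, c, d] snapshots=[1]
        System.out.println("emitted=" + op.emitted + " snapshots=" + op.snapshots);
    }
}
```

If the real operator works this way, dropping the mailbox would mean either
reading a whole split inside one call (blocking checkpoints until it ends) or
reintroducing a reader thread plus explicit synchronization.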

Thanks!

Darin Amos