Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2022/08/16 02:36:56 UTC

How to include path information in data extracted from text files with FileSource

Hi all,

We’ve got time-stamped directories containing text files, stored in HDFS.

New files get added regularly, so we’re using a FileSource with a monitoring duration to continuously pick up any new files.

The challenge is that we need to include the parent directory’s timestamp in the output, for doing time-window joins of this enrichment data with another stream.

Previously I could extend the input format <https://stackoverflow.com/a/68764550/231762> to extract path information, and emit a Tuple2<LongWritable, Text>.

But with the new FileSource architecture, I’m really not sure if it’s possible, or if so, the right way to go about doing it.

I’ve wandered through the source code (FileSource, AbstractFileSource, SourceReader, FileSourceReader, FileSourceSplit, ad nauseam) but haven’t seen any happy path to making that all work.

There might be a way using some really ugly hacks to TextLineFormat, where it would reverse engineer the FSDataInputStream to try to find information about the original file, but that feels very fragile.

Any suggestions?

Thanks!

— Ken


--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch




Re: How to include path information in data extracted from text files with FileSource

Posted by Xuyang <xy...@163.com>.
Hi, could you try the FileSystem connector with the 'file.path' metadata column [1]? I'm not sure whether it fits your use case.

[1] https://github.com/apache/flink/blob/d21ab7ba816bb98fd167eb5f714b8e7bb2edd687/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala#L414
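
If the 'file.path' metadata column does work here, the remaining step is turning each record's file path into the parent directory's timestamp. Below is a minimal sketch in plain Java. It assumes directories are named like 20220816-0230 in UTC, which is only a guess since the thread never states the actual naming scheme. ParentDirTimestamp and the example path are hypothetical names used for illustration, not part of Flink.

```java
import java.net.URI;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class ParentDirTimestamp {

    // ASSUMPTION: parent directories are named "yyyyMMdd-HHmm" in UTC.
    // Adjust the pattern and zone to match the real directory layout.
    private static final DateTimeFormatter DIR_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd-HHmm");

    /**
     * Parses the name of the file's parent directory as a timestamp and
     * returns it as epoch milliseconds. Accepts a full URI
     * (e.g. hdfs://host:port/...) or a plain absolute path.
     */
    public static long parentDirEpochMillis(String filePath) {
        // Drop scheme and authority so only the path component remains.
        // Note: URI.create rejects unescaped spaces and similar characters.
        String path = URI.create(filePath).getPath();

        int fileSep = path.lastIndexOf('/');
        if (fileSep <= 0) {
            throw new IllegalArgumentException("No parent directory in: " + filePath);
        }

        // Everything before the last '/' is the parent directory path;
        // its final segment is the directory name we want to parse.
        String parent = path.substring(0, fileSep);
        String dirName = parent.substring(parent.lastIndexOf('/') + 1);

        return LocalDateTime.parse(dirName, DIR_FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // Hypothetical path, shaped like a value the path metadata might carry.
        String example = "hdfs://namenode:8020/data/enrichment/20220816-0230/part-00000.txt";
        System.out.println(parentDirEpochMillis(example));
    }
}
```

A helper like this could be called from a map function or wrapped in a scalar function, so that the resulting value becomes the event-time field used for the time-window join. A directory name that does not match the pattern raises a DateTimeParseException, which may or may not be the behavior you want for stray directories.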


