Posted to user@flink.apache.org by pietro <pi...@gmail.com> on 2015/03/02 19:27:07 UTC

Getting the name of a file in a directory

I am reading files from a directory with this statement:

/val text = env.readFile(new MyInputFormat(), "/path/to/input/dir/")/

/MyInputFormat/ extends the Record API's /DelimitedInputFormat/, which in turn
extends the generic /DelimitedInputFormat<Record>/.

In the output Record, I need to add a field that stores the name (or full
path) of the file from which it has been read.

In order to do that, I wrote the following code in /MyInputFormat/ :

/
private final StringValue fname = new StringValue();
...
@Override
public Record readRecord(Record reusable, byte[] bytes, int offset, int numBytes) { //throws IOException {
	fname.setValue(this.getFilePath().toString());
	reusable.setField(0, this.fname);
	return reusable;
}
/

The code compiles and runs without problems, but in the output I always get:
//path/to/input/dir//
for every record!
What I wanted to have is:
//path/to/input/dir/file1.txt/
//path/to/input/dir/file3.txt/
/.../

Can anyone help me?



--
View this message in context: http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Getting-the-name-of-a-file-in-a-directory-tp801.html
Sent from the Apache Flink (Incubator) User Mailing List archive at Nabble.com.

Re: Getting the name of a file in a directory

Posted by Stephan Ewen <se...@apache.org>.
Hi Pietro!

The path of an input format (what getFilePath() returns) is the directory from
which all its input is read. It does not contain the specific paths of the
contained files.

The specific path is part of the "FileInputSplit", which describes a
subtask of work (like a file, or a part of a file).
If you want the path of the file that a record came from, I would suggest
the following:

Override the "open(FileInputSplit)" method in your input format. This
method is called once for every file (or subregion of a file, usually many
megabytes in size). The FileInputSplit that is passed to the open() method
contains the path of the file where the next records come from.

You can get the path from the FileInputSplit, store it in a member variable,
and then attach it to every record read from that split.
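
A minimal sketch of that pattern, written against small stand-in classes so
that it runs without Flink on the classpath. SplitStub, FormatStub,
PathTaggingFormat and PathTaggingDemo are hypothetical names made up for this
illustration; in a real job the split type would be FileInputSplit (whose
getPath() returns the file's path) and the override would be
open(FileInputSplit) in your DelimitedInputFormat subclass, calling
super.open(split) first. The tab-separated String output is also just an
assumption standing in for setting a field on the record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's FileInputSplit: one file (or one region of a file).
final class SplitStub {
    private final String path;
    private final String[] lines;

    SplitStub(String path, String... lines) {
        this.path = path;
        this.lines = lines;
    }

    String getPath() { return path; }
    String[] getLines() { return lines; }
}

// Hypothetical stand-in for a file input format that is configured with a directory.
abstract class FormatStub {
    private final String filePath;

    FormatStub(String filePath) { this.filePath = filePath; }

    // Returns the configured input path (the directory), not the current file.
    String getFilePath() { return filePath; }

    // Called once per split, before any record of that split is read.
    void open(SplitStub split) { }

    abstract String readRecord(String line);
}

// Remembers the path of the split being read and attaches it to each record.
final class PathTaggingFormat extends FormatStub {
    private String currentFile;

    PathTaggingFormat(String directory) { super(directory); }

    @Override
    void open(SplitStub split) {
        super.open(split);              // keep the base class set-up
        currentFile = split.getPath();  // path of the file behind this split
    }

    @Override
    String readRecord(String line) {
        // Use the stored split path instead of getFilePath().
        return currentFile + "\t" + line;
    }
}

final class PathTaggingDemo {
    // Simulates the runtime: open each split, then read its records.
    static List<String> run() {
        PathTaggingFormat format = new PathTaggingFormat("/path/to/input/dir/");
        SplitStub[] splits = {
            new SplitStub("/path/to/input/dir/file1.txt", "a", "b"),
            new SplitStub("/path/to/input/dir/file3.txt", "c")
        };
        List<String> out = new ArrayList<>();
        for (SplitStub split : splits) {
            format.open(split);
            for (String line : split.getLines()) {
                out.add(format.readRecord(line));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String record : run()) {
            System.out.println(record);
        }
    }
}
```

Because open() runs again for every new split, the member variable always
holds the path of the file currently being read, so each record is tagged
with its own file rather than with the input directory.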

Greetings,
Stephan

BTW: You are using the Record input format, which is part of the deprecated
Record API. We recommend using the input formats based on
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
which work on data types other than Record.

