Posted to user@flink.apache.org by Lukas Kircher <lu...@uni-konstanz.de> on 2016/12/07 11:10:02 UTC

Recursive directory traversal with TextInputFormat

Hi all,

I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:

1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.

2) What is the easiest way to read compressed .csv files (.zip)?

Thanks for your help, cheers
Lukas


Re: Recursive directory traversal with TextInputFormat

Posted by Ufuk Celebi <uc...@apache.org>.
Looping in Kostas who recently worked on the continuous file inputs.

@Kostas: do you have an idea what's happening here?

– Ufuk

On 8 December 2016 at 08:43:32, Lukas Kircher (lukas.kircher@uni-konstanz.de) wrote:
> Hi Stefan,
>
> thanks for your answer.
>
> > I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:
>
> That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java and call setNestedFileEnumeration(true), but once the stream is processed only the content of the .csv file in the top-most folder is printed. The example is just a few lines of self-contained code, see below. Does anybody have an idea?
>
> Cheers,
> Lukas
>
>
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.BufferedWriter;
> import java.io.File;
> import java.io.FileWriter;
>
> public class ReadDirectorySSCCE {
>     public static void main(String[] args) throws Exception {
>         // create given dirs and add a .csv file to each one
>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>         for (String dir: dirs) {
>             // create input file
>             File tmpDir = new File(dir);
>             if (!tmpDir.exists()) {
>                 tmpDir.mkdirs();
>             }
>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>             w.write("content of " + dir + "/file.csv");
>             w.close();
>             tempFile.deleteOnExit();
>         }
>         File root = new File("tmp");
>
>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>         inputFormat.setNestedFileEnumeration(true);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.createInput(inputFormat).print();
>         env.execute();
>     }
> }
>
> > On 7 Dec 2016, at 17:44, Stefan Richter wrote:
> >
> > Hi,
> >
> > I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:
> >
> > /**
> >  * The flag to specify whether recursive traversal of the input directory
> >  * structure is enabled.
> >  */
> > protected boolean enumerateNestedFiles = false;
> >
> > As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.
> >
> > Best,
> > Stefan
> >
> > > On 07.12.2016 at 12:10, Lukas Kircher wrote:
> > >
> > > Hi all,
> > >
> > > I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:
> > >
> > > 1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.
> > >
> > > 2) What is the easiest way to read compressed .csv files (.zip)?
> > >
> > > Thanks for your help, cheers
> > > Lukas


Re: Recursive directory traversal with TextInputFormat

Posted by Lukas Kircher <lu...@uni-konstanz.de>.
Hi Stefan,

thanks for your answer.

> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java and call setNestedFileEnumeration(true), but once the stream is processed only the content of the .csv file in the top-most folder is printed. The example is just a few lines of self-contained code, see below. Does anybody have an idea?

Cheers,
Lukas


import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir: dirs) {
            // create input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
            w.write("content of " + dir + "/file.csv");
            w.close();
            tempFile.deleteOnExit();
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}

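For completeness, the enumeration can also be checked in isolation by asking the format for its splits directly, e.g. right after the setNestedFileEnumeration(true) call above. A small diagnostic sketch (I am assuming that calling createInputSplits(1) on the format is enough to trigger the directory scan on its own; FileInputSplit comes from org.apache.flink.core.fs):

// ask the format for its splits directly, bypassing the streaming job
FileInputSplit[] splits = inputFormat.createInputSplits(1);
for (FileInputSplit split : splits) {
    System.out.println(split.getPath());  // should also list the nested .csv files
}

The nested .csv files do show up in the splits here, they just don't seem to be processed downstream.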

> On 7 Dec 2016, at 17:44, Stefan Richter <s....@data-artisans.com> wrote:
> 
> Hi,
> 
> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:
> 
> /**
>  * The flag to specify whether recursive traversal of the input directory
>  * structure is enabled.
>  */
> protected boolean enumerateNestedFiles = false;
> 
> As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.
> 
> Best,
> Stefan
> 
>> On 07.12.2016 at 12:10, Lukas Kircher <lukas.kircher@uni-konstanz.de> wrote:
>> 
>> Hi all,
>> 
>> I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:
>> 
>> 1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.
>> 
>> 2) What is the easiest way to read compressed .csv files (.zip)?
>> 
>> Thanks for your help, cheers
>> Lukas
>> 
>> <ReadDirectorySSCCE.java>
> 


Re: Recursive directory traversal with TextInputFormat

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

/**
 * The flag to specify whether recursive traversal of the input directory
 * structure is enabled.
 */
protected boolean enumerateNestedFiles = false;
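
If subclassing is not an option, the flag can also be switched on from the outside: TextInputFormat has a setNestedFileEnumeration(true) setter, and for the batch DataSet API there is, if I remember correctly, a "recursive.file.enumeration" configuration key that maps to this field. A minimal sketch along those lines (class name and path are just placeholders):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // enable recursive enumeration of nested files for this input
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        env.readTextFile("file:///path/to/tmp")  // placeholder path
           .withParameters(parameters)
           .print();  // print() triggers execution in the DataSet API
    }
}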

As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.
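
For .zip archives specifically I do not think there is a built-in factory (as far as I know the bundled ones cover deflate/gzip-style extensions), so the simplest route is probably to unpack the archive with plain java.util.zip first and then point the TextInputFormat at the extracted directory. A rough, untested sketch:

import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class UnzipBeforeRead {
    // extracts all entries of a .zip archive into targetDir
    static void unzip(File archive, File targetDir) throws Exception {
        try (ZipFile zip = new ZipFile(archive)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                File out = new File(targetDir, entry.getName());
                if (entry.isDirectory()) {
                    out.mkdirs();
                    continue;
                }
                out.getParentFile().mkdirs();
                try (InputStream in = zip.getInputStream(entry)) {
                    Files.copy(in, out.toPath());
                }
            }
        }
    }
}

After extraction you can read the directory exactly as in your SSCCE, with setNestedFileEnumeration(true) if the archive contains nested folders.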

Best,
Stefan

> On 07.12.2016 at 12:10, Lukas Kircher <lu...@uni-konstanz.de> wrote:
> 
> Hi all,
> 
> I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:
> 
> 1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3, no matter whether I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.
> 
> 2) What is the easiest way to read compressed .csv files (.zip)?
> 
> Thanks for your help, cheers
> Lukas
> 
> <ReadDirectorySSCCE.java>