You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by ysnakie <ys...@hotmail.com> on 2020/05/19 05:47:44 UTC

Flink Dataset job submission very slow

I have many lzo files on HDFS in such path format:
/logs/{id}/{date}/xxx[1-100].lzo

  

 **/logs/a/ds=2018-01-01/xxx1.lzo**

 **/logs/b/ds=2018-01-01/xxx1.lzo**

 **...**

 **/logs/z/ds=2018-01-02/xxx1.lzo**

 **...**

 **/logs/z/ds=2020-05-01/xxx100.lzo**

 **  
**

I'am using Flink Dataset to read those files by a range of {date} and apply
some transformation. Since Flink official does not provide lzo inputformat so
I use HadoopInputFormat to implement this. I currently cannot find a good way
to give one fileglob path to include all files I need so I have to do it in a
loop.

  

for (date=startDate; !date.isAfter(endDate); date = date.plusDays(1)) {

    FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path("/logs/*" , "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));

    for (FileStatus fileStatus: fileStatuses) {

        String path = fileStatus.getPath().toString();

        lzoFiles.add(path)

    }

}  

// ...

// I have to initialize the source like that

Job job = new Job(conf);

org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
String.join(",", lzoFiles));

lzoSets = env.createInput(HadoopInputs.createHadoopInput(

    new LzoTextInputFormat(),

    LongWritable.class,

    Text.class,

    job)).name("lzo source").map(x -> x.f1.toString());

  

However when I submit this job to Flink. If I have a relatively long date
range files to read, the job submission take so much time(10mins, even 20mins
and more, I have already increase akka.timeout and web.timeout) to be prepared
which I cannot accept this. It seems Flink take so much time to optimize the
execution plan. Is there and good approach to make my program prepared
quickly?

[ ](https://maas.mail.163.com/dashi-web-
extend/html/proSignature.html?ftlId=1&name=ysnakie&uid=ysnakie%40hotmail.com&iconUrl=https%3A%2F%2Fmail-
online.nosdn.127.net%2Fsmc8371a9788890d59e567ed336b96676b.jpg&items=%5B%22ysnakie%40hotmail.com%22%5D)

Shengnan


Re: Flink Dataset job submission very slow

Posted by Arvid Heise <ar...@ververica.com>.
Do you have any logs that could help us identify the issue? How many files
is a long date range?

In general, you could try out the same program with the DataStream API (use
StreamExecutionEnvironment#readFile [1] with PROCESS_ONCE to get a behavior
equivalent to batch). DataStreams are only slightly optimized. Since
DataSet API will be eventually deprecated, that is also the recommended way
if you don't need feature specific to DataSet API.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sources

On Tue, May 19, 2020 at 7:48 AM ysnakie <ys...@hotmail.com> wrote:

> I have many lzo files on HDFS in such path format:
> /logs/{id}/{date}/xxx[1-100].lzo
>
> */logs/a/ds=2018-01-01/xxx1.lzo*
> */logs/b/ds=2018-01-01/xxx1.lzo*
> *...*
> */logs/z/ds=2018-01-02/xxx1.lzo*
> *...*
> */logs/z/ds=2020-05-01/xxx100.lzo*
>
> I'am using Flink Dataset to read those files by a range of {date} and
> apply some transformation. Since Flink official does not provide lzo
> inputformat so I use HadoopInputFormat to implement this. I currently
> cannot find a good way to give one fileglob path to include all files I
> need so I have to do it in a loop.
>
> for (date=startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
>     FileStatus[] fileStatuses = fs.globStatus(new
> org.apache.hadoop.fs.Path("/logs/*" , "ds=" +
> date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
>     for (FileStatus fileStatus: fileStatuses) {
>         String path = fileStatus.getPath().toString();
>         lzoFiles.add(path)
>     }
> }
> // ...
> // I have to initialize the source like that
> Job job = new Job(conf);
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
> String.join(",", lzoFiles));
> lzoSets = env.createInput(HadoopInputs.createHadoopInput(
>     new LzoTextInputFormat(),
>     LongWritable.class,
>     Text.class,
>     job)).name("lzo source").map(x -> x.f1.toString());
>
> However when I submit this job to Flink. If I have a relatively long date
> range files to read, the job submission take so much time(10mins, even
> 20mins and more, I have already increase akka.timeout and web.timeout) to
> be prepared which I cannot accept this. It seems Flink take so much time to
> optimize the execution plan. Is there and good approach to make my program
> prepared quickly?
>
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=ysnakie&uid=ysnakie%40hotmail.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmc8371a9788890d59e567ed336b96676b.jpg&items=%5B%22ysnakie%40hotmail.com%22%5D>
> Shengnan
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng