Posted to user@flink.apache.org by Darshan Singh <da...@gmail.com> on 2018/08/13 19:01:37 UTC

Limit on number of files to read for Dataset

Hi Guys,

Is there a limit on the number of files a Flink dataset can read? My
question is: will there be any sort of issues if I have, say, millions of
files to read to create a single dataset?

Thanks

Re: Limit on number of files to read for Dataset

Posted by Darshan Singh <da...@gmail.com>.
Thanks all for your responses. This is much clearer to me now.

Thanks


Re: Limit on number of files to read for Dataset

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Darshan,

This looks like a file system configuration issue to me.
Flink supports different file system implementations for S3, and there are
also a few tuning knobs.

Did you have a look at the docs for file system configuration [1]?

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/filesystems.html
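
For example, with EMRFS (which the "Timeout waiting for connection from pool"
stack trace further down suggests), the knob that usually matters is the S3
connection-pool size. A minimal sketch, assuming the EMRFS fs.s3.maxConnections
property; the value shown is only illustrative:

    <!-- emrfs-site.xml (sketch; verify the property name against the EMR docs) -->
    <property>
      <name>fs.s3.maxConnections</name>
      <value>500</value>
    </property>

With the Hadoop S3A file system, the corresponding setting is
fs.s3a.connection.maximum.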


Re: Limit on number of files to read for Dataset

Posted by Darshan Singh <da...@gmail.com>.
Thanks for the details. I got it working. I have around one directory for
each month and I am running over 12-15 months of data. So I created a
dataset from each month and did a union.
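
Roughly, the read-and-union looked like the following simplified sketch
(bucket, prefix, and month names are placeholders):

    // Simplified sketch of the per-month read plus union described above.
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class MonthlyUnionJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // One directory per month; the real job covers 12-15 of these.
            List<String> months = Arrays.asList("2017-06", "2017-07", "2018-05");

            // Build one dataset per month directory and union them into a
            // single logical dataset.
            DataSet<String> all = null;
            for (String month : months) {
                DataSet<String> monthData =
                        env.readTextFile("s3://my-bucket/events/" + month + "/");
                all = (all == null) ? monthData : all.union(monthData);
            }

            System.out.println(all.count());
        }
    }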

However, when I run it I get an HTTP timeout issue. I am reading more than
120K files in total across all the months.

I am using S3 and EMR to do this, and the Flink version is 1.4.2. When I run
for 6 months this works fine.

Below is part of the error:

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)

Thanks


Re: Limit on number of files to read for Dataset

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink InputFormats generate their InputSplits sequentially on the
JobManager.
These splits are stored in the heap of the JM process and lazily handed out
to SourceTasks when they request them.
Split assignment is done by an InputSplitAssigner, which can be customized.
FileInputFormats typically use a LocatableInputSplitAssigner, which tries to
assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until
splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to
assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the
InputSplitAssigner. You can implement a custom assigner to make this more
efficient (from an assignment point of view).

Best, Fabian
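
To make point 3 concrete, below is a minimal sketch of a custom assigner,
assuming the single-method InputSplitAssigner interface of Flink 1.4/1.5
(the class name is hypothetical; Flink's own DefaultInputSplitAssigner
already behaves much like this):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    // A FIFO assigner: hands out splits in generation order and ignores locality,
    // which keeps the per-request cost constant even with hundreds of thousands
    // of splits. It would be returned from an InputFormat's getInputSplitAssigner(...).
    public class FifoInputSplitAssigner implements InputSplitAssigner {

        private final Deque<InputSplit> pending;

        public FifoInputSplitAssigner(InputSplit[] splits) {
            this.pending = new ArrayDeque<>(Arrays.asList(splits));
        }

        @Override
        public synchronized InputSplit getNextInputSplit(String host, int taskIndex) {
            // Returning null signals the requesting source task that no splits remain.
            return pending.pollFirst();
        }
    }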


Re: Limit on number of files to read for Dataset

Posted by Jörn Franke <jo...@gmail.com>.
It causes more overhead (processes etc.), which might make it slower. Furthermore, if you have them stored on HDFS, then the bottleneck is the namenode, which will have to answer millions of requests.
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/


Re: Limit on number of files to read for Dataset

Posted by vino yang <ya...@gmail.com>.
Hi Darshan,

In a distributed scenario there is no limit in theory, but there are still
some real-world conditions that impose constraints, such as the size of
your individual files, the memory size of your TaskManager configuration,
and so on.
In addition, is your "single" dataset here logical or physical? I mean, is
it physically read by multiple parallel source subtask instances?

Thanks, vino.
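
For the logical-vs-physical distinction above: in the DataSet API one logical
dataset is physically read by as many parallel source subtasks as the source's
parallelism. A small sketch (path and parallelism value are only illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ParallelReadExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // One logical dataset; its files are split across 32 parallel
            // source subtasks.
            DataSet<String> lines = env.readTextFile("s3://my-bucket/events/")
                                       .setParallelism(32);

            System.out.println(lines.count());
        }
    }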
