Posted to user@flink.apache.org by mars <sk...@yahoo.com> on 2020/09/25 21:24:30 UTC

SocketException: Too many open files

Hi,

  I have a simple Flink job which reads data from a Kafka topic, generates
per-minute aggregations, and writes them to Elasticsearch.

  I am running the Flink job (Flink YARN session) on an EMR cluster. The job
runs fine for about an hour and then stops; when I checked the logs I saw
the following.

 Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to
create Channel from class class
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
	at
org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
	at
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
	at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
	at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
	at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
	at
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
	at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
	at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
	at
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
	at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
	at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
	at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
	at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
	at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at
org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
	... 17 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException:
Failed to open a socket.
	at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
	at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
	at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
	... 21 more
Caused by: java.net.SocketException: Too many open files
	at sun.nio.ch.Net.socket0(Native Method)
	at sun.nio.ch.Net.socket(Net.java:411)
	at sun.nio.ch.Net.socket(Net.java:404)
	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
	at
sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
	at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
	... 23 more

Along with the sink function that writes the data to Elasticsearch, I am
calling print() on the SingleOutputStreamOperator (the stream returned once
I calculate the aggregation over a tumbling window).

I am also calling DataStreamUtils.collect() on the same stream to log the
records it contains.

These two are only enabled in the DEV environment.

I have updated limits.conf and also set fs.file-max (fs.file-max = 2097152)
on the master node as well as on all worker nodes, and I am still getting
the same issue.

Thanks
Sateesh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: SocketException: Too many open files

Posted by Arvid Heise <ar...@ververica.com>.
Hi Sateesh,

my suspicion would be that your custom sink function is leaking connections
(which also count toward the file limit). Is there a reason you cannot use
Flink's ES connector?

I might have more ideas when you share your sink function.

Best,

Arvid

On Sun, Sep 27, 2020 at 7:16 PM mars <sk...@yahoo.com> wrote:

> [quoted message trimmed]


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: SocketException: Too many open files

Posted by mars <sk...@yahoo.com>.
Hi,

 I am using Flink 1.10.0 on EMR.

 I am not using the default Flink sink. I have a sink function on the
stream, and within its invoke() method I create a data structure (a value
object, VO) and put it in a map.

 The EMR step I am running is a Spring-based Flink job, and I have a
scheduler which runs every minute, looks for items in the map, generates
JSON from each VO, sends it to Elasticsearch, and removes it from the
HashMap once it has been sent to ES successfully.

 I am using m5.2xlarge instances for the worker nodes and m5.4xlarge for
the master node.

 I have set the ulimit to 500K for all users (*), both soft and hard
limits, on the master and worker nodes.
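
A minimal sketch of the pattern described above (all names and types here
are hypothetical; the real job presumably uses a RichSinkFunction and an ES
client). The step to audit for a descriptor leak is the send: if every
flush builds a fresh HTTP/ES client, or the client is never closed, file
descriptors accumulate until the process hits its nofile limit:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical reconstruction: invoke() buffers value objects in a map,
// and a once-per-minute scheduler drains the map to Elasticsearch.
class BufferedEsFlusher {
    private final Map<String, Object> buffer = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Called from the sink's invoke(): buffer the value object (VO).
    void add(String key, Object vo) {
        buffer.put(key, vo);
    }

    // Run the flush once per minute, as described in the post.
    void start() {
        scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES);
    }

    // Drain the buffer; remove an entry only after a successful send.
    void flush() {
        for (Map.Entry<String, Object> e : buffer.entrySet()) {
            if (sendToEs(e.getValue())) {
                buffer.remove(e.getKey());
            }
        }
    }

    // Stub for the ES call. A real client should be created once (e.g. in
    // RichSinkFunction.open()) and closed in close(), never per flush.
    boolean sendToEs(Object vo) {
        return true;
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferedEsFlusher f = new BufferedEsFlusher();
        f.add("window-1", new Object());
        f.flush();
        System.out.println("pending after flush: " + f.pending());
    }
}
```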

 Thanks again for your response.

Sateesh




Re: SocketException: Too many open files

Posted by Ken Krugler <kk...@transpac.com>.
Hi Mars,

A few questions..

1. What version of Flink are you using?

2. Are you using the default ES sink, or did you write your own?

3. What class of EC2 slave are you using?

4. What’s the parallelism of the ES sink?

5. To verify the actual open file limit, you need to…

 * scp your private key to the EMR master
 * ssh onto the EMR master
 * ssh (using the same key) from the master to one of the EMR slaves
 * sudo -u yarn bash -c 'ulimit -a'

For many classes of EC2 servers, you get 32K max open files.
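
Besides checking the limit itself, it can help to watch the actual
descriptor count over time. A Linux-only sketch: count the file descriptors
the JVM currently holds by listing /proc/self/fd (logging this periodically
from inside the job makes a slow leak easy to spot before the limit is
hit):

```java
import java.io.File;

// Count this JVM's open file descriptors via /proc (Linux only).
class OpenFdCount {
    static int openFds() {
        File[] entries = new File("/proc/self/fd").listFiles();
        return entries == null ? -1 : entries.length; // -1 if /proc is unavailable
    }

    public static void main(String[] args) {
        System.out.println("open fds: " + openFds());
    }
}
```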

I haven’t looked at exactly how the ES sink configures things, but the ES REST client (by default) has a connection pool of size 30, and these connections aren’t closed immediately.

Each connection uses 3 file descriptors (one a_inode, two FIFO). So you could get about 100 open files per sink sub-task.
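
A back-of-envelope check of those numbers (the parallelism value below is
hypothetical): a default pool of 30 connections at roughly 3 descriptors
each gives on the order of 90-100 descriptors per sink subtask, multiplied
by the sink's parallelism:

```java
// Estimate file descriptors held by the ES sink: pool size x fds per
// connection, per subtask; multiply by parallelism for the total.
class FdEstimate {
    static int perSubtask(int poolSize, int fdsPerConnection) {
        return poolSize * fdsPerConnection;
    }

    public static void main(String[] args) {
        int sinkParallelism = 8; // hypothetical value
        System.out.println(perSubtask(30, 3) * sinkParallelism); // 90 * 8 = 720
    }
}
```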

— Ken

PS - calling collect() or print() on a data stream would only make sense if it was tiny. Can you use PrintSinkFunction()?

> On Sep 25, 2020, at 2:24 PM, mars <sk...@yahoo.com> wrote:
> 
> [original message and stack trace trimmed]

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr