Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2016/10/15 20:11:00 UTC

"java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

Hi all,

I'm reading a large number of small files from HDFS in batch mode (about 20
directories, each containing about 3000 files, read with
recursive.file.enumeration=true), and each time, after about 200 GB of
received data, my job fails with the following exception:

java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
        at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
        at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
        at java.io.FilterInputStream.read(Unknown Source)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
        at java.util.zip.CheckedInputStream.read(Unknown Source)
        at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
        at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
        at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
        at java.util.zip.GZIPInputStream.<init>(Unknown Source)
        at java.util.zip.GZIPInputStream.<init>(Unknown Source)
        at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
        at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
        at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
        ... 5 more
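
For reference, the source is set up roughly like this. This is a minimal sketch, not the actual job: the HDFS path is a placeholder, and the custom MyTextInputFormat that appears in the traces below is reduced to a plain readTextFile:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadNestedGzipFiles {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Make the file source descend into the nested directories.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        // Flink chooses a decompressor from the file extension, so the
        // *.csv.gz files are inflated transparently while being read.
        DataSet<String> lines = env
                .readTextFile("hdfs:///filepath")   // placeholder path
                .withParameters(parameters);

        lines.first(10).print();
    }
}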

I checked the file each time: it exists and is healthy. Looking at the
taskmanager logs, I found the following exceptions, which suggest the job is
running out of connections:

2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory     - I/O error constructing remote block reader.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Unknown Source)
2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient              - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue. java.net.SocketException: No buffer space available (maximum connections reached?): connect
java.net.SocketException: No buffer space available (maximum connections reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Unknown Source)

I inspected the open connections and found that the job opens a very large
number of connections that get stuck in the CLOSE_WAIT state, which I guess
exhausts the ephemeral port space after some time.
I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager) with a
parallelism of 8. I got the same exception even with the job parallelism set
to 1, and the same exception happened after upgrading to Flink 1.1.3.

Any idea what could be the root cause of the problem and how to solve it?
Thank you.

Best,
Yassine

Re: "java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

Posted by Stephan Ewen <se...@apache.org>.
Happy to hear it!

Re: "java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
That solved my problem, Thank you!

Best,
Yassine

Re: "java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

Posted by Stephan Ewen <se...@apache.org>.
Hi!

This looks to me like the following problem: the decompression streams
did not properly forward the "close()" calls.
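
In other words, closing the decorating stream never closed the wrapped HDFS stream, so every finished split left its DataNode connection dangling. Here is a minimal sketch of the pattern (illustrative only, not the actual Flink code):

import java.io.IOException;
import java.io.InputStream;

// A decorating stream that does not forward close() leaks the wrapped
// stream -- and with it the TCP connection to the DataNode:
class LeakyDecorator extends InputStream {
    private final InputStream wrapped;  // e.g. the HDFS input stream

    LeakyDecorator(InputStream wrapped) { this.wrapped = wrapped; }

    @Override
    public int read() throws IOException { return wrapped.read(); }

    @Override
    public void close() {
        // BUG: wrapped.close() is never called, so the socket lingers
        // in CLOSE_WAIT until the process exits.
    }
}

// Forwarding the call releases the connection as soon as the split is done:
class ForwardingDecorator extends InputStream {
    private final InputStream wrapped;

    ForwardingDecorator(InputStream wrapped) { this.wrapped = wrapped; }

    @Override
    public int read() throws IOException { return wrapped.read(); }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}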

The fix is in the latest 1.2-SNAPSHOT, but did not make it into version 1.1.3.
It is in this pull request: https://github.com/apache/flink/pull/2581

I have pushed the fix into the latest 1.1-SNAPSHOT branch.

If you get the code via "git clone -b release-1.1
https://github.com/apache/flink.git", you will get the code that matches the
1.1.3 release, plus the patch for this problem.

Greetings,
Stephan

