Posted to user@flink.apache.org by Sourigna Phetsarath <gn...@teamaol.com> on 2016/03/18 19:15:52 UTC

Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

All:

Flink Version 0.10.2

The number that I set for *taskmanager.network.numberOfBuffers* doesn't
seem to have any effect, even if I set it to a very high number. There
might be a race condition here where the upper bound is not enforced or
computed correctly.

java.io.IOException: Insufficient number of network buffers: required *320*,
but only *32* available. The total number of network buffers is currently
set to *36864*. You can increase this number by setting the configuration
key 'taskmanager.network.numberOfBuffers'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:296)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
    at java.lang.Thread.run(Thread.java:745)

Has anyone else encountered this issue?

Thanks in advance for any help that anyone can provide.
-- 


*Gna Phetsarath*System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

* <http://www.aolplatforms.com>*

Re: Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Right now, there is no way to execute the input tasks sequentially. Flink's
FileInputFormat also does not support multiple paths out of the box.
However, it is certainly possible to extend the FileInputFormat to support
this. You would need to override / extend the createInputSplits() method of
the InputFormat.

Best, Fabian

2016-03-19 18:39 GMT+01:00 Sourigna Phetsarath <gn...@teamaol.com>:

> Is there any way to map allPaths and have the readFile tasks execute
> sequentially rather than concurrently or specify multiple paths for
> FileInputFormat using Flink?

Re: Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
Ufuk,

I configured a very high number:


bin/flink run \
    -m yarn-cluster \
    -ynm "UMS ETL Flow - No Negative Links" \
    -yjm 10240 \
    -yn 10 \
    -ytm 40960 \
    -ys 32 \
    -yD taskmanager.network.numberOfBuffers=284800 \

and there are still issues. :(

I did, however, narrow down the issue to:

   - Using a large number of large Avro files on S3
   - For high parallelism, there is a lot of pressure on S3, so I have to
   tune down the parallelism for that task
   - Also, I am processing a few inputs using code similar to:

def readRequestData(env: ExecutionEnvironment, parameters: Configuration,
                    requestDataPath: String): DataSet[(String, Long)] = {
    // De-duplicate the comma-separated input paths and drop empty entries.
    val allPaths = requestDataPath.split(',').filter(!_.isEmpty()).toSet.toSeq
    Logger.info(f"Using parallelism of ${parallelism}%d for loading request data.")
    // One readFile source per path; all of these sources run concurrently.
    val allRequestsDS = allPaths.map { aPath =>
        val trimmedPath = aPath.trim
        Logger.info(s"Loading request data from '${trimmedPath}'")
        val avroInputFormat = new AvroGenericRecordInputFormat(new Path(trimmedPath))
        avroInputFormat.setUnsplittable(shouldSplit)
        val requests = env.readFile(avroInputFormat, trimmedPath)
            .withParameters(parameters).setParallelism(30)
        // The mapping step was cut off in the mail; presumably a map over each record.
        requests.map { r =>
            (r.get("identifiers").asInstanceOf[Record].get("utid").toString,
             r.get("request_properties").asInstanceOf[Record].get("ip_address").asInstanceOf[Long])
        }
    }
    env.union(allRequestsDS)
}


This causes the read tasks to run concurrently. So, if not tuned down,
resource usage will be high.


   - Using splits when there are lots of big files also causes a lot of
   resource usage for S3


For now, I've gotten things to work.

Is there any way to map allPaths and have the readFile tasks execute
sequentially rather than concurrently or specify multiple paths for
FileInputFormat using Flink?

Thank you for your help.

- Gna

On Sat, Mar 19, 2016 at 10:09 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Can you please follow the formula from [1]. For high parallelism you have
> to configure the number of buffers accordingly, using number of slots for
> #cores and number of task managers for #machines.
>
> Does this help?
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers

Re: Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

Posted by Ufuk Celebi <uc...@apache.org>.
Can you please follow the formula from [1]? For high parallelism, you have
to configure the number of buffers accordingly, using the number of slots for
#cores and the number of task managers for #machines.

Does this help?

– Ufuk

[1]
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
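
As a rough sketch of that rule of thumb: as far as I recall, the guide linked as [1] gives the approximation #cores^2 * #machines * 4. Substituting slots per TaskManager for #cores and the number of TaskManagers for #machines, as suggested above, could look like the snippet below. The object and method names are made up for illustration; 32 and 10 are the -ys and -yn values from the flink run command posted in this thread.

```scala
// Illustrative helper, not part of Flink: estimates a value for
// taskmanager.network.numberOfBuffers from the rule of thumb
//   #cores^2 * #machines * 4
// using slots per TaskManager for #cores and TaskManager count for #machines.
object NetworkBufferEstimate {

  def estimate(slotsPerTaskManager: Int, taskManagers: Int): Long =
    slotsPerTaskManager.toLong * slotsPerTaskManager * taskManagers * 4

  def main(args: Array[String]): Unit = {
    // Assumed inputs: -ys 32 (slots per TaskManager) and -yn 10 (TaskManagers).
    val buffers = estimate(slotsPerTaskManager = 32, taskManagers = 10)
    println(s"taskmanager.network.numberOfBuffers >= $buffers")
  }
}
```

With 32 slots and 10 TaskManagers this comes to 40960, which is somewhat above the 36864 reported in the original error message.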

On Fri, Mar 18, 2016 at 9:38 PM, Sourigna Phetsarath <
gna.phetsarath@teamaol.com> wrote:

> It's batch.
>
> Reading a few thousand Avro files from S3 with a parallelism of 320.  It's
> just failing on the read right now and not making to the next operations.
>
> Here's a screenshot of the flink dashboard.  It usually fails in the 1st
> task.
>
> Cheers,
> Gna

Re: Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

Posted by Sourigna Phetsarath <gn...@teamaol.com>.
It's batch.

Reading a few thousand Avro files from S3 with a parallelism of 320. It's
just failing on the read right now and not making it to the next operations.

Here's a screenshot of the Flink dashboard. It usually fails in the 1st
task.

Cheers,
Gna

On Fri, Mar 18, 2016 at 4:03 PM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Souringa,
>
> could you provide some context about the program you are running?
>
> Is it batch or streaming? What is the parallelism? How many operators are
> you running?
>
> Thanks for reporting the issue. I think we will figure it out once you
> provide further information. :-)
>
> – Ufuk

Re: Setting taskmanager.network.numberOfBuffers does not seem to have an affect - Flink 0.10.2

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Sourigna,

Could you provide some context about the program you are running?

Is it batch or streaming? What is the parallelism? How many operators are
you running?

Thanks for reporting the issue. I think we will figure it out once you
provide further information. :-)

– Ufuk
