Posted to user@flink.apache.org by Maciek Próchniak <mp...@touk.pl> on 2016/04/20 15:45:25 UTC

Threads waiting on LocalBufferPool

Hi,
I'm running my Flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). The task manager has a 20GB heap allocated.
The job does more or less:
read CSV from Kafka -> keyBy one of the fields -> some custom state
processing.
The Kafka topic has 24 partitions, so my parallelism is also 24.

After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB state
backend) I reached a point where throughput is ~120-150k/s.
On the same Kafka and machine I reached > 500k/s with a simple filtering
job, so I wanted to see what the bottleneck is.

It turns out that quite often all of the Kafka threads are stuck waiting for
a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000 nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
    java.lang.Thread.State: TIMED_WAITING (on object monitor)
         at java.lang.Object.wait(Native Method)
         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
         - locked <0x00000002eade3890> (a java.util.ArrayDeque)
         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
         - locked <0x00000002eb73cbd0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
         at scala.collection.immutable.List.foreach(List.scala:381)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
         - locked <0x00000002eaf3eb50> (a java.lang.Object)
         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
         - locked <0x00000002eaf3eb50> (a java.lang.Object)

This seems a bit weird to me, as most of the state processing threads are idle:

"My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5 os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition [0x00007f7bee8ed000]
    java.lang.Thread.State: TIMED_WAITING (parking)
         at sun.misc.Unsafe.park(Native Method)
         - parking to wait for  <0x00000002eb840c38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
         at java.lang.Thread.run(Thread.java:745)


I tried using more network buffers, but it doesn't seem to change
anything - and if I understand
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
correctly, I should not need more than 24^2 * 4 of them...
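
For reference, the 24^2 * 4 figure can be worked out with a small sketch. It assumes the rule of thumb from the linked configuration page (slots per task manager squared, times the number of task managers, times 4) and a buffer size of 32 KB. The class and method names are made up for illustration and are not part of Flink.

```java
// Rough sizing of Flink network buffers. Assumption: the rule of thumb
// slotsPerTaskManager^2 * taskManagers * 4 from the linked configuration page.
// NetworkBufferEstimate is a hypothetical helper, not a Flink class.
final class NetworkBufferEstimate {
    // Assumed size of one network buffer: 32 KB.
    static final long BUFFER_SIZE_BYTES = 32L * 1024L;

    // Number of buffers suggested by the rule of thumb.
    static long requiredBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    // Memory those buffers occupy, in bytes.
    static long requiredBytes(int slotsPerTaskManager, int taskManagers) {
        return requiredBuffers(slotsPerTaskManager, taskManagers) * BUFFER_SIZE_BYTES;
    }

    public static void main(String[] args) {
        // 24 slots on a single task manager, as in the setup described above.
        long buffers = requiredBuffers(24, 1);
        long mib = requiredBytes(24, 1) / (1024L * 1024L);
        System.out.println(buffers + " buffers, about " + mib + " MiB");
    }
}
```

Under these assumptions the job needs 2304 buffers, i.e. 72 MiB, which is tiny compared to the memory available on the machine.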

Has anybody encountered such a problem? Or maybe it's just normal in
such a case...

thanks,
maciek


Re: Threads waiting on LocalBufferPool

Posted by Maciek Próchniak <mp...@touk.pl>.

On 21/04/2016 16:46, Aljoscha Krettek wrote:
> Hi,
> I would be very happy about improvements to our RocksDB performance. 
> What are the RocksDB Java benchmarks that you are running? In Flink, 
> we also have to serialize/deserialize every time that we access 
> RocksDB using our TypeSerializer. Maybe this is causing the slowdown.
>
Hi Aljoscha,

I'm using the benchmark from:
https://github.com/facebook/rocksdb/blob/master/java/jdb_bench.sh

My value is a pretty simple Scala case class - around 12 fields with
Int/Long/String values - so I think serialization shouldn't be a big
problem. However, I think I'll have to do more comprehensive tests to be
sure I'm comparing apples to apples - I hope to find time during the weekend
for that :)
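
One way such a comparison could be set up is to time each state call (including serialization) with the same small helper in both the job and the standalone benchmark. The sketch below is only an illustration: LatencyStats is a hypothetical class, and the HashMap in main merely stands in for real state access.

```java
// Minimal latency recorder: collects per-call timings and reports the mean
// and a nearest-rank percentile. Hypothetical helper, not part of Flink or RocksDB.
final class LatencyStats {
    private long[] samples = new long[1024];
    private int count = 0;

    // Times one call of the given operation and stores the elapsed nanoseconds.
    void time(Runnable operation) {
        long start = System.nanoTime();
        operation.run();
        record(System.nanoTime() - start);
    }

    // Stores one sample, growing the backing array when it is full.
    void record(long nanos) {
        if (count == samples.length) {
            samples = java.util.Arrays.copyOf(samples, count * 2);
        }
        samples[count++] = nanos;
    }

    // Arithmetic mean of all samples, in microseconds.
    double meanMicros() {
        if (count == 0) {
            return 0.0;
        }
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += samples[i];
        }
        return sum / (double) count / 1000.0;
    }

    // Nearest-rank percentile in microseconds, for p in (0, 100].
    double percentileMicros(double p) {
        if (count == 0) {
            return 0.0;
        }
        long[] sorted = java.util.Arrays.copyOf(samples, count);
        java.util.Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * count);
        return sorted[Math.max(rank, 1) - 1] / 1000.0;
    }

    public static void main(String[] args) {
        LatencyStats stats = new LatencyStats();
        // Stand-in for keyed state; a real test would call the state backend here.
        java.util.Map<String, Long> fakeState = new java.util.HashMap<>();
        for (int i = 0; i < 100_000; i++) {
            final String key = "key-" + (i % 1000);
            stats.time(() -> fakeState.merge(key, 1L, Long::sum));
        }
        System.out.printf("mean=%.3f us, p95=%.3f us%n",
                stats.meanMicros(), stats.percentileMicros(95));
    }
}
```

Reporting a percentile next to the mean matters here, because a few slow reads can dominate throughput even when the mean looks acceptable.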

thanks,
maciek

> By the way, what is the type of value stored in the RocksDB state?
> Maybe the TypeSerializer for that value is very slow.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 16:41 Maciek Próchniak <mp...@touk.pl> wrote:
>
>     Well...
>     I found some time to look at rocksDB performance.
>
>     It takes around 0.4ms to look up value state and 0.12ms to update it -
>     these are means, the 95th percentile was > 1ms for get... When I set
>     additional options:
>               .setIncreaseParallelism(8)
>               .setMaxOpenFiles(-1)
>               .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>
>     I managed to get
>     0.05ms for update and 0.2ms for get - but that still seems pretty
>     bad compared to the standard RocksDB Java benchmarks that I ran on the
>     same machine, which give:
>     fillseq          :     1.23238 micros/op;   89.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
>     readrandom       :     9.25380 micros/op;   12.0 MB/s; 1000000 / 1000000 found;  1 / 1 task(s) finished.
>     fillrandom       :     4.46839 micros/op;   24.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
>
>     I guess I'll have to look at it a bit more...
>
>     thanks anyway,
>     maciek
>
>
>
>     On 21/04/2016 08:41, Maciek Próchniak wrote:
>>     Hi Ufuk,
>>
>>     thanks for the quick reply.
>>     Actually I had a little time to try both things.
>>     1) helped only temporarily - it just took a bit longer to
>>     saturate the pool. After a few minutes, all Kafka threads were
>>     periodically waiting for the buffer pool again.
>>     2) This seemed to help. I also reduced the checkpoint interval - with
>>     RocksDB we had 5min, now I tried 30s.
>>
>>     I attach throughput metrics - the former (around 18) is with
>>     increased heap & buffers, the latter (around 22) is with
>>     FileSystemStateBackend.
>>     My state is a few GB large - during the test it reached around
>>     2-3GB. I must admit I was quite impressed that checkpointing to
>>     HDFS using FileSystem took only about 6-7s (with occasional
>>     spikes to 12-13s, which can be seen in the metrics - I didn't check
>>     whether they were caused by HDFS or something else).
>>
>>     Now I looked at the logs from 18 and it seems that checkpointing RocksDB
>>     took around 2-3 minutes:
>>     2016-04-20 17:47:33,439 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1461167253439
>>     2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 140588 ms)
>>     - however, I don't see any threads dumping state in the thread stacks...
>>
>>     I guess I'll have to add some metrics around state invocations to
>>     see where the problem with RocksDB is... I'll write if I find
>>     anything, but that won't be today I think...
>>
>>     Btw - I was looking at the FS state backend and I wonder whether it
>>     would be feasible to make a variant of it that uses an immutable map
>>     (probably some Scala one) to be able to do async checkpoints.
>>     Then the synchronous part would be essentially free - just taking the
>>     state map and materializing it asynchronously.
>>     Of course, that would work only for immutable state - but this is
>>     often the case when writing in Scala. WDYT?
>>
>>     thanks,
>>     maciek
>>
>>
>>
>>
>>     On 20/04/2016 16:28, Ufuk Celebi wrote:
>>>     Could be different things actually, including the parts of the
>>>     network you mentioned.
>>>
>>>     1)
>>>
>>>     Regarding the TM config:
>>>     - It can help to increase the number of network buffers (you can go
>>>     ahead and give it 4 GB, i.e. 131072 buffers of 32 KB each)
>>>     - In general, you have way more memory available than you actually
>>>     give to Flink. I would increase the 20 GB heap size.
>>>
>>>     As a first step you could address these two points and re-run
>>>     your job.
>>>
>>>     2)
>>>
>>>     As a follow-up you could also work with the FileSystemStateBackend,
>>>     which keeps state in memory (on-heap) and writes checkpoints to
>>>     files.
>>>     This would help in checking how much RocksDB is slowing things
>>>     down.
>>>
>>>
>>>     I'm curious about the results. Do you think you will have time
>>>     to try this?
>>>
>>>     – Ufuk


Re: Threads waiting on LocalBufferPool

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I would be very happy about improvements to our RocksDB performance. What
are the RocksDB Java benchmarks that you are running? In Flink, we also
have to serialize/deserialize every time that we access RocksDB using our
TypeSerializer. Maybe this is causing the slowdown.

By the way, what is the type of value stored in the RocksDB state? Maybe
the TypeSerializer for that value is very slow.

Cheers,
Aljoscha


Re: Threads waiting on LocalBufferPool

Posted by Maciek Próchniak <mp...@touk.pl>.
Well...
I found some time to look at rocksDB performance.

It takes around 0.4ms to look up value state and 0.12ms to update it - these
are means, the 95th percentile was > 1ms for get... When I set additional
options:
           .setIncreaseParallelism(8)
           .setMaxOpenFiles(-1)
           .setCompressionType(CompressionType.SNAPPY_COMPRESSION)

I managed to get
0.05ms for update and 0.2ms for get - but that still seems pretty bad
compared to the standard RocksDB Java benchmarks that I ran on the same
machine, which give:
fillseq          :     1.23238 micros/op;   89.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
readrandom       :     9.25380 micros/op;   12.0 MB/s; 1000000 / 1000000 found;  1 / 1 task(s) finished.
fillrandom       :     4.46839 micros/op;   24.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
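
To put these numbers side by side, here is a back-of-the-envelope sketch. It assumes, purely for illustration, that every record costs exactly one get and one update, that state access is the only cost, and that all 24 parallel tasks are busy. StateLatencyBudget is a hypothetical name.

```java
// Back-of-the-envelope throughput ceiling derived from per-call state latency.
// Assumptions (not measured): one get plus one update per record, nothing else
// counted, all parallel tasks fully busy. Hypothetical helper class.
final class StateLatencyBudget {
    // Records per second a single thread can process at the given latencies.
    static double recordsPerSecondPerThread(double getMillis, double updateMillis) {
        return 1000.0 / (getMillis + updateMillis);
    }

    // How many times slower a measured call (in ms) is than a benchmark op (in micros).
    static double slowdownFactor(double measuredMillis, double benchmarkMicros) {
        return measuredMillis * 1000.0 / benchmarkMicros;
    }

    public static void main(String[] args) {
        int parallelism = 24;
        // Latencies with the additional RocksDB options set.
        double tuned = recordsPerSecondPerThread(0.2, 0.05) * parallelism;
        // Latencies before setting the options.
        double untuned = recordsPerSecondPerThread(0.4, 0.12) * parallelism;
        System.out.printf("tuned: ~%.0f records/s, untuned: ~%.0f records/s%n", tuned, untuned);
        System.out.printf("get vs readrandom: ~%.1fx slower%n", slowdownFactor(0.2, 9.25380));
    }
}
```

With the tuned latencies this gives a ceiling of roughly 4000 records/s per thread, or about 96k records/s across 24 tasks, which is the same order of magnitude as the throughput of the job. That is at least consistent with state access being the limiting factor, although the assumptions are crude.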

I guess I'll have to look at it a bit more...

thanks anyway,
maciek




Re: Threads waiting on LocalBufferPool

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi Ufuk,

thanks for the quick reply.
Actually, I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the 
pool. After a few minutes, all Kafka threads were again periodically 
waiting for the buffer pool.
2) This seemed to help. I also reduced the checkpoint interval - with 
RocksDB we had 5 min, now I tried 30 s.

I attach throughput metrics - the former (around 18:00) is with increased 
heap & buffers, the latter (around 22:00) is with FileSystemStateBackend.
My state is a few GB in size - during the test it reached around 2-3 GB. I 
must admit I was quite impressed that checkpointing to HDFS using 
FileSystemStateBackend took only about 6-7 s (with occasional spikes to 
12-13 s, which can be seen in the metrics - I didn't check whether they 
were caused by HDFS or something else).

I have now looked at the logs from the 18:00 run, and it seems that 
checkpointing RocksDB took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 140588 ms)
However, I don't see any threads dumping state in the thread stacks...
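
The checkpoint duration can be cross-checked against the two log timestamps. 
The sketch below is only an illustration: the class and method names are 
invented, and it assumes the log uses the `yyyy-MM-dd HH:mm:ss,SSS` layout 
seen in the two lines above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointDuration {
    // Timestamp layout assumed from the two log lines in this message.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps in the same time zone.
    static long millisBetween(String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, LOG_TS);
        LocalDateTime to = LocalDateTime.parse(end, LOG_TS);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        long ms = millisBetween("2016-04-20 17:47:33,439", "2016-04-20 17:49:54,196");
        System.out.println(ms + " ms between trigger and completion log lines");
    }
}
```

The two log lines are 140757 ms apart, slightly more than the 140588 ms the 
coordinator reports. That is plausible if the log timestamps record when each 
line was written rather than the exact instants the coordinator measures, but 
this is a guess.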

I guess I'll have to add some metrics around state invocations to see 
where the problem with RocksDB is... I'll write if I find anything, but 
that won't be today, I think...

Btw - I was looking at the FS state backend, and I wonder whether it would 
be feasible to make a variant of it using an immutable map (probably one of 
the Scala ones) to be able to do async checkpoints.
The synchronous part would then be essentially free - just taking the state 
map and materializing it asynchronously.
Of course, that would work only for immutable state - but this is often 
the case when writing in Scala. WDYT?
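
The idea can be sketched in plain Java. This is not Flink code and not how 
any existing state backend works; the class name and methods are 
hypothetical. The sketch also copies the whole map on every update, which a 
real persistent map (such as Scala's immutable HashMap) would avoid by 
sharing structure, and the "materialization" is a stand-in copy rather than 
a write to a file system.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ImmutableSnapshotState<K, V> {
    // Current state. The map is never mutated in place, only replaced.
    private volatile Map<K, V> current = Collections.emptyMap();

    // Copy-on-write update (O(n) here; a persistent map would share structure).
    public synchronized void put(K key, V value) {
        Map<K, V> next = new HashMap<K, V>(current);
        next.put(key, value);
        current = Collections.unmodifiableMap(next);
    }

    public V get(K key) {
        return current.get(key);
    }

    // Synchronous part of a checkpoint: just grab the current reference.
    public Map<K, V> snapshot() {
        return current;
    }

    // Asynchronous part: materialize the frozen map on another thread.
    // Copying into a new map stands in for writing to durable storage.
    public CompletableFuture<Map<K, V>> checkpointAsync() {
        final Map<K, V> frozen = snapshot();
        return CompletableFuture.<Map<K, V>>supplyAsync(() -> new HashMap<K, V>(frozen));
    }

    public static void main(String[] args) {
        ImmutableSnapshotState<String, Integer> state = new ImmutableSnapshotState<>();
        state.put("a", 1);
        Map<String, Integer> snap = state.snapshot();
        state.put("a", 2); // does not affect the snapshot taken earlier
        System.out.println("snapshot=" + snap.get("a") + ", live=" + state.get("a"));
    }
}
```

Updates made after `snapshot()` go into a new map, so the snapshot stays 
consistent without blocking processing. This only holds if the values are 
themselves immutable, which is the restriction mentioned above.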

thanks,
maciek






Re: Threads waiting on LocalBufferPool

Posted by Ufuk Celebi <uc...@apache.org>.
Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, i.e. 131072 buffers of 32 KB each)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.
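
For reference, the buffer arithmetic can be checked with a few lines of Java. 
This is only a sketch with made-up names. It assumes the 32 KB buffer size 
mentioned above, and the rule-of-thumb method follows the 24^2 * 4 estimate 
from the original post, generalized with a task manager count as I understand 
the linked docs section.

```java
public class NetworkBufferMath {
    // Size of one network buffer, as stated in this message (32 KB).
    static final long BUFFER_SIZE_BYTES = 32L * 1024;

    // Number of whole buffers that fit into a memory budget.
    static long buffersFor(long budgetBytes) {
        return budgetBytes / BUFFER_SIZE_BYTES;
    }

    // Memory occupied by a given number of buffers.
    static long bytesFor(long buffers) {
        return buffers * BUFFER_SIZE_BYTES;
    }

    // Rule of thumb: slots per task manager squared, times task managers, times 4.
    static long ruleOfThumb(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4;
    }

    public static void main(String[] args) {
        long fourGb = 4L * 1024 * 1024 * 1024;
        System.out.println("buffers in 4 GB: " + buffersFor(fourGb));         // 131072
        long estimate = ruleOfThumb(24, 1);
        System.out.println("rule-of-thumb buffers: " + estimate);             // 2304
        System.out.println("memory for that, MB: " + bytesFor(estimate) / (1024 * 1024)); // 72
    }
}
```

So 4 GB corresponds to 131072 buffers, while the 24^2 * 4 estimate needs 
only 2304 buffers, or about 72 MB. If I remember the 1.0-era configuration 
correctly, the count is set with `taskmanager.network.numberOfBuffers` in 
flink-conf.yaml, but please check the docs for your version.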

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.


I'm curious about the results. Do you think you will have time to try this?

– Ufuk

