Posted to user@flink.apache.org by Maciek Próchniak <mp...@touk.pl> on 2016/03/09 13:59:31 UTC
streaming job reading from kafka stuck while cancelling
Hi,
from time to time, when we cancel streaming jobs (or they fail for
some reason), we encounter:
2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic:
(...) ' did not react to cancelling signal, but is stuck in method:
java.lang.Object.wait(Native Method)
java.lang.Thread.join(Thread.java:1253)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)
The relevant stack trace is this:
"Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... nid=0x2e96 in Object.wait() [0x00007f2bac847000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
    - locked <0x000000041ae00180> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
    - locked <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
    - locked <0x000000041ae001c8> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
    - locked <0x000000041ae001c8> (a java.lang.Object)
and also:
"OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800 nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
    - waiting to lock <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
It looks like the source tries to process the remaining Kafka messages, but it is
stuck on partitioning by key.
I managed to take a heap dump and look at what's going on inside LocalBufferPool:
- availableMemorySegments queue is empty
- numberOfRequestedMemorySegments == currentPoolSize == 16
- there are no registeredListeners
Now, it seems that the loop at LocalBufferPool#142 runs without end, waiting for
a buffer to be recycled - but from what I see that won't happen because OutputFlusher
is blocked by this loop.
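The two dumps above show one thread holding the serializer monitor while it does a timed wait on the pool's queue, with the flusher blocked on that same serializer monitor. A minimal Java sketch of that pattern, assuming nothing about Flink internals beyond what the dumps show: the class and field names (HeldLockDemo, serializerLock, availableBuffers) are made up for illustration and this is not Flink code.

```java
import java.util.ArrayDeque;

final class HeldLockDemo {
    // Illustrative stand-ins for the two monitors seen in the thread dumps.
    private static final Object serializerLock = new Object();
    private static final ArrayDeque<byte[]> availableBuffers = new ArrayDeque<>();

    /** Starts both threads and returns their states as "emitter/flusher". */
    static String observeStates() {
        Thread emitter = new Thread(() -> {
            synchronized (serializerLock) {            // held for the whole emit
                synchronized (availableBuffers) {
                    while (availableBuffers.isEmpty()) {
                        try {
                            // Releases only the pool monitor, not serializerLock.
                            availableBuffers.wait(10_000);
                        } catch (InterruptedException e) {
                            return;                     // exit path for the demo
                        }
                    }
                }
            }
        }, "emitter");
        Thread flusher = new Thread(() -> {
            synchronized (serializerLock) {
                // A real flusher would hand buffers downstream here.
            }
        }, "flusher");
        emitter.setDaemon(true);
        flusher.setDaemon(true);

        emitter.start();
        awaitState(emitter, Thread.State.TIMED_WAITING);
        flusher.start();
        awaitState(flusher, Thread.State.BLOCKED);

        String states = emitter.getState() + "/" + flusher.getState();
        emitter.interrupt();                            // let both threads finish
        return states;
    }

    // Polls until the thread reaches the wanted state or five seconds pass.
    private static void awaitState(Thread t, Thread.State wanted) {
        long deadline = System.currentTimeMillis() + 5_000;
        while (t.getState() != wanted && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        System.out.println(observeStates());
    }
}
```

In this toy version nothing ever recycles a buffer, so the emitter keeps waiting and the flusher can never enter its synchronized block, which mirrors the TIMED_WAITING and BLOCKED states reported above.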
The problem seems to occur when we start a new job at more or less the same time
as a job is cancelled (e.g. the taskmanager is restarted, one job is
failing because of some problem,
and another one is just starting) - so I wonder whether it could be a problem
with the setNumBuffers method - although it looks synchronized enough...
We are using version 1.0.0 (RC4) btw
I hope to dig further into this - but for now this is all I managed to find.
thanks,
maciek
Re: streaming job reading from kafka stuck while cancelling
Posted by Stephan Ewen <se...@apache.org>.
The reason that the consumer thread is not interrupted (which is the reason
why there is a separate consumer thread in the first place) is that Kafka
has a bug (or design issue) where thread interrupting may lead to a
deadlock in the thread.
Interrupting the thread would need to make sure that interruption never
happens while the thread is in the Kafka function stack, only while it is
in Flink code.
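One way to picture that constraint is a small guard that defers an interrupt while the thread is inside the client library and delivers it once the thread is back in its own code. This is only a sketch under that assumption; GuardedInterrupter and its methods are hypothetical names, not part of Flink or Kafka.

```java
final class GuardedInterrupter {
    private final Object guard = new Object();
    private final Thread target;
    private boolean inClientCall;      // true while target is inside client-library code
    private boolean interruptPending;  // a request arrived during a client call

    GuardedInterrupter(Thread target) {
        this.target = target;
    }

    /** Called by the target thread just before it enters client-library code. */
    void enterClientCall() {
        synchronized (guard) {
            inClientCall = true;
        }
    }

    /** Called by the target thread right after it leaves client-library code. */
    void exitClientCall() {
        synchronized (guard) {
            inClientCall = false;
            if (interruptPending) {
                interruptPending = false;
                target.interrupt();    // safe now: back in our own code
            }
        }
    }

    /** Returns true if the interrupt was delivered now, false if it was deferred. */
    boolean requestInterrupt() {
        synchronized (guard) {
            if (inClientCall) {
                interruptPending = true;
                return false;
            }
            target.interrupt();
            return true;
        }
    }

    public static void main(String[] args) {
        GuardedInterrupter g = new GuardedInterrupter(Thread.currentThread());
        g.enterClientCall();
        System.out.println("delivered immediately: " + g.requestInterrupt());
        g.exitClientCall();
        // Thread.interrupted() reads and clears the flag set by the deferred interrupt.
        System.out.println("interrupted after exit: " + Thread.interrupted());
    }
}
```

The consumer loop would wrap each call into the client with enterClientCall() and exitClientCall() (ideally in try/finally), and the canceller would call requestInterrupt() instead of Thread.interrupt() directly.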
On Wed, Mar 9, 2016 at 4:25 PM, Maciek Próchniak <mp...@touk.pl> wrote:
> Thanks,
>
> that makes sense...
> Guess I'll try some dirty workaround for now by interrupting the consumer
> thread if it doesn't finish after some time...
>
> maciek
>
>
> On 09/03/2016 14:42, Stephan Ewen wrote:
>
> Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595
>
> On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Thanks for debugging this, I think there is in fact an issue in the
>> 0.9 consumer.
>>
>> I'll open a ticket for it, will try to fix that as soon as possible...
>>
>> Stephan
>>
Re: streaming job reading from kafka stuck while cancelling
Posted by Ufuk Celebi <uc...@apache.org>.
Hey Maciek! I'm working on the other proposed fix by closing the
buffer pool early. I expect the fix to make it into the next bugfix
release 1.0.1 (or 1.0.2 if 1.0.1 comes very soon).
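The idea of closing the pool early can be sketched with a toy pool whose blocking request fails fast once the pool is closed. This is an assumption about the general shape of such a fix, not the actual Flink change; ClosablePool and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

final class ClosablePoolDemo {
    /** Toy pool for illustration; not Flink's LocalBufferPool. */
    static final class ClosablePool {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();
        private boolean closed;

        byte[] requestBlocking() throws InterruptedException {
            synchronized (available) {
                while (true) {
                    if (closed) {
                        throw new IllegalStateException("buffer pool is closed");
                    }
                    byte[] buffer = available.poll();
                    if (buffer != null) {
                        return buffer;
                    }
                    available.wait(2_000);   // timed wait, then re-check both conditions
                }
            }
        }

        void recycle(byte[] buffer) {
            synchronized (available) {
                available.add(buffer);
                available.notify();
            }
        }

        void close() {
            synchronized (available) {
                closed = true;
                available.notifyAll();       // wake every blocked requester
            }
        }
    }

    /** Returns true if closing the pool made a blocked requester fail fast. */
    static boolean closeUnblocksWaiter() {
        ClosablePool pool = new ClosablePool();
        AtomicBoolean sawClosed = new AtomicBoolean(false);
        Thread requester = new Thread(() -> {
            try {
                pool.requestBlocking();
            } catch (IllegalStateException e) {
                sawClosed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "requester");
        requester.setDaemon(true);
        requester.start();

        // Give the requester a chance to block on the empty pool first.
        long deadline = System.currentTimeMillis() + 5_000;
        while (requester.getState() != Thread.State.TIMED_WAITING
                && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }

        pool.close();
        try {
            requester.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawClosed.get() && !requester.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("close unblocked waiter: " + closeUnblocksWaiter());
    }
}
```

With this shape, a thread that is waiting for a buffer leaves the wait loop with an exception as soon as the pool is closed, so it no longer depends on a buffer being recycled.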
– Ufuk
Re: streaming job reading from kafka stuck while cancelling
Posted by Maciek Próchniak <mp...@touk.pl>.
Thanks,
that makes sense...
Guess I'll try some dirty workaround for now by interrupting the consumer
thread if it doesn't finish after some time...
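A minimal sketch of such a workaround, assuming a plain Java thread and a hypothetical helper named joinOrInterrupt (not a Flink or Kafka API). The caveat raised earlier in the thread still applies: the interrupt may land while the thread is inside Kafka code.

```java
final class TimedJoin {
    /**
     * Waits up to timeoutMillis for the worker to end and interrupts it if it
     * is still alive. Returns true if an interrupt was sent.
     */
    static boolean joinOrInterrupt(Thread worker, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            // Thread.join(0) would wait forever, which defeats the purpose.
            throw new IllegalArgumentException("timeout must be positive");
        }
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the caller's interrupt
        }
        if (worker.isAlive()) {
            worker.interrupt();
            return true;
        }
        return false;
    }

    /** Demo: a worker that only exits when interrupted. */
    static boolean stuckWorkerIsInterrupted() {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Exit on interrupt.
            }
        }, "stuck-worker");
        worker.setDaemon(true);
        worker.start();

        boolean sent = joinOrInterrupt(worker, 100);
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stuck worker interrupted: " + stuckWorkerIsInterrupted());
    }
}
```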
maciek
On 09/03/2016 14:42, Stephan Ewen wrote:
> Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595
>
> On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <sewen@apache.org> wrote:
>
> Hi!
>
> Thanks for debugging this, I think there is in fact an issue
> in the 0.9 consumer.
>
> I'll open a ticket for it, will try to fix that as soon as possible...
>
> Stephan
Re: streaming job reading from kafka stuck while cancelling
Posted by Stephan Ewen <se...@apache.org>.
Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595
On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> Thanks for debugging this, I think there is in fact an issue in the
> 0.9 consumer.
>
> I'll open a ticket for it, will try to fix that as soon as possible...
>
> Stephan
Re: streaming job reading from kafka stuck while cancelling
Posted by Stephan Ewen <se...@apache.org>.
Hi!
Thanks for debugging this, I think there is in fact an issue in the 0.9
consumer.
I'll open a ticket for it, will try to fix that as soon as possible...
Stephan