Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2017/10/04 23:33:06 UTC

Unusual log message - Emitter thread got interrupted

Hi all,

I’ve got a streaming topology with an iteration, and a RichAsyncFunction in that iteration.

When the iteration terminates due to no activity, I see this message in the logs:

17/10/04 16:01:36 DEBUG async.Emitter:91 - Emitter thread got interrupted. This indicates that the emitter should shut down.
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.peekBlockingly(UnorderedStreamElementQueue.java:147)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:82)
	at java.lang.Thread.run(Thread.java:748)

I read through https://issues.apache.org/jira/browse/FLINK-5638, which makes me wonder if there’s a different but related issue involving an async function in an iteration.

Or perhaps I need to do something in my RichAsyncFunction to avoid this situation?

Or is this expected and just the way things are currently?

Just FYI, my topology is here: https://s3.amazonaws.com/su-public/flink-crawler+topology.pdf

Thanks,

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Unusual log message - Emitter thread got interrupted

Posted by Aljoscha Krettek <al...@apache.org>.
Just FYI: I pushed a change that rewords the message and no longer passes the exception to the log call, so the stack trace is gone as well.

> On 10. Oct 2017, at 00:58, Ken Krugler <kk...@transpac.com> wrote:
> 
> I’d suggest changing the logging call to not include the exception, as dumping out the stack trace in the log implies there’s a problem somewhere.
> 
> And changing the message to something like "Emitter thread got interrupted, shutting it down" would make it clearer it’s not an unexpected situation.


Re: Unusual log message - Emitter thread got interrupted

Posted by Ken Krugler <kk...@transpac.com>.
Hi Aljoscha,

Thanks for responding.

> On Oct 9, 2017, at 7:36 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Did you see any other faulty behaviour, or only this log message?

That seemed to be the only oddity in the logs.

I’d suggest changing the logging call to not include the exception, as dumping out the stack trace in the log implies there’s a problem somewhere.

And changing the message to something like "Emitter thread got interrupted, shutting it down" would make it clearer that this is not an unexpected situation.
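A sketch of the suggested logging change. It uses `java.util.logging` from the JDK as a stand-in for the SLF4J logger Flink uses, an assumption made only so the example runs without extra dependencies. `ShutdownLogging`, `logVerbose`, `logQuiet` and `capture` are hypothetical names, and this is not the change that was actually committed. The point it shows is that attaching the throwable to the log call is what makes the stack trace appear; leaving it off produces one plain line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical illustration of the suggestion, using java.util.logging
// instead of SLF4J so that it runs with the JDK alone.
public class ShutdownLogging {
    private static final Logger LOG = Logger.getLogger("emitter-sketch");

    // Before: the exception is attached, so formatters print its stack trace.
    static void logVerbose(InterruptedException e) {
        LOG.log(Level.FINE,
                "Emitter thread got interrupted. This indicates that the emitter should shut down.", e);
    }

    // After: no throwable attached, and the wording signals an ordinary shutdown.
    static void logQuiet() {
        LOG.fine("Emitter thread got interrupted, shutting it down.");
    }

    // Runs an action and returns the records it logged, so getThrown() can be inspected.
    static List<LogRecord> capture(Runnable action) {
        List<LogRecord> records = new ArrayList<>();
        Handler handler = new Handler() {
            @Override public void publish(LogRecord record) { records.add(record); }
            @Override public void flush() { }
            @Override public void close() { }
        };
        LOG.setLevel(Level.ALL);
        LOG.setUseParentHandlers(false);
        LOG.addHandler(handler);
        try {
            action.run();
        } finally {
            LOG.removeHandler(handler);
        }
        return records;
    }

    public static void main(String[] args) {
        LogRecord before = capture(() -> logVerbose(new InterruptedException())).get(0);
        LogRecord after = capture(ShutdownLogging::logQuiet).get(0);
        System.out.println(before.getThrown() != null); // true
        System.out.println(after.getThrown() != null);  // false
    }
}
```

With SLF4J the same distinction is between `LOG.debug(message, exception)`, which logs the stack trace, and `LOG.debug(message)`, which does not.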

Thanks,

— Ken


Re: Unusual log message - Emitter thread got interrupted

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

In my understanding this is the expected behaviour of the code. The only way to shut down the Emitter is via an interrupt, because otherwise it is blocking on the queue. If the Emitter had been interrupted while the operator was still running, it would have gone down a different code path: https://github.com/apache/flink/blob/40cec17f4303b43bbf65d8be542f0646eada57e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java#L89
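To make the pattern concrete, here is a minimal Java sketch of an emitter-style thread. It blocks on a condition until an element arrives, so while the queue is empty only an interrupt can get it out of the wait. It is not Flink's `Emitter`. The class names `EmitterSketch` and `SimpleEmitter`, the `running` flag, and the `exitReason` strings are hypothetical. They are chosen only to show how a flag cleared before the interrupt lets the thread tell a normal shutdown apart from an unexpected interrupt.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an emitter thread that is stopped via interrupt.
// Not Flink's Emitter; the names and the "running" flag are illustrative only.
public class EmitterSketch {

    static final class SimpleEmitter implements Runnable {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Queue<String> queue = new ArrayDeque<>();
        final List<String> emitted = new CopyOnWriteArrayList<>();
        private volatile boolean running = true;
        volatile String exitReason = "none";

        void add(String element) {
            lock.lock();
            try {
                queue.add(element);
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Plays the role of a blocking peek: waits until an element is available.
        // While the queue is empty, only a signal or an interrupt ends the wait.
        private String takeBlockingly() throws InterruptedException {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                return queue.poll();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public void run() {
            try {
                while (running) {
                    emitted.add(takeBlockingly());
                }
                exitReason = "shutdown";
            } catch (InterruptedException e) {
                // Flag already cleared: the normal way to stop the thread.
                // Flag still set: a live emitter was interrupted, which is unexpected.
                exitReason = running ? "unexpected interrupt" : "shutdown";
            }
        }

        // What the owning operator would do on close: clear the flag, then interrupt.
        void stop(Thread emitterThread) {
            running = false;
            emitterThread.interrupt();
        }
    }

    static String runScenario(boolean graceful) {
        SimpleEmitter emitter = new SimpleEmitter();
        Thread thread = new Thread(emitter, "emitter-sketch");
        thread.start();
        emitter.add("a");
        emitter.add("b");
        try {
            // Wait until both elements are emitted; the thread then heads back into the wait.
            while (emitter.emitted.size() < 2) {
                Thread.sleep(1);
            }
            if (graceful) {
                emitter.stop(thread);
            } else {
                thread.interrupt();
            }
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "caller interrupted";
        }
        return emitter.exitReason;
    }

    public static void main(String[] args) {
        System.out.println(runScenario(true));  // shutdown
        System.out.println(runScenario(false)); // unexpected interrupt
    }
}
```

`runScenario(true)` stops the thread the way a closing operator would: it clears the flag and then interrupts. That returns `shutdown`. `runScenario(false)` interrupts while the flag is still set, which corresponds to the "different code path" mentioned above.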

Did you see any other faulty behaviour, or only this log message?

Best,
Aljoscha

> On 6. Oct 2017, at 18:17, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Ken,
> 
> I don't have much experience with streaming iterations.
> Maybe Aljoscha (in CC) has an idea what is happening and if it can be prevented.
> 


Re: Unusual log message - Emitter thread got interrupted

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

I don't have much experience with streaming iterations.
Maybe Aljoscha (in CC) has an idea what is happening and if it can be
prevented.

Best, Fabian