Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2018/03/21 22:29:10 UTC

InterruptedException when async function is cancelled

Hi all,

When I cancel a job that has async functions, I see this sequence in the TaskManager logs:

2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task                     - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

And then less than a second later...

2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not shut down timer service
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:747)

Followed shortly thereafter by a call to the async function’s close() method, which logs:

2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor                  - Shutting down the AsyncFunctionName thread pool 

And finally…

2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task                     - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

I’ve looked through the code, and I don’t see any place where I’m interrupting any threads. When I shut down my own thread pool, interrupts will be generated, but only for threads used by my pool, and this happens after the InterruptedException.
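
The claim that shutting down a private pool only interrupts that pool's own threads can be checked with a small JDK-only sketch. Everything here is hypothetical (the class name `PoolShutdownDemo`, the method `shutDownPool`, the 60-second sleep standing in for an async request); it is not the ThreadedExecutor code from the logs above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Flink or of the poster's code.
class PoolShutdownDemo {

    /** Returns {workerWasInterrupted, callerWasInterrupted}. */
    static boolean[] shutDownPool() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);

        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a long-running async request
            } catch (InterruptedException e) {
                workerInterrupted.set(true); // the interrupt from shutdownNow() lands here
            }
        });

        try {
            started.await();    // make sure the worker is running first
            pool.shutdownNow(); // interrupts the pool's worker threads
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve an interrupt sent by someone else
        }

        boolean callerInterrupted = Thread.currentThread().isInterrupted();
        return new boolean[] {workerInterrupted.get(), callerInterrupted};
    }

    public static void main(String[] args) {
        boolean[] result = shutDownPool();
        System.out.println("worker interrupted: " + result[0]);
        System.out.println("caller interrupted: " + result[1]);
    }
}
```

In this sketch the thread that calls shutdownNow() is not interrupted by it, which is consistent with the InterruptedException in the log coming from somewhere other than the pool shutdown.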

Is this a known issue? Or is there something I can do to avoid it?

Thanks,

— Ken 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: InterruptedException when async function is cancelled

Posted by Timo Walther <tw...@apache.org>.
Hi Anil,

If I researched correctly, we are talking about these changes [1]. I 
don't know whether you can backport them, but I hope this helps.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9304


On 07.11.18 at 17:41, Anil wrote:
> Hi Till,
>          Thanks for the reply. Is there any particular patch I can use, as
> upgrading to Flink 1.6 is not an option for me at the moment?
> Regards,
> Anil.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: InterruptedException when async function is cancelled

Posted by Anil <an...@gmail.com>.
Hi Till,
        Thanks for the reply. Is there any particular patch I can use, as
upgrading to Flink 1.6 is not an option for me at the moment?
Regards,
Anil.




Re: InterruptedException when async function is cancelled

Posted by Till Rohrmann <tr...@apache.org>.
Hi Anil,

As Stephan stated, the fix is not included in Flink 1.4.2 but is in later
versions of Flink. Can you upgrade to Flink 1.5.5 or Flink 1.6.2 to check
whether the problem still occurs?

Cheers,
Till

On Sun, Oct 28, 2018 at 8:55 AM Anil <an...@gmail.com> wrote:

> I see the same error, but in a different situation: I'm not cancelling
> the job. Below is my error stack trace. SwiglobeZoneFromLatLong is my UDF
> name. Can this error be ignored? I'm using Flink 1.4.2.
> Thanks in advance.
>
> ```
> {"debug_level":"ERROR","debug_timestamp":"2018-10-28 06:40:20,838","debug_thread":"Source: Custom Source -> from: (event, proctime) -> select: (proctime, CityFromLatLong1(event.latLong.lat, event.latLong.lng) AS $f1, SwiglobeZoneFromLatLong(event.latLong.lat, event.latLong.lng) AS $f2, event.listingDataEventStats.serviceableRestCount AS $f3) (1/8)","debug_file":"StreamTask.java", "debug_line":"326","debug_message":"Could not shut down timer service", "job_name": "7bb4d4a4-f85a-429e-92e8-0c887f9b8cbd" }
> java.lang.InterruptedException
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
>         at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> ```
>
>
>
>

Re: InterruptedException when async function is cancelled

Posted by Anil <an...@gmail.com>.
I see the same error, but in a different situation: I'm not cancelling
the job. Below is my error stack trace. SwiglobeZoneFromLatLong is my UDF
name. Can this error be ignored? I'm using Flink 1.4.2.
Thanks in advance.

```
{"debug_level":"ERROR","debug_timestamp":"2018-10-28 06:40:20,838","debug_thread":"Source: Custom Source -> from: (event, proctime) -> select: (proctime, CityFromLatLong1(event.latLong.lat, event.latLong.lng) AS $f1, SwiglobeZoneFromLatLong(event.latLong.lat, event.latLong.lng) AS $f2, event.listingDataEventStats.serviceableRestCount AS $f3) (1/8)","debug_file":"StreamTask.java", "debug_line":"326","debug_message":"Could not shut down timer service", "job_name": "7bb4d4a4-f85a-429e-92e8-0c887f9b8cbd" }
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
```




Re: InterruptedException when async function is cancelled

Posted by Stephan Ewen <se...@apache.org>.
Agreed.

It is fixed in 1.5 and in the 1.4.x branch. The fix came after 1.4.2, so
it's not released as of now.

On Tue, Apr 17, 2018 at 7:47 PM, Ken Krugler <kk...@transpac.com>
wrote:

> Hi Timo,
>
> [Resending from an address the Apache list server likes…]
>
> I discussed this with Till during Flink Forward, and he said it looks like
> the expected result when cancelling, as that will cause all operators to be
> interrupted, which in turn generates the stack trace I’m seeing.
>
> As to whether it’s a bug or not - I guess not.
>
> But it would be nice if something like this (expected action) wasn’t being
> logged as an error.
>
> Regards,
>
> — Ken
>
>

Re: InterruptedException when async function is cancelled

Posted by Ken Krugler <kk...@transpac.com>.
Hi Timo,

[Resending from an address the Apache list server likes…]

I discussed this with Till during Flink Forward, and he said it looks like the expected result when cancelling, as that will cause all operators to be interrupted, which in turn generates the stack trace I’m seeing.

As to whether it’s a bug or not - I guess not.

But it would be nice if something like this (expected action) wasn’t being logged as an error.
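
The mechanism described above (a thread waiting in awaitTermination gets interrupted, and that wait throws InterruptedException) can be reproduced with the JDK alone. The following is only a sketch under stated assumptions: the class `InterruptedShutdownDemo`, the `cancelling` flag, and the returned level strings are hypothetical illustrations of how an expected interrupt could be reported at a lower level. It is not Flink's StreamTask code, nor the actual fix.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; names are illustrative only.
class InterruptedShutdownDemo {

    /** Returns the log level a cancellation-aware handler would choose. */
    static String awaitWhileCancelled() {
        ExecutorService timers = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        // Pending work, so that awaitTermination() really has to block.
        timers.submit(() -> { release.await(); return null; });

        AtomicBoolean cancelling = new AtomicBoolean(false);
        AtomicReference<String> level = new AtomicReference<>("NONE");

        // Stands in for the task thread that shuts down its timer service.
        Thread taskThread = new Thread(() -> {
            timers.shutdown();
            try {
                timers.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Same exception type as in the stack trace above.
                level.set(cancelling.get() ? "DEBUG" : "ERROR");
            }
        });
        taskThread.start();

        cancelling.set(true);   // cancellation was requested...
        taskThread.interrupt(); // ...and the waiting thread is interrupted
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown(); // let the pending work finish so the pool can exit
        }
        return level.get();
    }

    public static void main(String[] args) {
        System.out.println("level: " + awaitWhileCancelled());
    }
}
```

The design choice in this sketch is simply to check whether cancellation was already requested before deciding how loudly to report the interrupt.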

Regards,

— Ken


> On Mar 26, 2018, at 3:19 AM, Timo Walther <twalthr@apache.org> wrote:
> 
> Hi Ken,
> 
> as you can see here [1], Flink interrupts the timer service after a certain timeout. If you want to get rid of the exception, you should increase "task.cancellation.timers.timeout" in the configuration.
> 
> Actually, the default is already set to 7 seconds. So your exception should not be thrown so quickly. For me this looks like a bug but please let us know if setting the timeout higher solved your problem.
> 
> Regards,
> Timo
> 
> 
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358
> 
> 

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378





Re: InterruptedException when async function is cancelled

Posted by Timo Walther <tw...@apache.org>.
Hi Ken,

As you can see here [1], Flink interrupts the timer service after a 
certain timeout. If you want to get rid of the exception, you should 
increase "task.cancellation.timers.timeout" in the configuration.

Actually, the default is already set to about 7 seconds (7500 ms), so your 
exception should not be thrown so quickly. To me this looks like a bug, but 
please let us know whether setting the timeout higher solves your problem.
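
To illustrate why the timeout matters, here is a minimal JDK-only sketch of a bounded shutdown: wait for pending work up to a limit, and only interrupt what is still running afterwards. The class `TimerShutdownTimeoutDemo` and the method `shutdownAndAwait` are hypothetical and only mimic the behaviour described above; they are not Flink's SystemProcessingTimeService. In Flink itself the limit would be raised in the configuration, for example with a line such as `task.cancellation.timers.timeout: 30000` in flink-conf.yaml, where the value is an illustrative number of milliseconds.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only mimics a "wait, then interrupt" shutdown.
class TimerShutdownTimeoutDemo {

    /**
     * Submits one pending job lasting pendingMillis, then waits at most
     * timeoutMillis for it. Returns true if the job finished in time.
     */
    static boolean shutdownAndAwait(long pendingMillis, long timeoutMillis) {
        ExecutorService timers = Executors.newSingleThreadExecutor();
        timers.submit(() -> {
            try {
                Thread.sleep(pendingMillis); // stands in for a pending timer callback
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow() after the timeout expired.
            }
        });

        timers.shutdown(); // no new work; already submitted work still runs
        try {
            boolean finished = timers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!finished) {
                timers.shutdownNow(); // timeout expired: interrupt what is left
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timers.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("short timeout finished in time: " + shutdownAndAwait(1000, 50));
        System.out.println("long timeout finished in time:  " + shutdownAndAwait(20, 5000));
    }
}
```

With a timeout shorter than the pending work, the wait gives up and the leftover work is interrupted; with a longer timeout, the work completes and nothing needs to be interrupted.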

Regards,
Timo


[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358

