Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/11/24 16:53:29 UTC

Exception: This method must be called from inside the mailbox thread

Hi,
I ran into an issue on Flink 1.11. So far it has happened only once and I cannot
reproduce it. However, I think something is lurking there...

I cannot post the full stack trace or the user code, but I will try to describe
the problem.

The setup uses no resource groups and has only one operator chain restriction,
mentioned below.

chained task #1 - AsyncOperator with orderedWait calling a 3rd party system,
forwards to
chained task #2 - with:
a) ProcessFunction A calling a multi-threaded library. In the ProcessFunction we
do:
CompletableFuture.allOf(..userCode..).thenAccept(v -> collector.collect(message))
b) ProcessFunction B (no multi-threaded operations)
c) AsyncOperator with orderedWait calling a 3rd party system
d) ProcessFunction

Between task #1 and task #2 there is a .startNewChain() call to separate the two
tasks.

During load tests we got:
Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!

The question is: what does it actually mean, and when can it happen?

The "full" stack trace, from which I had to remove user code:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	user---Code---calls
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	user---Code---calls
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	user---Code---calls
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:135) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addToWorkQueue(AsyncWaitOperator.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:180) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
	... 35 more
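Reading the trace bottom-up: a library thread (CompletableFuture$AsyncSupply.run) completes the future, the thenAccept callback calls collect, the element passes through the downstream ProcessOperator and KeyedProcessOperator frames, and reaches AsyncWaitOperator.processElement, whose addToWorkQueue calls MailboxExecutorImpl.yield. That yield is where the thread check fails. As far as I can tell from the 1.11 sources, addToWorkQueue only yields while the async operator's queue is at capacity, which would explain why this surfaced once, under load. The toy below models only that path under that assumption; ToyAsyncQueue, add, yieldToMailbox and addFromOtherThread are hypothetical names, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ToyAsyncQueue {
    private final Thread taskThread; // plays the role of the mailbox/task thread
    private final int capacity;
    private final Deque<String> inFlight = new ArrayDeque<>();

    ToyAsyncQueue(Thread taskThread, int capacity) {
        this.taskThread = taskThread;
        this.capacity = capacity;
    }

    // Loosely mirrors the addToWorkQueue -> yield path from the trace (toy model).
    void add(String element) {
        while (inFlight.size() >= capacity) {
            yieldToMailbox(); // only reached when the queue is full
        }
        inFlight.addLast(element);
    }

    private void yieldToMailbox() {
        if (Thread.currentThread() != taskThread) {
            throw new IllegalStateException(
                "Illegal thread detected. This method must be called from inside the mailbox thread!");
        }
        // Toy behaviour: pretend the oldest request completed and was emitted.
        inFlight.removeFirst();
    }

    // Calls add() from a separate thread and reports "ok" or the exception message.
    static String addFromOtherThread(ToyAsyncQueue queue, String element) {
        List<String> outcome = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                queue.add(element);
                outcome.add("ok");
            } catch (IllegalStateException e) {
                outcome.add(e.getMessage());
            }
        }, "library-worker");
        worker.start();
        try {
            worker.join(); // join() also makes the worker's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get(0);
    }

    public static void main(String[] args) {
        ToyAsyncQueue queue = new ToyAsyncQueue(Thread.currentThread(), 2);
        System.out.println(addFromOtherThread(queue, "e1")); // room left
        System.out.println(addFromOtherThread(queue, "e2")); // room left
        System.out.println(addFromOtherThread(queue, "e3")); // full: thread check fires
    }
}
```

The wrong-thread call goes unnoticed for the first two elements and only fails once the queue is full, so a low-traffic test can pass while a load test hits the check.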

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Exception: This method must be called from inside the mailbox thread

Posted by Arvid Heise <ar...@ververica.com>.
Hi KristoffSC,

I'd strongly suggest not blocking the task thread if it involves external
services. RPC notifications cannot be processed and checkpoints are delayed
while the task thread is blocked. That's what AsyncIO is for.

If your third-party library just takes a few ms to finish its computation
without calling any external service, that would be totally fine, though. That
is the normal case of compute-intensive tasks, where only a few events come in
and still saturate a cluster quickly (e.g., text mining, data cleansing, or
even evaluation of a complex model).
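For the blocking variant discussed in this thread, operator a) would wait for the library on the task thread and only then emit, i.e. CompletableFuture.allOf(..).join() followed by collector.collect(..) instead of thenAccept(..). The sketch below models that with plain JDK types only; processElement, the squaring "work" and the Consumer standing in for Flink's Collector are hypothetical. The emit happens on the caller's thread, at the cost of blocking that thread for the whole computation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class BlockingCollectDemo {

    // Hypothetical model of operator a) in blocking style: fan the work out to the
    // library's pool, wait on the calling thread, then emit from that same thread.
    // `collector` stands in for Flink's Collector; squaring stands in for user code.
    static void processElement(List<Integer> parts, ExecutorService library, Consumer<Integer> collector) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int part : parts) {
            futures.add(CompletableFuture.supplyAsync(() -> part * part, library));
        }
        // Blocks the caller until every piece of work has finished.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        int sum = 0;
        for (CompletableFuture<Integer> f : futures) {
            sum += f.join(); // already complete, returns immediately
        }
        collector.accept(sum); // runs on the caller's thread, not on a pool thread
    }

    public static void main(String[] args) {
        ExecutorService library = Executors.newFixedThreadPool(4);
        String caller = Thread.currentThread().getName();
        try {
            processElement(List.of(1, 2, 3), library, sum ->
                System.out.println(sum + " emitted on " + Thread.currentThread().getName()
                    + " (caller: " + caller + ")"));
        } finally {
            library.shutdown();
        }
    }
}
```

This removes the wrong-thread collect, but while join() waits, the task thread can do nothing else, which is why it only suits short, purely computational work.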

In reply to KristoffSC <kr...@gmail.com>, Tue, Nov 24, 2020 at 10:39 PM.

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Exception: This method must be called from inside the mailbox thread

Posted by KristoffSC <kr...@gmail.com>.
Hi Arvid,
Thank you for your answer.

And what if a) blocked the task thread?
Let's say I'm OK with making the entire task thread wait on this third-party
library.

In that case, would I be safe from this exception even without using
AsyncIO?


Re: Exception: This method must be called from inside the mailbox thread

Posted by Arvid Heise <ar...@ververica.com>.
Hi KristoffSC,

Sorry for the confusing error message. In short, mailbox thread = task
thread.

Your operator a) calls collector.collect from a different thread (the one in
which the CompletableFuture is completed). However, all Flink APIs must always
be used from the task thread.
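A minimal JDK-only sketch of which thread runs a thenAccept callback (no Flink involved; the class name, the thread name "library-worker" and the latch are illustrative assumptions). When the callback is registered before the future completes, it runs on the thread that completes the future, which matches the CompletableFuture$AsyncSupply.run, postComplete and UniAccept.tryFire frames at the bottom of the trace.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadDemo {

    // Returns the name of the thread on which the thenAccept callback ran.
    static String callbackThreadName() {
        // Hypothetical stand-in for the multi-threaded library used by operator a).
        ExecutorService library =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "library-worker"));
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
                try {
                    release.await(); // hold the result until the callback is registered
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "message";
            }, library);

            // Registered while `work` is still pending, so the callback is run by
            // whichever thread completes `work`, i.e. "library-worker".
            CompletableFuture<Void> done =
                work.thenAccept(msg -> seen.set(Thread.currentThread().getName()));

            release.countDown();
            done.join();
            return seen.get();
        } finally {
            library.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task thread:     " + Thread.currentThread().getName());
        System.out.println("callback thread: " + callbackThreadName());
    }
}
```

If the future is already complete when thenAccept is called, the callback instead runs immediately on the calling thread, so whether collect lands on the right thread can depend on timing.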

The only way to cross thread boundaries is AsyncIO, so a) must also be an
AsyncIO operator.
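Conceptually, AsyncIO avoids the problem because results computed on other threads are handed back to the task thread instead of being emitted where they were computed. The toy below models that hand-off with a plain BlockingQueue used as a "mailbox": library callbacks only enqueue a Runnable, and the task thread runs it, so the stand-in collector is never invoked from a library thread. It is a simplified model under that assumption, not Flink's MailboxExecutor or AsyncWaitOperator; ToyMailboxDemo, run and the uppercase "work" are hypothetical, and results come out in completion order, with no ordering guarantee like orderedWait provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ToyMailboxDemo {

    // Toy model (not Flink code): library threads never call the collector; they
    // only post a "mail", and the task thread runs every mail itself.
    static List<String> run(List<String> inputs) {
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        ExecutorService library = Executors.newFixedThreadPool(4);
        Thread taskThread = Thread.currentThread();
        List<String> emitted = new ArrayList<>();

        // Stand-in for Flink's Collector that enforces the same thread rule.
        Consumer<String> collector = value -> {
            if (Thread.currentThread() != taskThread) {
                throw new IllegalStateException("collect called outside the task thread");
            }
            emitted.add(value);
        };

        try {
            for (String input : inputs) {
                CompletableFuture
                    .supplyAsync(() -> input.toUpperCase(), library) // hypothetical library work
                    .thenAccept(result -> mailbox.add(() -> collector.accept(result))); // only enqueues
            }
            // Task-thread loop: run one mail per input; take() blocks until one arrives.
            for (int i = 0; i < inputs.size(); i++) {
                mailbox.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            library.shutdown();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

In Flink itself you would not build this by hand: the usual shape is an AsyncFunction (or RichAsyncFunction) whose asyncInvoke(input, resultFuture) starts the library call and calls resultFuture.complete(...) with a Collection from the library's callback, wired in with AsyncDataStream.orderedWait(...) or unorderedWait(...).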

In reply to KristoffSC <kr...@gmail.com>, Tue, Nov 24, 2020 at 5:53 PM.