You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Le Xu <sh...@gmail.com> on 2021/02/27 02:28:30 UTC

[Statefun] Exception occurs during function chaining / Async function

Hello!

I'm getting an exception running a modified version of datastream/statefun
example. (See exception details that follow.) The example was adapted from
the original datastream example provided in statefun repo. I was trying to
play with the example by chaining two functions (with the 1st function
simulating async IO function invocation). However I'm getting an exception
saying that the mailbox is closed before the operation finishes. I'm adding
the source code of the example here
<https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134>
(Please ignore the printing statement and variable usage not making any
sense since I'm debugging). The example works without chaining the second
function or without using AsyncIO in any of the functions. Any ideas why
this happens? Any suggestions would be appreciated.


Thanks!

Le

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 885ad02322104a6e36b011fadc704d2a)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 885ad02322104a6e36b011fadc704d2a)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
at
org.apache.flink.statefun.examples.datastream.Example.main(Example.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 885ad02322104a6e36b011fadc704d2a)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.RejectedExecutionException:
java.lang.IllegalStateException: Mailbox is in state CLOSED, but is
required to be in state OPEN for put operations.
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:71)
at
org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:94)
at
org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but
is required to be in state OPEN for put operations.
at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:265)
at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:193)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:69)
... 16 more

Re: [Statefun] Exception occurs during function chaining / Async function

Posted by Seth Wiesman <sj...@gmail.com>.
Hi Le,

I believe the issue is the bounded source[1]. Stateful Functions only
supports unbounded inputs.

Additionally, you can remove all the `synchronized` blocks from your code;
statefun handles all synchronization for you.

Seth

[1]
https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134#file-example-java-L249-L251

On Fri, Feb 26, 2021 at 8:28 PM Le Xu <sh...@gmail.com> wrote:

> Hello!
>
> I'm getting an exception running a modified version of datastream/statefun
> example. (See exception details that follow.) The example was adapted from
> the original datastream example provided in statefun repo. I was trying to
> play with the example by chaining two functions (with the 1st function
> simulating async IO function invocation). However I'm getting an exception
> saying that the mailbox is closed before the operation finishes. I'm adding
> the source code of the example here
> <https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134>
> (Please ignore the printing statement and variable usage not making any
> sense since I'm debugging). The example works without chaining the second
> function or without using AsyncIO in any of the functions. Any ideas why
> this happens? Any suggestions would be appreciated.
>
>
> Thanks!
>
> Le
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
> at
> org.apache.flink.statefun.examples.datastream.Example.main(Example.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
> ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
> at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.RejectedExecutionException:
> java.lang.IllegalStateException: Mailbox is in state CLOSED, but is
> required to be in state OPEN for put operations.
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:71)
> at
> org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:94)
> at
> org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35)
> at
> org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114)
> at
> org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64)
> at
> org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED,
> but is required to be in state OPEN for put operations.
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:265)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:193)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:69)
> ... 16 more
>