Posted to user@flink.apache.org by Amit Jain <aj...@gmail.com> on 2018/04/29 16:29:10 UTC

Batch job stuck in Canceled state in Flink 1.5

Hi,

We are running a number of batch jobs on a Flink 1.5 cluster, and a few of
them get stuck at random. These jobs hit the following failure, after which
the operator status changes to CANCELED and stays there.

Please find the complete TM log at
https://gist.github.com/imamitjain/066d0e99990ee24f2da1ddc83eba2012


2018-04-29 14:57:24,437 INFO  org.apache.flink.runtime.taskmanager.Task - Producer 98f5976716234236dc69fb0e82a0cc34 of partition 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException: Execution 98f5976716234236dc69fb0e82a0cc34 producing partition 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
    at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
    at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
    at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thanks
Amit

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear :-)

On Tue, May 29, 2018 at 4:56 PM, Amit Jain <aj...@gmail.com> wrote:

> Thanks Till. `taskmanager.network.request-backoff.max` option helped in
> my case.  We tried this on 1.5.0 and jobs are running fine.
>
>
> --
> Thanks
> Amit

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Thanks Till. The `taskmanager.network.request-backoff.max` option helped in my
case. We tried this on 1.5.0 and the jobs are running fine.


--
Thanks
Amit
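
For anyone landing on this thread later: the effect of raising
`taskmanager.network.request-backoff.max` can be pictured with a small sketch
of capped exponential backoff. This is an illustration only, not Flink's
actual code. The class and method names (`BackoffSketch`, `backoffSchedule`,
`totalWaitMs`) are made up, and the 100 ms initial / 10000 ms maximum values
are assumed defaults that should be checked against the configuration docs of
the Flink version in use. The sketch assumes the wait doubles after each
failed partition request until it hits the cap, after which the task gives up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of capped exponential backoff; not Flink source code.
final class BackoffSketch {

    /** Successive waits in ms: start at initialMs, double each time, cap at maxMs. */
    static List<Long> backoffSchedule(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        List<Long> waits = new ArrayList<>();
        long current = initialMs;
        while (true) {
            waits.add(current);
            if (current >= maxMs) {
                break; // cap reached: the requester stops retrying after this wait
            }
            current = Math.min(current * 2, maxMs);
        }
        return waits;
    }

    /** Total time in ms spent waiting before the requester gives up. */
    static long totalWaitMs(long initialMs, long maxMs) {
        long total = 0;
        for (long wait : backoffSchedule(initialMs, maxMs)) {
            total += wait;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed defaults (initial 100 ms, max 10000 ms) versus a raised cap.
        System.out.println("max=10000: " + backoffSchedule(100, 10_000)
                + " total=" + totalWaitMs(100, 10_000) + " ms");
        System.out.println("max=30000: " + backoffSchedule(100, 30_000)
                + " total=" + totalWaitMs(100, 30_000) + " ms");
    }
}
```

Under these assumptions, raising the cap from 10000 to 30000 stretches the
retry window from 22.7 s to 81.1 s, which is the extra time a slow deployment
gets before the consumer fails with `PartitionNotFoundException`.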

On Thu 24 May, 2018, 4:58 PM Amit Jain, <aj...@gmail.com> wrote:

> Thanks, Till! I'll give your suggestions a try and update the thread.
>
> On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann <tr...@apache.org>
> wrote:
> > Hi Amit,
> >
> > it looks as if the current cancellation cause is not the same as the
> > initially reported cancellation cause. In the current case, it looks as if
> > the deployment of your tasks takes so long that the maximum
> > `taskmanager.network.request-backoff.max` value has been reached. When this
> > happens a task gives up asking for the input result partition and fails with
> > the `PartitionNotFoundException`.
> >
> > More concretely, the `CHAIN Reduce (GroupReduce at
> > first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot
> > retrieve the result partition of the `CHAIN DataSource (at
> > createInput(ExecutionEnvironment.java:548)
> > (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
> > org.apache.flink.api.java.io.TextInputFormat
> > [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/,
> > s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/])
> > -> Map (Key Extractor) -> Combine (GroupReduce at
> > first(SortedGrouping.java:210)) (8/14)` task. This task is in the DEPLOYING
> > state when the exception occurs. It seems to me that this task takes
> > quite some time to be deployed.
> >
> > One reason why the deployment could take some time is that a UDF (for
> > example the closure) of one of the operators is quite large. If this is the
> > case, then Flink offloads the corresponding data onto the BlobServer from
> > where they are retrieved by the TaskManagers. Since you are running in
> > non-HA mode, the BlobServer won't store the blobs on HDFS from where they
> > could be retrieved. Instead all the TaskManagers ask the single BlobServer
> > for the required TDD (TaskDeploymentDescriptor) blobs. Depending on the size
> > of the TDDs, the BlobServer might become the bottleneck.
> >
> > What you can try to do is the following:
> > 1) Try to reduce the closure object of the UDFs in the above-mentioned task.
> > 2) Increase `taskmanager.network.request-backoff.max` to give the system
> > more time to download the blobs.
> > 3) Run the cluster in HA mode which will store the blobs also under
> > `high-availability.storageDir` (usually HDFS or S3). Before downloading the
> > blobs from the BlobServer, Flink will first try to download them from the
> > `high-availability.storageDir`.
> >
> > Let me know if this solves your problem.
> >
> > Cheers,
> > Till
> >
> > On Tue, May 22, 2018 at 1:29 PM, Amit Jain <aj...@gmail.com> wrote:
> >>
> >> Hi Nico,
> >>
> >> Please find the attachment for more logs.
> >>
> >> --
> >> Thanks,
> >> Amit
> >>
> >> On Tue, May 22, 2018 at 4:09 PM, Nico Kruber <ni...@data-artisans.com>
> >> wrote:
> >> > Hi Amit,
> >> > thanks for providing the logs, I'll look into it. We currently have a
> >> > suspicion of this being caused by
> >> > https://issues.apache.org/jira/browse/FLINK-9406 which we found by
> >> > looking over the surrounding code. The RC4 has been cancelled since we
> >> > see this as a release blocker.
> >> >
> >> > To rule out further errors, can you also provide logs for the task
> >> > manager producing partitions d6946b39439f10e8189322becf1b8887,
> >> > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
> >> > The task manager log you provided covers the task manager asking for
> >> > this partition only for which the job manager produces the
> >> > PartitionProducerDisposedException that you see.
> >> > I'm looking for the logs of task managers with the following execution
> >> > IDs in their logs:
> >> > - 2826f9d430e05e9defaa46f60292fa79
> >> > - 7ef992a067881a07409819e3aea32004
> >> > - ec923ce6d891d89cf6fecb5e3f5b7cc5
> >> >
> >> > Regarding the operators being stuck: I'll have a further look into the
> >> > logs and state transition and will come back to you.
> >> >
> >> >
> >> > Nico
> >> >
> >> >
> >> > On 21/05/18 09:27, Amit Jain wrote:
> >> >> Hi All,
> >> >>
> >> >> I totally missed this thread. I've encountered the same issue in Flink
> >> >> 1.5.0 RC4. Please look over the attached logs of the JM and the impacted TM.
> >> >>
> >> >> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
> >> >>
> >> >> --
> >> >> Thanks,
> >> >> Amit
> >> >>
> >> >> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber <ni...@data-artisans.com>
> >> >> wrote:
> >> >>> Also, please have a look at the other TaskManagers' logs, in particular
> >> >>> the one that is running the operator that was mentioned in the
> >> >>> exception. You should look out for the ID
> >> >>> 98f5976716234236dc69fb0e82a0cc34.
> >> >>>
> >> >>>
> >> >>> Nico
> >> >>>
> >> >>>
> >> >>> PS: Flink log files should compress quite nicely if they grow too big :)
> >> >>>
> >> >>> On 03/05/18 14:07, Stephan Ewen wrote:
> >> >>>> Google Drive would be great.
> >> >>>>
> >> >>>> Thanks!
> >> >>>>
> >> >>>> On Thu, May 3, 2018 at 1:33 PM, Amit Jain <aj2011it@gmail.com> wrote:
> >> >>>>
> >> >>>>     Hi Stephan,
> >> >>>>
> >> >>>>     The size of the JM log file is 122 MB. Could you suggest another
> >> >>>>     medium for sharing it? We can use Google Drive if that's fine with you.
> >> >>>>
> >> >>>>     --
> >> >>>>     Thanks,
> >> >>>>     Amit
> >> >>>>
> >> >>>>     On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen <sewen@apache.org> wrote:
> >> >>>>     > Hi Amit!
> >> >>>>     >
> >> >>>>     > Thanks for sharing this, this looks like a regression with the
> >> >>>>     > network stack changes.
> >> >>>>     >
> >> >>>>     > The log you shared from the TaskManager gives some hint, but that
> >> >>>>     > exception alone should not be a problem. That exception can occur
> >> >>>>     > under a race between deployment of some tasks while the whole job
> >> >>>>     > is entering a recovery phase (maybe we should not print it so
> >> >>>>     > prominently to not confuse users). There must be something else
> >> >>>>     > happening on the JobManager. Can you share the JM logs as well?
> >> >>>>     >
> >> >>>>     > Thanks a lot,
> >> >>>>     > Stephan
> >> >>>>     >
> >> >>>>     >
> >> >>>>     > On Wed, May 2, 2018 at 12:21 PM, Amit Jain <aj2011it@gmail.com> wrote:
> >> >>>>     >>
> >> >>>>     >> Thanks! Fabian
> >> >>>>     >>
> >> >>>>     >> I will try using the current release-1.5 branch and update this
> >> >>>>     >> thread.
> >> >>>>     >>
> >> >>>>     >> --
> >> >>>>     >> Thanks,
> >> >>>>     >> Amit
> >> >>>>     >>
> >> >>>>     >> On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> >> >>>>     >> > Hi Amit,
> >> >>>>     >> >
> >> >>>>     >> > We recently fixed a bug in the network stack that affected
> >> >>>>     >> > batch jobs (FLINK-9144).
> >> >>>>     >> > The fix was added after your commit.
> >> >>>>     >> >
> >> >>>>     >> > Do you have a chance to build the current release-1.5 branch
> >> >>>>     >> > and check if the fix also resolves your problem?
> >> >>>>     >> >
> >> >>>>     >> > Otherwise it would be great if you could open a blocker issue
> >> >>>>     >> > for the 1.5 release to ensure that this is fixed.
> >> >>>>     >> >
> >> >>>>     >> > Thanks,
> >> >>>>     >> > Fabian
> >> >>>>     >> >
> >> >>>>     >> > 2018-04-29 18:30 GMT+02:00 Amit Jain <aj2011it@gmail.com>:
> >> >>>>     >> >>
> >> >>>>     >> >> Cluster is running on commit 2af481a
> >> >>>>     >> >>
> >> >>>>     >> >
> >> >>>>     >> >
> >> >>>>     >
> >> >>>>     >
> >> >>>>
> >> >>>>
> >> >>>
> >> >>> --
> >> >>> Nico Kruber | Software Engineer
> >> >>> data Artisans
> >> >>>
> >> >>> Follow us @dataArtisans
> >> >>> --
> >> >>> Join Flink Forward - The Apache Flink Conference
> >> >>> Stream Processing | Event Driven | Real Time
> >> >>> --
> >> >>> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
> >> >>> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103,
> >> >>> USA
> >> >>> --
> >> >>> Data Artisans GmbH
> >> >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >> >>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> >> >>>
> >> >
> >> >
> >
> >
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Thanks, Till! I'll give your suggestions a try and update the thread.

On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann <tr...@apache.org> wrote:
> Hi Amit,
>
> it looks as if the current cancellation cause is not the same as the
> initially reported cancellation cause. In the current case, it looks as if
> the deployment of your tasks takes so long that the maximum
> `taskmanager.network.request-backoff.max` value has been reached. When this
> happens a task gives up asking for the input result partition and fails with
> the `PartitionNotFoundException`.
>
> More concretely, the `CHAIN Reduce (GroupReduce at
> first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot
> retrieve the result partition of the `CHAIN DataSource (at
> createInput(ExecutionEnvironment.java:548)
> (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
> org.apache.flink.api.java.io.TextInputFormat
> [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/,
> s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/])
> -> Map (Key Extractor) -> Combine (GroupReduce at
> first(SortedGrouping.java:210)) (8/14)` task. This task is in the DEPLOYING
> state when the exception occurs, so it seems to me that it takes quite some
> time to be deployed.
>
> One reason why the deployment could take some time is that a UDF (for
> example, its closure) of one of the operators is quite large. If this is the
> case, Flink offloads the corresponding data to the BlobServer, from where
> the TaskManagers retrieve it. Since you are running in non-HA mode, the
> BlobServer won't store the blobs on HDFS, from where they could otherwise be
> retrieved. Instead, all the TaskManagers ask the single BlobServer for the
> required TDD (TaskDeploymentDescriptor) blobs. Depending on the size of the
> TDDs, the BlobServer might become the bottleneck.
>
> What you can try is the following:
> 1) Reduce the size of the closure object of the UDFs in the above-mentioned
> task.
> 2) Increase `taskmanager.network.request-backoff.max` to give the system
> more time to download the blobs.
> 3) Run the cluster in HA mode, which also stores the blobs under
> `high-availability.storageDir` (usually HDFS or S3). Before downloading the
> blobs from the BlobServer, Flink will first try to download them from
> `high-availability.storageDir`.
>
> Let me know if this solves your problem.
>
> Cheers,
> Till
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Till Rohrmann <tr...@apache.org>.
Hi Amit,

It looks as if the current cancellation cause is not the same as the
initially reported one. In the current case, the deployment of your tasks
seems to take so long that the maximum
`taskmanager.network.request-backoff.max` value is reached. When this
happens, a task gives up asking for the input result partition and fails
with a `PartitionNotFoundException`.
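
To illustrate why a slow deployment can run into this limit, here is a
minimal sketch of a doubling backoff. It assumes that the delay starts at
`taskmanager.network.request-backoff.initial` and doubles on each retry until
it reaches `taskmanager.network.request-backoff.max`. The values 100 ms and
10000 ms are assumed defaults, and the class and method names are made up for
illustration. This is a simplified model, not Flink's actual code.

```java
public class BackoffSketch {

    /**
     * Sums the delays of a doubling backoff that starts at initialMs and is
     * capped at maxMs. Once a retry at the capped delay has been spent, the
     * requester is assumed to give up (assumption, not Flink's real logic).
     */
    static long totalWaitMillis(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        long total = 0;
        long current = initialMs;
        while (true) {
            total += current;
            if (current >= maxMs) {
                break; // the cap has been used once; no further retries
            }
            current = Math.min(current * 2, maxMs);
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed defaults: 100 ms initial, 10000 ms max.
        System.out.println(totalWaitMillis(100, 10_000)); // prints 22700
        // A larger, purely illustrative max value.
        System.out.println(totalWaitMillis(100, 60_000)); // prints 162300
    }
}
```

Under these assumptions, a producer task that needs longer than roughly 23
seconds to deploy would cause its consumers to give up, while a larger max
value extends that window.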

More concretely, the `CHAIN Reduce (GroupReduce at
first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot
retrieve the result partition of the `CHAIN DataSource (at
createInput(ExecutionEnvironment.java:548)
(org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
org.apache.flink.api.java.io.TextInputFormat
[s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/,
s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/])
-> Map (Key Extractor) -> Combine (GroupReduce at
first(SortedGrouping.java:210)) (8/14)` task. This task is in the DEPLOYING
state when the exception occurs, so it seems to me that it takes quite some
time to be deployed.

One reason why the deployment could take some time is that a UDF (for
example, its closure) of one of the operators is quite large. If this is the
case, Flink offloads the corresponding data to the BlobServer, from where
the TaskManagers retrieve it. Since you are running in non-HA mode, the
BlobServer won't store the blobs on HDFS, from where they could otherwise be
retrieved. Instead, all the TaskManagers ask the single BlobServer for the
required TDD (TaskDeploymentDescriptor) blobs. Depending on the size of the
TDDs, the BlobServer might become the bottleneck.
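
As a rough way to see how much a captured object adds to a serialized
function, the following sketch compares two hypothetical mapper classes using
plain Java serialization. `HeavyMapper`, `LightMapper`, and the lookup path
are invented for illustration and are not taken from your job; in a real
Flink job the transient field would typically be filled on the TaskManager,
for example in a rich function's `open()` method.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureSizeSketch {

    /** Holds the whole lookup table, so the table is shipped with the function. */
    static class HeavyMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] lookupTable;

        HeavyMapper(byte[] lookupTable) {
            this.lookupTable = lookupTable;
        }
    }

    /** Holds only a path; the transient table is skipped during serialization. */
    static class LightMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String lookupPath;
        private transient byte[] lookupTable; // would be loaded on the worker

        LightMapper(String lookupPath) {
            this.lookupPath = lookupPath;
        }
    }

    /** Returns the number of bytes Java serialization produces for the object. */
    static int serializedSize(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        byte[] table = new byte[1 << 20]; // 1 MiB of illustrative data
        System.out.println("heavy: " + serializedSize(new HeavyMapper(table)));
        System.out.println("light: " + serializedSize(new LightMapper("/hypothetical/table")));
    }
}
```

The heavy variant serializes to more than one megabyte, while the light one
stays at a few hundred bytes at most, which is the kind of reduction meant by
shrinking the closure.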

What you can try is the following:
1) Reduce the size of the closure object of the UDFs in the above-mentioned
task.
2) Increase `taskmanager.network.request-backoff.max` to give the system
more time to download the blobs.
3) Run the cluster in HA mode, which also stores the blobs under
`high-availability.storageDir` (usually HDFS or S3). Before downloading the
blobs from the BlobServer, Flink will first try to download them from
`high-availability.storageDir`.
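
Options 2 and 3 could look roughly like the following `flink-conf.yaml`
fragment. This is only a sketch: the backoff value is an arbitrary example,
and the ZooKeeper quorum hosts and the storage path are hypothetical
placeholders that need to be replaced with values from your own setup.

```yaml
# flink-conf.yaml (illustrative values, not recommendations)

# Option 2: let partition requests back off for longer before failing.
# 60000 ms is an arbitrary example value.
taskmanager.network.request-backoff.max: 60000

# Option 3: run in HA mode so that blobs are also written to shared storage.
# The quorum hosts and the storage path below are hypothetical.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
```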

Let me know if this solves your problem.

Cheers,
Till

On Tue, May 22, 2018 at 1:29 PM, Amit Jain <aj...@gmail.com> wrote:

> Hi Nico,
>
> Please find the attachment for more logs.
>
> --
> Thanks,
> Amit
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Hi Nico,

Please find the attachment for more logs.

--
Thanks,
Amit

On Tue, May 22, 2018 at 4:09 PM, Nico Kruber <ni...@data-artisans.com> wrote:
> Hi Amit,
> thanks for providing the logs, I'll look into it. We currently have a
> suspicion of this being caused by
> https://issues.apache.org/jira/browse/FLINK-9406 which we found by
> looking over the surrounding code. The RC4 has been cancelled since we
> see this as a release blocker.
>
> To rule out further errors, can you also provide logs for the task
> manager producing partitions d6946b39439f10e8189322becf1b8887,
> 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
> The task manager log you provided only covers the task manager asking for
> this partition, for which the job manager produces the
> PartitionProducerDisposedException that you see.
> I'm looking for the logs of task managers with the following execution
> IDs in their logs:
> - 2826f9d430e05e9defaa46f60292fa79
> - 7ef992a067881a07409819e3aea32004
> - ec923ce6d891d89cf6fecb5e3f5b7cc5
>
> Regarding the operators being stuck: I'll have a further look into the
> logs and state transition and will come back to you.
>
>
> Nico
>
>
> On 21/05/18 09:27, Amit Jain wrote:
>> Hi All,
>>
>> I totally missed this thread. I've encountered same issue in Flink
>> 1.5.0 RC4. Please look over the attached logs of JM and impacted TM.
>>
>> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
>>
>> --
>> Thanks,
>> Amit
>>
>> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber <ni...@data-artisans.com> wrote:
>>> Also, please have a look at the other TaskManagers' logs, in particular
>>> the one that is running the operator that was mentioned in the
>>> exception. You should look out for the ID 98f5976716234236dc69fb0e82a0cc34.
>>>
>>>
>>> Nico
>>>
>>>
>>> PS: Flink logs files should compress quite nicely if they grow too big :)
>>>
>>> On 03/05/18 14:07, Stephan Ewen wrote:
>>>> Google Drive would be great.
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, May 3, 2018 at 1:33 PM, Amit Jain <aj2011it@gmail.com
>>>> <ma...@gmail.com>> wrote:
>>>>
>>>>     Hi Stephan,
>>>>
>>>>     Size of JM log file is 122 MB. Could you provide me other media to
>>>>     post the same? We can use Google Drive if that's fine with you.
>>>>
>>>>     --
>>>>     Thanks,
>>>>     Amit
>>>>
>>>>     On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen <sewen@apache.org
>>>>     <ma...@apache.org>> wrote:
>>>>     > Hi Amit!
>>>>     >
>>>>     > Thanks for sharing this, this looks like a regression with the
>>>>     network stack
>>>>     > changes.
>>>>     >
>>>>     > The log you shared from the TaskManager gives some hint, but that
>>>>     exception
>>>>     > alone should not be a problem. That exception can occur under a
>>>>     race between
>>>>     > deployment of some tasks while the whole job is entering a
>>>>     recovery phase
>>>>     > (maybe we should not print it so prominently to not confuse
>>>>     users). There
>>>>     > must be something else happening on the JobManager. Can you share
>>>>     the JM
>>>>     > logs as well?
>>>>     >
>>>>     > Thanks a lot,
>>>>     > Stephan
>>>>     >
>>>>     >
>>>>     > On Wed, May 2, 2018 at 12:21 PM, Amit Jain <aj2011it@gmail.com
>>>>     <ma...@gmail.com>> wrote:
>>>>     >>
>>>>     >> Thanks! Fabian
>>>>     >>
>>>>     >> I will try using the current release-1.5 branch and update this
>>>>     thread.
>>>>     >>
>>>>     >> --
>>>>     >> Thanks,
>>>>     >> Amit
>>>>     >>
>>>>     >> On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske <fhueske@gmail.com
>>>>     <ma...@gmail.com>> wrote:
>>>>     >> > Hi Amit,
>>>>     >> >
>>>>     >> > We recently fixed a bug in the network stack that affected
>>>>     batch jobs
>>>>     >> > (FLINK-9144).
>>>>     >> > The fix was added after your commit.
>>>>     >> >
>>>>     >> > Do you have a chance to build the current release-1.5 branch
>>>>     and check
>>>>     >> > if
>>>>     >> > the fix also resolves your problem?
>>>>     >> >
>>>>     >> > Otherwise it would be great if you could open a blocker issue
>>>>     for the
>>>>     >> > 1.5
>>>>     >> > release to ensure that this is fixed.
>>>>     >> >
>>>>     >> > Thanks,
>>>>     >> > Fabian
>>>>     >> >
>>>>     >> > 2018-04-29 18:30 GMT+02:00 Amit Jain <aj2011it@gmail.com
>>>>     <ma...@gmail.com>>:
>>>>     >> >>
>>>>     >> >> Cluster is running on commit 2af481a
>>>>     >> >>
>>>>     >> >> On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain <aj2011it@gmail.com
>>>>     <ma...@gmail.com>> wrote:
>>>>     >> >> > Hi,
>>>>     >> >> >
>>>>     >> >> > We are running numbers of batch jobs in Flink 1.5 cluster
>>>>     and few of
>>>>     >> >> > those
>>>>     >> >> > are getting stuck at random. These jobs having the following
>>>>     failure
>>>>     >> >> > after
>>>>     >> >> > which operator status changes to CANCELED and stuck to same.
>>>>     >> >> >
>>>>     >> >> > Please find complete TM's log at
>>>>     >> >> >
>>>>     https://gist.github.com/imamitjain/066d0e99990ee24f2da1ddc83eba2012
>>>>     <https://gist.github.com/imamitjain/066d0e99990ee24f2da1ddc83eba2012>
>>>>     >> >> >
>>>>     >> >> >
>>>>     >> >> > 2018-04-29 14:57:24,437 INFO
>>>>     >> >> > org.apache.flink.runtime.taskmanager.Task
>>>>     >> >> > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition
>>>>     >> >> > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
>>>>     >> >> >
>>>>     >> >> >
>>>>     org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
>>>>     >> >> > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition
>>>>     >> >> > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
>>>>     >> >> > at
>>>>     >> >> >
>>>>     >> >> >
>>>>     >> >> >
>>>>     org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
>>>>     >> >> > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
>>>>     >> >> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     >> >> > at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     >> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>>>>     >> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>     >> >> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
>>>>     >> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>>>>     >> >> > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>>>>     >> >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>     >> >> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>     >> >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>     >> >> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>     >> >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>     >> >> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>     >> >> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>     >> >> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     >> >> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     >> >> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     >> >> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>     >> >> >
>>>>     >> >> >
>>>>     >> >> > Thanks
>>>>     >> >> > Amit
>>>>     >> >
>>>>     >> >
>>>>     >
>>>>     >
>>>>
>>>>
>>>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Amit,
thanks for providing the logs, I'll look into it. We currently suspect
that this is caused by
https://issues.apache.org/jira/browse/FLINK-9406, which we found by
looking over the surrounding code. RC4 has been cancelled since we
see this as a release blocker.

To rule out further errors, can you also provide logs for the task
managers producing partitions d6946b39439f10e8189322becf1b8887,
9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
The task manager log you provided only covers the task manager requesting
these partitions, for which the job manager produces the
PartitionProducerDisposedException that you see.
I'm looking for the logs of task managers with the following execution
IDs in their logs:
- 2826f9d430e05e9defaa46f60292fa79
- 7ef992a067881a07409819e3aea32004
- ec923ce6d891d89cf6fecb5e3f5b7cc5
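
One possible way to find the matching logs is sketched below. It assumes the
TaskManager logs have already been copied into a single local directory as
uncompressed *.log files; the directory layout and the function name
find_logs_with_ids are hypothetical and not part of Flink.

```python
from pathlib import Path

# Execution IDs requested in this mail.
EXECUTION_IDS = [
    "2826f9d430e05e9defaa46f60292fa79",
    "7ef992a067881a07409819e3aea32004",
    "ec923ce6d891d89cf6fecb5e3f5b7cc5",
]


def find_logs_with_ids(log_dir, ids=EXECUTION_IDS):
    """Return {execution_id: [names of *.log files that mention it]}.

    Hypothetical helper: assumes uncompressed text logs in one directory.
    """
    hits = {execution_id: [] for execution_id in ids}
    for path in sorted(Path(log_dir).glob("*.log")):
        if not path.is_file():
            continue
        found = set()
        # Read line by line so that very large logs are not loaded at once.
        with open(path, errors="replace") as handle:
            for line in handle:
                for execution_id in ids:
                    if execution_id in line:
                        found.add(execution_id)
        for execution_id in found:
            hits[execution_id].append(path.name)
    return hits
```

Calling find_logs_with_ids("/path/to/collected/logs") returns one entry per ID;
an empty list means that none of the scanned files mentions that execution.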

Regarding the operators being stuck: I'll have a further look into the
logs and state transitions and will get back to you.


Nico


On 21/05/18 09:27, Amit Jain wrote:
> Hi All,
> 
> I totally missed this thread. I've encountered same issue in Flink
> 1.5.0 RC4. Please look over the attached logs of JM and impacted TM.
> 
> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
> 
> --
> Thanks,
> Amit
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Hi All,

I totally missed this thread. I've encountered the same issue in Flink
1.5.0 RC4. Please look over the attached logs of the JM and the impacted TM.

Job ID 390a96eaae733f8e2f12fc6c49b26b8b

--
Thanks,
Amit

On Thu, May 3, 2018 at 8:31 PM, Nico Kruber <ni...@data-artisans.com> wrote:
> Also, please have a look at the other TaskManagers' logs, in particular
> the one that is running the operator that was mentioned in the
> exception. You should look out for the ID 98f5976716234236dc69fb0e82a0cc34.
>
>
> Nico
>
>
> PS: Flink logs files should compress quite nicely if they grow too big :)
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Nico Kruber <ni...@data-artisans.com>.
Also, please have a look at the other TaskManagers' logs, in particular
the one that is running the operator that was mentioned in the
exception. You should look out for the ID 98f5976716234236dc69fb0e82a0cc34.


Nico


PS: Flink log files should compress quite nicely if they grow too big :)
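
As a rough illustration of the compression step, here is a minimal sketch using
only the Python standard library. The helper name compress_log is hypothetical,
and the resulting size will depend on the actual log content.

```python
import gzip
import shutil
from pathlib import Path


def compress_log(log_path):
    """Write a gzip copy next to the log file and return the new path.

    Hypothetical helper: the original file is left untouched.
    """
    source = Path(log_path)
    target = source.with_name(source.name + ".gz")
    # Stream the file in chunks so that large logs do not need to fit in memory.
    with open(source, "rb") as src, gzip.open(target, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return target
```

For example, compress_log("jobmanager.log") would create jobmanager.log.gz in
the same directory, which can then be uploaded instead of the raw file.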

On 03/05/18 14:07, Stephan Ewen wrote:
> Google Drive would be great.
> 
> Thanks!
> 



Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Stephan Ewen <se...@apache.org>.
Google Drive would be great.

Thanks!

On Thu, May 3, 2018 at 1:33 PM, Amit Jain <aj...@gmail.com> wrote:

> Hi Stephan,
>
> Size of JM log file is 122 MB. Could you provide me other media to
> post the same? We can use Google Drive if that's fine with you.
>
> --
> Thanks,
> Amit
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Hi Stephan,

The JM log file is 122 MB. Could you suggest another medium for
sharing it? We can use Google Drive if that's fine with you.

--
Thanks,
Amit

On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Amit!
>
> Thanks for sharing this, this looks like a regression with the network stack
> changes.
>
> The log you shared from the TaskManager gives some hint, but that exception
> alone should not be a problem. That exception can occur under a race between
> deployment of some tasks while the whole job is entering a recovery phase
> (maybe we should not print it so prominently to not confuse users). There
> must be something else happening on the JobManager. Can you share the JM
> logs as well?
>
> Thanks a lot,
> Stephan
>

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Stephan Ewen <se...@apache.org>.
Hi Amit!

Thanks for sharing this. It looks like a regression caused by the network
stack changes.

The TaskManager log you shared gives some hints, but that exception alone
should not be a problem. It can occur in a race between the deployment of
some tasks and the whole job entering a recovery phase (maybe we should
log it less prominently so that it does not confuse users).
There must be something else happening on the JobManager. Can you share the
JM logs as well?
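
While collecting the JM logs, it can help to separate the exception above from everything else that was logged. The following is a minimal sketch, not part of Flink: the function name summarize_log is made up for illustration, and the "switched from ... to ..." wording and the WARN/ERROR level markers are assumptions about the log format that may need adjusting for your setup.

```python
import re

# Assumption: state transitions are logged with the phrase
# "switched from [state] X to Y". Adjust the pattern if your logs differ.
STATE_CHANGE = re.compile(r"switched from (?:state )?(\w+) to (\w+)")

# The exception from this thread, which on its own should not be a problem.
NOISY_EXCEPTION = "PartitionProducerDisposedException"


def summarize_log(lines):
    """Return (state_changes, other_problems) found in an iterable of log lines.

    state_changes is a list of (from_state, to_state) tuples.
    other_problems holds WARN/ERROR lines that do not mention the
    exception named in NOISY_EXCEPTION.
    """
    state_changes = []
    other_problems = []
    for line in lines:
        match = STATE_CHANGE.search(line)
        if match:
            state_changes.append((match.group(1), match.group(2)))
        elif (" WARN " in line or " ERROR " in line) and NOISY_EXCEPTION not in line:
            other_problems.append(line.rstrip())
    return state_changes, other_problems
```

To use it, open the JobManager log file and pass the file object to summarize_log. The state transitions then show where the job entered recovery, and the remaining WARN/ERROR lines are candidates for the "something else" mentioned above.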

Thanks a lot,
Stephan

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
Thanks, Fabian!

I will try using the current release-1.5 branch and update this thread.

--
Thanks,
Amit

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Amit,

We recently fixed a bug in the network stack that affected batch jobs
(FLINK-9144).
The fix was added after your commit.

Could you build the current release-1.5 branch and check whether the fix
also resolves your problem?

Otherwise, it would be great if you could open a blocker issue for the 1.5
release to ensure that this gets fixed.

Thanks,
Fabian

Re: Batch job stuck in Canceled state in Flink 1.5

Posted by Amit Jain <aj...@gmail.com>.
The cluster is running on commit 2af481a.
