Posted to user@flink.apache.org by XiangWei Huang <xw...@gmail.com> on 2017/11/06 02:27:36 UTC
ExecutionGraph not serializable
Hi Flink users,
The Flink JobManager throws a NotSerializableException when I use JobMasterGateway to get the ExecutionGraph of a specific job with the
message RequestJob(jobID). Below are the details of the exception:
[ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
So, is this a bug, or is this way of getting a job's ExecutionGraph invalid?
Best, XiangWei
Re: ExecutionGraph not serializable
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
access to the ExecutionGraph is actually not intended, because it is a
runtime component that does not make much sense outside of the JobManager.
The RequestJob message is only a hack to make the ExecutionGraph accessible
to another actor running in the same ActorSystem. This is the case for the
WebRuntimeMonitor handlers. With FLIP-6, we will make the ExecutionGraph
indirectly accessible by returning an ArchivedExecutionGraph.
Cheers,
Till
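One possible way to inspect a job from outside the JobManager process, without touching the ExecutionGraph itself, is the web monitor's HTTP interface (the monitoring REST API), which serves job details such as the dataflow plan as JSON. The sketch below is not from this thread. It assumes the web monitor is reachable at http://localhost:8081 (adjust host and port for your setup), and JobPlanFetcher, planUrl and fetch are hypothetical helper names.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object JobPlanFetcher {
  // Hypothetical helper: builds the monitoring-API URL for a job's dataflow plan.
  // The base URL (for example http://localhost:8081) is an assumption about the setup.
  def planUrl(baseUrl: String, jobId: String): String =
    s"${baseUrl.stripSuffix("/")}/jobs/$jobId/plan"

  // Performs a plain HTTP GET and returns the response body (JSON) as a string.
  def fetch(url: String, timeoutMillis: Int = 10000): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setConnectTimeout(timeoutMillis)
    conn.setReadTimeout(timeoutMillis)
    try {
      val source = Source.fromInputStream(conn.getInputStream, "UTF-8")
      try source.mkString finally source.close()
    } finally conn.disconnect()
  }
}
```

With a running cluster, JobPlanFetcher.fetch(JobPlanFetcher.planUrl("http://localhost:8081", jobId.toString)) would return the plan as a JSON string. Only serializable, archived information crosses the process boundary this way.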
On Tue, Nov 7, 2017 at 2:47 PM, XiangWei Huang <xw...@gmail.com> wrote:
> Hi Till,
>
> Sorry, I made a mistake: I used StandaloneClusterClient#getJobManagerGateway
> to get an ActorGateway to communicate with the JobManager, instead of using
> JobMasterGateway.
> Below is the code I executed to get the ExecutionGraph of a job.
>
> val flinkConfig = new Configuration()
> val flinkCli = new StandaloneClusterClient(flinkConfig)
> val jobManagerGateWay = flinkCli.getJobManagerGateway
> val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus, new FiniteDuration(10, TimeUnit.SECONDS)).asInstanceOf[Future[RunningJobsStatus]]
> val jobsStatus = Await.result(jobs, new FiniteDuration(10, TimeUnit.SECONDS)).getStatusMessages().asScala.head
> val jobId = jobsStatus.getJobId
> val timeOut = new FiniteDuration(10, TimeUnit.SECONDS)
> val future = jobManagerGateWay.ask(RequestJob(jobId), timeOut)
> val result = Await.result(future, timeOut)
>
> The JobManager threw a NotSerializableException when I executed this code. So I
> wonder how this happened, and whether there is another way to get a job's
> ExecutionGraph programmatically.
>
> Best, XiangWei
Re: ExecutionGraph not serializable
Posted by XiangWei Huang <xw...@gmail.com>.
Hi Till,
Sorry, I made a mistake: I used StandaloneClusterClient#getJobManagerGateway
to get an ActorGateway to communicate with the JobManager, instead of using
JobMasterGateway.
Below is the code I executed to get the ExecutionGraph of a job.
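import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
// Flink imports; package names may differ between versions
import org.apache.flink.client.program.StandaloneClusterClient
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJob, RequestRunningJobsStatus, RunningJobsStatus}

val timeOut = new FiniteDuration(10, TimeUnit.SECONDS)
val flinkConfig = new Configuration()
val flinkCli = new StandaloneClusterClient(flinkConfig)
// ActorGateway used to talk to the JobManager actor
val jobManagerGateWay = flinkCli.getJobManagerGateway
// Ask for the running jobs and take the ID of the first one
val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus, timeOut).asInstanceOf[Future[RunningJobsStatus]]
val jobsStatus = Await.result(jobs, timeOut).getStatusMessages().asScala.head
val jobId = jobsStatus.getJobId
// This request is the one that makes the JobManager throw the NotSerializableException
val future = jobManagerGateWay.ask(RequestJob(jobId), timeOut)
val result = Await.result(future, timeOut)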
The JobManager threw a NotSerializableException when I executed this code. So I
wonder how this happened, and whether there is another way to get a job's
ExecutionGraph programmatically.
Best, XiangWei
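As to how this happens: the stack trace shows Akka's JavaSerializer writing a message for a remote endpoint (akka.remote.EndpointWriter), and Java serialization rejects any object in that message whose class does not implement java.io.Serializable. A minimal, self-contained sketch of that mechanism follows. It is not Flink code; LiveGraph, ArchivedGraph, Reply and canSerialize are hypothetical names chosen for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a live runtime object: it does NOT implement Serializable.
class LiveGraph(val jobName: String, val state: String) {
  // Produces a plain-data snapshot that is safe to send to another process.
  def archive(): ArchivedGraph = ArchivedGraph(jobName, state)
}

// Hypothetical stand-in for an archived snapshot; Scala case classes are Serializable.
final case class ArchivedGraph(jobName: String, state: String)

// Hypothetical reply message wrapping whatever the receiver decides to send back.
final case class Reply(payload: AnyRef)

object SerializationDemo {
  // Returns true if Java serialization accepts the object,
  // false if it fails with NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally out.close()
  }
}
```

In this sketch, SerializationDemo.canSerialize(Reply(new LiveGraph("wordcount", "RUNNING"))) is false, because the wrapped field cannot be written, while the same call on Reply(live.archive()) is true. Within a single process no serialization takes place, so passing the live object only fails once the reply has to cross a process boundary.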
2017-11-07 17:16 GMT+08:00 Till Rohrmann <tr...@apache.org>:
> Hi XiangWei,
>
> how do you use the JobMasterGateway with the actor message RequestJob?
> The JobMasterGateway is a Java interface and does not represent an
> ActorCell to which you can send actor messages. Instead you should call
> JobMasterGateway#requestArchivedExecutionGraph.
>
> Cheers,
> Till
Re: ExecutionGraph not serializable
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
how do you use the JobMasterGateway with the actor message RequestJob? The
JobMasterGateway is a Java interface and does not represent an ActorCell to
which you can send actor messages. Instead you should call
JobMasterGateway#requestArchivedExecutionGraph.
Cheers,
Till
On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske <fh...@gmail.com> wrote:
> Hi XiangWei,
>
> I don't think this is a public interface, but Till (in CC) might know
> better.
>
> Best,
> Fabian
Re: ExecutionGraph not serializable
Posted by Fabian Hueske <fh...@gmail.com>.
Hi XiangWei,
I don't think this is a public interface, but Till (in CC) might know
better.
Best,
Fabian
2017-11-06 3:27 GMT+01:00 XiangWei Huang <xw...@gmail.com>:
> Hi Flink users,
> The Flink JobManager throws a NotSerializableException when I use
> JobMasterGateway to get the ExecutionGraph of a specific job with the
> message RequestJob(jobID). Below are the details of the exception:
>
> [ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
> java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
>
> So, is this a bug, or is this way of getting a job's ExecutionGraph invalid?
>
> Best, XiangWei