Posted to dev@flink.apache.org by Greg Hogan <co...@greghogan.com> on 2016/03/15 16:19:56 UTC

Association failure ClassNotFoundException

I am seeing a failure when running my code, starting with commit 0f8d76c6
(ExecutionConfig to JobGraph).

Logs and stack trace are below.

I am using the default configuration, so there is a single TaskManager. From
the web UI, its data port is 33245 and its path is
akka.tcp://flink@192.168.14.134:41339/user/taskmanager.

With a breakpoint placed in ReliableDeliverySupervisor.supervisorStrategy, I
see that e.detailMessage is "java.lang.ClassNotFoundException:
generator.rmat.random.BlockInfo" and remoteAddress is
"akka.tcp://flink@127.0.0.1:51428".

I have an old version of BlockInfo which works without error:

public class BlockInfo {

    public long seed;

    public long edges;
}

The new version of BlockInfo leading to the error:

public class BlockInfo<T extends RandomGenerator> {

    private final RandomGenerable<T> randomGenerable;

    private final int blockIndex;

    private final long firstElement;

    private final long elementCount;

    ...
}

In this execution the RandomGenerable has only a single field, a long. I am
puzzled and unsure where to look next.

Greg



Client log:

2016-03-15 10:07:25,745 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-03-15 10:08:25,692 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.


JobManager log:

2016-03-15 10:07:25,514 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
2016-03-15 10:07:25,533 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [java.lang.Integer]
2016-03-15 10:07:25,547 DEBUG org.apache.flink.runtime.blob.BlobServerConnection            - Received PUT request for content addressable BLOB
2016-03-15 10:07:25,731 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:49738] has failed, address is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].


Client stack trace:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at Driver.main(Driver.java:462)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out.
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
    ... 16 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: Association failure ClassNotFoundException

Posted by Till Rohrmann <tr...@apache.org>.
Hi Greg,

I’ve found the cause of the problem. You have in fact spotted a really mean
and subtle bug introduced by FLINK-3327. In a nutshell, we now send the
ExecutionConfig to the JobManager as part of the JobGraph, and the
ExecutionConfig can contain the classes (e.g. Class<BlockInfo>) of
automatically registered user code types such as BlockInfo. Because this
information is sent to the JobManager directly, instead of being serialized
first and shipped as a byte array, Akka tries to load these classes when it
deserializes the message. The types are part of the user code, so the class
loader used by Akka cannot load them and fails with a ClassNotFoundException.
As a result, the SubmitJob message is silently dropped.
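The explanation above points at a general remedy: ship the config as an opaque byte array and deserialize it only where the user code class loader is known. Below is a minimal plain-Java sketch of that idea, not Flink code; `SerializedPayload`, `Config` and `BlockInfo` are hypothetical stand-ins, and the actual fix tracked in FLINK-3633 may look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializedPayloadSketch {

    /**
     * Hypothetical wrapper that carries a value only as serialized bytes.
     * Reading the wrapper itself needs only this class and byte[], so a
     * transport layer can deserialize it without seeing user code classes.
     */
    static final class SerializedPayload<T extends Serializable> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] bytes;

        SerializedPayload(T value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            this.bytes = buffer.toByteArray();
        }

        /** Restores the value, resolving every class through the given loader. */
        @SuppressWarnings("unchecked")
        T deserialize(ClassLoader loader) throws ClassNotFoundException {
            try (ObjectInputStream in =
                    new LoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
                return (T) in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** ObjectInputStream that resolves classes through an explicit loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Sketch only: Class objects of primitive types (int.class etc.) are not handled.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Hypothetical stand-in for an ExecutionConfig holding registered user types. */
    static final class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Class<?>> registeredTypes = new ArrayList<>();
    }

    /** Hypothetical stand-in for the user code type BlockInfo. */
    static final class BlockInfo implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        Config config = new Config();
        config.registeredTypes.add(BlockInfo.class);

        // Client side: a message carrying this payload holds only bytes.
        SerializedPayload<Config> payload = new SerializedPayload<>(config);

        // Receiving side: unwrap once the user code class loader is available.
        ClassLoader userCodeLoader = SerializedPayloadSketch.class.getClassLoader();
        Config restored = payload.deserialize(userCodeLoader);
        System.out.println("Restored registered types: " + restored.registeredTypes);
    }
}
```

The design choice is to move the point where user classes must be resolvable from the transport layer to an explicit call that takes the class loader as a parameter.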

I’ve also opened a JIRA [1] for it. I’ll try to fix the problem.

[1] https://issues.apache.org/jira/browse/FLINK-3633

Cheers,
Till

On Wed, Mar 16, 2016 at 8:52 PM, Greg Hogan <co...@greghogan.com> wrote:

Re: Association failure ClassNotFoundException

Posted by Greg Hogan <co...@greghogan.com>.
I realized shortly after responding that, since I had moved the offending
code into Gelly and then copied the Gelly jar into the lib folder, that code
would not involve the user classloader. So I have merged the Gelly jar and
the Gelly examples jar into a single jar, which is included in my build at:

https://s3.amazonaws.com/apache-flink/flink-1.1-SNAPSHOT.txz

Are you able to reproduce the problem with the following command?

$ ./bin/flink run -c org.apache.flink.graph.examples.Graph500 flink-gelly_with_examples_2.10-1.1-SNAPSHOT.jar
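Why the lib folder masks the bug can be illustrated with Java's standard parent-first delegation: a class that the parent loader can see is defined by the parent even when it is requested through a child loader, so framework code using the parent loader can resolve it too. The sketch below assumes that jars in lib/ sit on the parent's classpath while the user jar is served by a child loader; `LibraryType` is a hypothetical stand-in for a class moved into Gelly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ParentFirstDemo {

    /** Hypothetical stand-in for a class shipped in a jar under lib/. */
    static final class LibraryType {
    }

    /**
     * Requests a class through a fresh child loader (playing the role of the
     * user code class loader) and returns the loader that actually defined it.
     */
    static ClassLoader definingLoaderViaChild(String className) throws ClassNotFoundException {
        // The parent plays the role of the loader that sees lib/ and that
        // framework code uses.
        ClassLoader parent = ParentFirstDemo.class.getClassLoader();
        // The child has no URLs of its own; with default parent-first
        // delegation it asks the parent before looking anywhere else.
        try (URLClassLoader child = new URLClassLoader(new URL[0], parent)) {
            return child.loadClass(className).getClassLoader();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        ClassLoader parent = ParentFirstDemo.class.getClassLoader();
        ClassLoader definer = definingLoaderViaChild(LibraryType.class.getName());
        // Prints true: the parent defined the class, so code that only has
        // the parent loader can resolve it during deserialization as well.
        System.out.println(definer == parent);
    }
}
```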

On Tue, Mar 15, 2016 at 5:16 PM, Greg Hogan <co...@greghogan.com> wrote:

> Hi Till,
>
> The code in question is part of FLINK-2909 which is currently running on
> Travis but which does not trigger this issue. I'll keep looking.
>
> Thanks,
> Greg
>
> On Tue, Mar 15, 2016 at 11:30 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Greg,
>>
>> could you share an example program with us which reproduces the problem? I
>> suspect that, somehow, your user code class BlockInfo is sent directly to
>> the JobManager where it is deserialized without the user code class
>> loader.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <co...@greghogan.com> wrote:
>>
>> > I am seeing a failure running my code starting with commit 0f8d76c6
>> > (ExecutionConfig to JobGraph).
>> >
>> > Logs and stack trace are below.
>> >
>> > Using default configuration so a single TaskManager. From the web UI,
>> data
>> > port is 33245 and path is akka.tcp://
>> > flink@192.168.14.134:41339/user/taskmanager.
>> >
>> > Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
>> > e.detailMessage is "java.lang.ClassNotFoundException:
>> > generator.rmat.random.BlockInfo" and remoteAddress is "akka.tcp://
>> > flink@127.0.0.1:51428".
>> >
>> > I have an old version of BlockInfo which works without error:
>> >
>> > public class BlockInfo {
>> >
>> >     public long seed;
>> >
>> >     public long edges;
>> > }
>> >
>> > The new version of BlockInfo leading to the error:
>> >
>> > public class BlockInfo<T extends RandomGenerator> {
>> >
>> >     private final RandomGenerable<T> randomGenerable;
>> >
>> >     private final int blockIndex;
>> >
>> >     private final long firstElement;
>> >
>> >     private final long elementCount;
>> >
>> >     ...
>> > }
>> >
>> > In this execution the RandomGenerable has only a single field, a long.
>> I am
>> > puzzled and unsure where to look next.
>> >
>> > Greg
>> >
>> >
>> >
>> > Client log:
>> >
>> > 2016-03-15 10:07:25,745 WARN
>> > akka.remote.ReliableDeliverySupervisor                        -
>> Association
>> > with remote system [akka.tcp://flink@127.0.0.1:6123] has failed,
>> address
>> > is
>> > now gated for [5000] ms. Reason is: [Disassociated].
>> > 2016-03-15 10:08:25,692 INFO
>> > org.apache.flink.runtime.client.JobClientActor                -
>> Terminate
>> > JobClientActor.
>> >
>> >
>> > JobManager log:
>> >
>> > 2016-03-15 10:07:25,514 DEBUG
>> > akka.serialization.Serialization(akka://flink)                - Using
>> > serializer[akka.serialization.JavaSerializer] for message
>> > [akka.actor.ActorIdentity]
>> > 2016-03-15 10:07:25,533 DEBUG
>> > akka.serialization.Serialization(akka://flink)                - Using
>> > serializer[akka.serialization.JavaSerializer] for message
>> > [java.lang.Integer]
>> > 2016-03-15 10:07:25,547 DEBUG
>> > org.apache.flink.runtime.blob.BlobServerConnection            - Received
>> > PUT request for content addressable BLOB
>> > 2016-03-15 10:07:25,731 WARN
>> > akka.remote.ReliableDeliverySupervisor                        -
>> Association
>> > with remote system [akka.tcp://flink@127.0.0.1:49738] has failed,
>> address
>> > is now gated for [5000] ms. Reason is:
>> [generator.rmat.random.BlockInfo].
>> >
>> >
>> > Client stack trace:
>> >
>> > The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The program
>> > execution failed: Communication with JobManager failed: Job submission
>> to
>> > the JobManager timed out.
>> >     at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >     at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >     at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >     at
>> >
>> >
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >     at
>> >
>> >
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>> >     at Driver.main(Driver.java:462)
>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >     at
>> >
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >     at
>> >
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >     at java.lang.reflect.Method.invoke(Method.java:497)
>> >     at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >     at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >     at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >     at
>> >
>> >
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >     at
>> >
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> > Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> > Communication with JobManager failed: Job submission to the JobManager
>> > timed out.
>> >     at
>> >
>> >
>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
>> >     at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:379)
>> >     ... 16 more
>> > Caused by:
>> >
>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> > Job submission to the JobManager timed out.
>> >     at
>> >
>> >
>> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
>> >     at
>> >
>> >
>> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>> >     at
>> >
>> >
>> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>> >     at
>> >
>> >
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >     at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >     at
>> >
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> >     at
>> >
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> >     at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >     at
>> >
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>>
>
>

Re: Association failure ClassNotFoundException

Posted by Greg Hogan <co...@greghogan.com>.
Hi Till,

The code in question is part of FLINK-2909, which is currently running on
Travis, where it does not trigger this issue. I'll keep looking.

Thanks,
Greg

On Tue, Mar 15, 2016 at 11:30 AM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Greg,
>
> could you share an example program with us which reproduces the problem? I
> suspect that, somehow, your user code class BlockInfo is sent directly to
> the JobManager where it is deserialized without the user code class loader.
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <co...@greghogan.com> wrote:
>
> > I am seeing a failure running my code starting with commit 0f8d76c6
> > (ExecutionConfig to JobGraph).
> >
> > Logs and stack trace are below.
> >
> > Using default configuration so a single TaskManager. From the web UI,
> data
> > port is 33245 and path is akka.tcp://
> > flink@192.168.14.134:41339/user/taskmanager.
> >
> > Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
> > e.detailMessage is "java.lang.ClassNotFoundException:
> > generator.rmat.random.BlockInfo" and remoteAddress is "akka.tcp://
> > flink@127.0.0.1:51428".
> >
> > I have an old version of BlockInfo which works without error:
> >
> > public class BlockInfo {
> >
> >     public long seed;
> >
> >     public long edges;
> > }
> >
> > The new version of BlockInfo leading to the error:
> >
> > public class BlockInfo<T extends RandomGenerator> {
> >
> >     private final RandomGenerable<T> randomGenerable;
> >
> >     private final int blockIndex;
> >
> >     private final long firstElement;
> >
> >     private final long elementCount;
> >
> >     ...
> > }
> >
> > In this execution the RandomGenerable has only a single field, a long. I
> am
> > puzzled and unsure where to look next.
> >
> > Greg
> >
> >
> >
> > Client log:
> >
> > 2016-03-15 10:07:25,745 WARN
> > akka.remote.ReliableDeliverySupervisor                        -
> Association
> > with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address
> > is
> > now gated for [5000] ms. Reason is: [Disassociated].
> > 2016-03-15 10:08:25,692 INFO
> > org.apache.flink.runtime.client.JobClientActor                - Terminate
> > JobClientActor.
> >
> >
> > JobManager log:
> >
> > 2016-03-15 10:07:25,514 DEBUG
> > akka.serialization.Serialization(akka://flink)                - Using
> > serializer[akka.serialization.JavaSerializer] for message
> > [akka.actor.ActorIdentity]
> > 2016-03-15 10:07:25,533 DEBUG
> > akka.serialization.Serialization(akka://flink)                - Using
> > serializer[akka.serialization.JavaSerializer] for message
> > [java.lang.Integer]
> > 2016-03-15 10:07:25,547 DEBUG
> > org.apache.flink.runtime.blob.BlobServerConnection            - Received
> > PUT request for content addressable BLOB
> > 2016-03-15 10:07:25,731 WARN
> > akka.remote.ReliableDeliverySupervisor                        -
> Association
> > with remote system [akka.tcp://flink@127.0.0.1:49738] has failed,
> address
> > is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].
> >
> >
> > Client stack trace:
> >
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Communication with JobManager failed: Job submission to
> > the JobManager timed out.
> >     at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >     at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >     at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >     at
> >
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >     at
> >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >     at Driver.main(Driver.java:462)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >     at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >     at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >     at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >     at
> >
> >
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >     at
> >
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> > Communication with JobManager failed: Job submission to the JobManager
> > timed out.
> >     at
> >
> >
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
> >     at
> org.apache.flink.client.program.Client.runBlocking(Client.java:379)
> >     ... 16 more
> > Caused by:
> > org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> > Job submission to the JobManager timed out.
> >     at
> >
> >
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
> >     at
> >
> >
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
> >     at
> >
> >
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> >     at
> >
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> >     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at
> >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >     at
> >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >     at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at
> >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
>

Re: Association failure ClassNotFoundException

Posted by Till Rohrmann <tr...@apache.org>.
Hi Greg,

Could you share with us an example program that reproduces the problem? I
suspect that, somehow, your user code class BlockInfo is sent directly to
the JobManager, where it is deserialized without the user code class loader.
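This hypothesis can be reproduced with plain Java serialization: the same bytes deserialize fine through a class loader that can see the user class and fail with a ClassNotFoundException through one that cannot. In this sketch, `HidingClassLoader` is a hypothetical stand-in for the class loader Akka uses, and the nested `BlockInfo` is a simplified version of the user type; neither is Flink or Akka code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class UserCodeLoaderRepro {

    /** Hypothetical, simplified stand-in for the user code type BlockInfo. */
    static final class BlockInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        long seed = 42L;
        long edges = 1024L;
    }

    /** Imitates a framework class loader that cannot see one user code class. */
    static final class HidingClassLoader extends ClassLoader {
        private final String hiddenName;

        HidingClassLoader(ClassLoader parent, String hiddenName) {
            super(parent);
            this.hiddenName = hiddenName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(hiddenName)) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Resolves every class named in the stream through an explicit loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader loader) throws ClassNotFoundException {
        try (ObjectInputStream in = new LoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new BlockInfo());
        ClassLoader userCodeLoader = UserCodeLoaderRepro.class.getClassLoader();
        ClassLoader frameworkLoader = new HidingClassLoader(userCodeLoader, BlockInfo.class.getName());

        try {
            deserialize(bytes, frameworkLoader);
        } catch (ClassNotFoundException e) {
            // The exception names the missing class, as in the report.
            System.out.println("framework loader: " + e);
        }
        try {
            BlockInfo restored = (BlockInfo) deserialize(bytes, userCodeLoader);
            System.out.println("user code loader: restored BlockInfo, edges=" + restored.edges);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("the user code loader should see BlockInfo", e);
        }
    }
}
```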

Cheers,
Till

On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <co...@greghogan.com> wrote:

> I am seeing a failure running my code starting with commit 0f8d76c6
> (ExecutionConfig to JobGraph).
>
> Logs and stack trace are below.
>
> Using default configuration so a single TaskManager. From the web UI, data
> port is 33245 and path is akka.tcp://
> flink@192.168.14.134:41339/user/taskmanager.
>
> Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
> e.detailMessage is "java.lang.ClassNotFoundException:
> generator.rmat.random.BlockInfo" and remoteAddress is "akka.tcp://
> flink@127.0.0.1:51428".
>
> I have an old version of BlockInfo which works without error:
>
> public class BlockInfo {
>
>     public long seed;
>
>     public long edges;
> }
>
> The new version of BlockInfo leading to the error:
>
> public class BlockInfo<T extends RandomGenerator> {
>
>     private final RandomGenerable<T> randomGenerable;
>
>     private final int blockIndex;
>
>     private final long firstElement;
>
>     private final long elementCount;
>
>     ...
> }
>
> In this execution the RandomGenerable has only a single field, a long. I am
> puzzled and unsure where to look next.
>
> Greg
>
>
>
> Client log:
>
> 2016-03-15 10:07:25,745 WARN
> akka.remote.ReliableDeliverySupervisor                        - Association
> with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address
> is
> now gated for [5000] ms. Reason is: [Disassociated].
> 2016-03-15 10:08:25,692 INFO
> org.apache.flink.runtime.client.JobClientActor                - Terminate
> JobClientActor.
>
>
> JobManager log:
>
> 2016-03-15 10:07:25,514 DEBUG
> akka.serialization.Serialization(akka://flink)                - Using
> serializer[akka.serialization.JavaSerializer] for message
> [akka.actor.ActorIdentity]
> 2016-03-15 10:07:25,533 DEBUG
> akka.serialization.Serialization(akka://flink)                - Using
> serializer[akka.serialization.JavaSerializer] for message
> [java.lang.Integer]
> 2016-03-15 10:07:25,547 DEBUG
> org.apache.flink.runtime.blob.BlobServerConnection            - Received
> PUT request for content addressable BLOB
> 2016-03-15 10:07:25,731 WARN
> akka.remote.ReliableDeliverySupervisor                        - Association
> with remote system [akka.tcp://flink@127.0.0.1:49738] has failed, address
> is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].
>
>
> Client stack trace:
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Communication with JobManager failed: Job submission to
> the JobManager timed out.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>     at
>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>     at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>     at Driver.main(Driver.java:462)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at
>
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Communication with JobManager failed: Job submission to the JobManager
> timed out.
>     at
>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
>     ... 16 more
> Caused by:
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out.
>     at
>
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
>     at
>
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at
>
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at
>
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>