Posted to user@flink.apache.org by axt <ax...@load.hu> on 2014/11/03 14:53:37 UTC

Problem with deploying a flink topology in a remote execution environment

Hi! I'm new to this list and also new to Flink, so apologies if my
question is too basic.

I've created a simple Flink streaming topology that reads its input from
Kafka, applies some transformations to the items, and sends them to a
sink, which then stores the stream of documents in Elasticsearch.

DataStreamSink<ElaIndexCommand> dataStream = env
                .addSource(new ItemsKafkaSource(), 1)
                .flatMap(new TransformItemsStream())
                .addSink(new ElasticSearchSink());

It runs wonderfully in a local environment, but I'm having trouble
deploying it to a remote execution environment.

For the remote environment, I'm running this version of Flink on YARN,
on top of Hadoop 2.3:
http://xenia.sote.hu/ftp/mirrors/www.apache.org/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz

"yarn version" gives the following output:

Hadoop 2.3.0-cdh5.1.2
Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 251e630be743d5abaec6ba62cdc5077d229e017f
Compiled by jenkins on 2014-08-26T01:36Z
Compiled with protoc 2.5.0
From source with checksum ec11b8ec19ca2bf3e7cb1bbe4ee182
This command was run using /usr/lib/hadoop/hadoop-common-2.3.0-cdh5.1.2.jar


I've built a fat JAR, but when I try to submit it, I get the
following error:

Error: The program execution failed: java.lang.Exception: Failed to deploy the task source-1 (1/1) - execution #0 to slot SubSlot 0 (b6ad89147571b7a3817b0d726f5c584e (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: com.gravityrd.flink.ItemsKafkaSource
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: com.gravityrd.flink.ItemsKafkaSource
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)


The classes are in the JAR file: when I run "./bin/flink run
example-toplogy-flink.jar" with -verbose:class, I see them loaded locally.
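
For anyone who wants to double-check the packaging half of that claim,
here is a minimal sketch that looks for a class file inside a JAR. The
class name JarClassCheck and its methods are made up for illustration;
it uses only java.util.jar.JarFile from the JDK. It inspects the JAR on
the submitting machine only, so it can rule out a packaging mistake but
says nothing about which class loader the remote task manager uses.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: checks whether a JAR contains the bytecode for a class. */
final class JarClassCheck {

    /** Converts a binary class name such as "a.b.C$D" to the JAR entry "a/b/C$D.class". */
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the JAR at jarPath has an entry for className. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryNameFor(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <jar> <fully.qualified.ClassName>");
            System.exit(2);
        }
        boolean found = containsClass(args[0], args[1]);
        System.out.println(args[1] + (found ? " is" : " is NOT") + " in " + args[0]);
    }
}
```

It could be run as, for example,
"java JarClassCheck example-toplogy-flink.jar com.gravityrd.flink.ItemsKafkaSource".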

After that, I tried submitting the example topologies:
1. The normal wordcount example submitted successfully.
2. Streaming wordcount gives me this error:
java.lang.NoClassDefFoundError: org/apache/flink/examples/java/wordcount/util/WordCountData
    (I think this is expected, because that class is missing from the jar.)
3. TwitterStream gives this error:
Executing TwitterStream example with built-in default data.
  Provide parameters to read input data from a file.
  USAGE: TwitterStream <pathToPropertiesFile>
Error: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;
java.lang.NoSuchMethodError: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;
4. IterateExample gives the following error (similar to my original problem):
Error: The program execution failed: java.lang.Exception: Failed to deploy the task iterationSource-3 (2/2) - execution #0 to slot SubSlot 0 (eddf0569c5374e0975551230c08f0efe (1) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.iteration.IterateExample$Step
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.iteration.IterateExample$Step
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)



Can anybody point me in the right direction: what am I doing wrong,
or how should I debug it?

Thanks,

Attila Axt


Re: Problem with deploying a flink topology in a remote execution environment

Posted by Márton Balassi <ba...@gmail.com>.
Dear Attila,

Both the issues you have reported are fixed with the streaming commits just
pushed to the current master. Could you please retry running the jobs?

Best,

Marton

On Mon, Nov 10, 2014 at 4:51 PM, Márton Balassi <ba...@gmail.com>
wrote:

> Hey Attila,
>
> Thanks for the detailed bug report. I've picked up the issue and fixed the
> build for the streaming example packages. [1]
> There is an outstanding issue, most likely with the class loading, when
> executing streaming code remotely. I've started working on it; Robert was
> kind enough to give me some guidance.
>
> [1]
> https://github.com/mbalassi/incubator-flink/commit/2be50b644807aeead825f512e13b21702404560f
>
> Getting back to you as soon as it is fixed,
>
> Marton

Re: Problem with deploying a flink topology in a remote execution environment

Posted by Márton Balassi <ba...@gmail.com>.
Hey Attila,

Thanks for the detailed bug report. I've picked up the issue and fixed the
build for the streaming example packages. [1]
There is an outstanding issue, most likely with the class loading, when
executing streaming code remotely. I've started working on it; Robert was
kind enough to give me some guidance.
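
To illustrate the kind of class-loading problem meant here, below is a
generic sketch and not the actual Flink fix. The stack traces in this
thread show the function being deserialized via commons-lang3
SerializationUtils, which presumably resolves classes with a loader that
does not know about the submitted jar. A common technique is an
ObjectInputStream that resolves classes through an explicitly supplied
class loader. The names UserCodeDeserializer and
ClassLoaderObjectInputStream are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: Java deserialization against a caller-supplied class loader. */
final class UserCodeDeserializer {

    /** ObjectInputStream that looks classes up in the given class loader first. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes obj with plain Java serialization. */
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Deserializes bytes, resolving classes through classLoader (e.g. one built from the user jar). */
    static Object deserialize(byte[] bytes, ClassLoader classLoader) {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to the supplied class loader", e);
        }
    }
}
```

On a worker, the loader passed to deserialize would typically be a
URLClassLoader created over the shipped user jar; how that loader is
obtained is outside the scope of this sketch.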

[1]
https://github.com/mbalassi/incubator-flink/commit/2be50b644807aeead825f512e13b21702404560f

Getting back to you as soon as it is fixed,

Marton

On Wed, Nov 5, 2014 at 5:08 PM, axt <ax...@load.hu> wrote:

>  Hi Marton! Sorry for my late response.
>
> My classes are serializable, so that's not what is causing the error.
>
> I've managed to narrow the problem down to the official examples, without
> using any of my own code:
>
> I can run the wordcount example (with input supplied) and the twitter
> example locally.
> Unfortunately, when I try to submit them to the flink-on-yarn instance,
> I get ClassNotFound errors in both cases. Stack traces are here:
> http://pastebin.com/MMLaNTcv
>
> I've tried it with two different Hadoop installs.
>
> I'm starting flink-on-yarn with this command:
> HADOOP_HOME=/usr/lib/hadoop /opt/flink-yarn-0.7.0-incubating/bin/yarn-session.sh -n 2 -jm 1024 -tm 4096 -s 4
>
> Can you tell me which versions you are using in your test environments?
> Do you have any specific configuration options?
>
> Any ideas on how I should debug this problem?
>
> Just a note: the
> "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;"
> errors have disappeared; they were caused by an inconsistent build.
>
> Greetings,
>   Attila

Re: Problem with deploying a flink topology in a remote execution environment

Posted by axt <ax...@load.hu>.
Hi Marton! Sorry for my late response.

My classes are serializable, so that's not what is causing the error.

I've managed to narrow the problem down to the official examples, without
using any of my own code:

I can run the wordcount example (with input supplied) and the twitter
example locally.
Unfortunately, when I try to submit them to the flink-on-yarn instance,
I get ClassNotFound errors in both cases. Stack traces are here:
http://pastebin.com/MMLaNTcv

I've tried it with two different Hadoop installs.

I'm starting flink-on-yarn with this command:
HADOOP_HOME=/usr/lib/hadoop /opt/flink-yarn-0.7.0-incubating/bin/yarn-session.sh -n 2 -jm 1024 -tm 4096 -s 4

Can you tell me which versions you are using in your test
environments? Do you have any specific configuration options?

Any ideas on how I should debug this problem?

Just a note: the
"org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;"
errors have disappeared; they were caused by an inconsistent build.

Greetings,
  Attila


On 2014-11-03 16:57, Márton Balassi wrote:
> Hey Attila,
>
> Thanks for trying out streaming!
>
> As for your issues:
>
>    * Your simple topology: make sure that your user-defined functions
> (e.g. ItemsKafkaSource) and the classes containing them are
> serializable, so that they can be shipped. For anything more specific,
> if you can send your code over to me, I am more than happy to have
> a look at it.
>    * Streaming wordcount not working without provided dataset: Yes,
> thanks for reporting. It is my fault; the problem was known, but now I
> also have a JIRA [1] for it. :) However, if you give it some input, it
> should work.
>    * By the way, can you run the Twitter example locally?
>
> [1] https://issues.apache.org/jira/browse/FLINK-1204
>
> Cheers,
>
> Marton


Re: Problem with deploying a flink topology in a remote execution environment

Posted by Márton Balassi <ba...@gmail.com>.
Hey Attila,

Thanks for trying out streaming!

As for your issues:

   * Your simple topology: make sure that your user-defined functions (e.g.
ItemsKafkaSource) and the classes containing them are serializable, so
that they can be shipped. For anything more specific, if you can send your
code over to me, I am more than happy to have a look at it.
   * Streaming wordcount not working without provided dataset: Yes, thanks
for reporting. It is my fault; the problem was known, but now I also have a
JIRA [1] for it. :) However, if you give it some input, it should work.
   * By the way, can you run the Twitter example locally?
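
One rough way to check the serializability point from the first item
before submitting a job is to try writing the function object with plain
Java serialization. The sketch below assumes nothing about Flink itself;
SerializableCheck and BadFunction are hypothetical names, and BadFunction
only stands in for a user function that implements Serializable but holds
a field that cannot be serialized.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: reports whether an object survives Java serialization. */
final class SerializableCheck {

    /** Stand-in for a user function that is Serializable but holds a field that is not. */
    static final class BadFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Object client = new Object(); // java.lang.Object is not Serializable
    }

    /** Returns null if candidate can be serialized, or a short explanation otherwise. */
    static String whyNotSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return null;
        } catch (NotSerializableException e) {
            // The exception message names the offending class.
            return "not serializable: " + e.getMessage();
        } catch (IOException e) {
            return "serialization failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println("String      -> " + whyNotSerializable("hello"));
        System.out.println("BadFunction -> " + whyNotSerializable(new BadFunction()));
    }
}
```

A check like this only covers the sending side. A function that
serializes fine can still fail with ClassNotFoundException on the
receiving side if its class is not visible there.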

[1] https://issues.apache.org/jira/browse/FLINK-1204

Cheers,

Marton


On Mon, Nov 3, 2014 at 2:53 PM, axt <ax...@load.hu> wrote:

> ClassNotFoundException:
> com.gravityrd.flink.ItemsKafkaSource
>