Posted to user@flink.apache.org by clay4444 <cl...@gmail.com> on 2018/11/20 13:05:38 UTC

About the issue caused by flink's dependency jar package submission method

hi all:

I know that when submitting Flink jobs, Flink's official recommendation is
to put all the dependencies and business logic into a fat jar. Our
requirement, however, is to keep the business logic separate and supply the
dependencies dynamically at submission time. I found one way to do this:
use the -yt and -C parameters to submit the job and run it on YARN. The job
is submitted successfully, but the following error is always reported at
runtime.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
	at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of
org.apache.commons.collections.map.LinkedMap to field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
	at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
	at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
	at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
	at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
	... 4 more

Does anyone know what causes this?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: About the issue caused by flink's dependency jar package submission method

Posted by clay4444 <cl...@gmail.com>.
hi yinhua,

I considered that approach, but I don't think it is suitable, because I want
each Flink job to have its own business logic and dependency jars, separate
from other jobs. That's what I want to do.




Re: About the issue caused by flink's dependency jar package submission method

Posted by "yinhua.dai" <yi...@outlook.com>.
As far as I know, -yt applies to both the job manager and the task managers,
while -C applies to the Flink CLI.

Did you consider putting all your jars in /flink/lib?




Re: About the issue caused by flink's dependency jar package submission method

Posted by Oscar Westra van Holthe - Kind <os...@westravanholthe.nl>.
On Wed, 21 Nov 2018 at 05:26, clay4444 <cl...@gmail.com> wrote:

> hi
>
> I have checked all the dependencies and did not find any jar with a
> different version, so I suspect the way I submit the jars has some issue.
> My command is like this:
>

Did you also check the runtime dependencies where the code is run? Because
the only way that two classes with the same name are not the same is if
they are loaded with two different classloaders.

I suspect your runtime loads the LinkedMap via a different classloader than
FlinkKafkaConsumerBase, and that other classloader (probably a parent) also
has the LinkedMap. Hence the problem.
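
The point about classloaders can be reproduced outside Flink. The following is a minimal sketch, not taken from this thread: the class name "Dup" and the helper names are hypothetical, and the class file is assembled by hand only so that the example needs no extra files. It defines the same class name through two separate classloaders and shows that the JVM treats the results as unrelated types, which is the situation behind "cannot assign instance of X to field of type X".

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Demo: one class name defined by two classloaders gives two incompatible types. */
final class TwoLoadersDemo {

    /** Hypothetical class name, used only for this demo. */
    static final String NAME = "Dup";

    /** Bytes of a minimal class file: "public class Dup extends Object", no members. */
    static byte[] classBytes() {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(0xCAFEBABE);                            // magic number
            out.writeShort(0);                                   // minor version
            out.writeShort(52);                                  // major version (Java 8)
            out.writeShort(5);                                   // constant pool count (entries 1..4)
            out.writeByte(7); out.writeShort(2);                 // #1 Class -> #2
            out.writeByte(1); out.writeUTF(NAME);                // #2 Utf8 "Dup"
            out.writeByte(7); out.writeShort(4);                 // #3 Class -> #4
            out.writeByte(1); out.writeUTF("java/lang/Object");  // #4 Utf8
            out.writeShort(0x0021);                              // ACC_PUBLIC | ACC_SUPER
            out.writeShort(1);                                   // this_class = #1
            out.writeShort(3);                                   // super_class = #3
            out.writeShort(0);                                   // no interfaces
            out.writeShort(0);                                   // no fields
            out.writeShort(0);                                   // no methods
            out.writeShort(0);                                   // no attributes
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A loader that defines the demo class itself instead of delegating to its parent. */
    static final class DefiningLoader extends ClassLoader {
        Class<?> define() {
            byte[] bytes = classBytes();
            return defineClass(NAME, bytes, 0, bytes.length);
        }
    }

    public static void main(String[] args) {
        Class<?> first = new DefiningLoader().define();
        Class<?> second = new DefiningLoader().define();
        System.out.println("same name:       " + first.getName().equals(second.getName()));
        System.out.println("same class:      " + (first == second));
        System.out.println("assignable:      " + first.isAssignableFrom(second));
    }
}
```

The two Class objects share a name but are not identical and not assignable to each other, so an instance created through one loader cannot be stored in a field whose type was resolved through the other.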

[...]

Issues like this one have a similar origin, except that here you have two
versions of Kafka on your classpath:

> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of org.apache.kafka.common.serialization.Deserializer
>         at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
>         ... 11 more
>

[...]


> But in a fat jar, everything works fine.
>

That's odd. It means that in that case your classpath is different. If
you can find out how it differs, solving the exceptions above becomes easy.
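
One way to compare the two setups is to print where a class actually came from at runtime. The sketch below is only an illustration: the class and method names (ClassOrigin, describe) are hypothetical, and it relies solely on standard JDK calls (Class.getClassLoader and Class.getProtectionDomain).

```java
import java.security.CodeSource;

/** Reports which classloader and which location (jar or directory) a class was loaded from. */
final class ClassOrigin {

    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // JDK core classes have neither a code source nor a visible classloader.
        String from = (source == null || source.getLocation() == null)
                ? "(no code source)"
                : source.getLocation().toString();
        String by = (loader == null) ? "bootstrap classloader" : loader.toString();
        return type.getName() + " loaded from " + from + " by " + by;
    }

    public static void main(String[] args) {
        System.out.println(describe(String.class));
        System.out.println(describe(ClassOrigin.class));
    }
}
```

Assuming the connector and commons-collections classes are on the job's classpath, logging describe(...) for LinkedMap and for FlinkKafkaConsumerBase from inside the running job, once with the fat jar and once with the -yt/-C submission, should show whether the two are served by different classloaders or jars.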

Kind regards,
Oscar


-- 
Oscar Westra van Holthe - Kind
Website: http://oscar.westravanholthe.nl/ | Email: oscar@westravanholthe.nl

Re: About the issue caused by flink's dependency jar package submission method

Posted by clay4444 <cl...@gmail.com>.
hi

I have checked all the dependencies and did not find any jar with a
different version, so I suspect the way I submit the jars has some issue.
My command is like this:

/data/flink1.6/bin/flink  run -m yarn-cluster -ytm 8032 -yn 1 -ys 1 -yqu
xxxx -yt /data/flink1.6/xxxx/lib -c com.xxx.xxx.xxx.Launch -C
http://xx.xxx.xxx.xxx:80/lib/slf4j-api-1.7.6.jar -C
http://xx.xxx.xxx.xxx:80/lib/lucene-spatial3d-7.3.1.jar -C
http://xx.xxx.xxx.xxx:80/lib/lucene-backward-codecs-7.3.1.jar -C
http://xx.xxx.xxx.xxx:80/lib/log4j-to-slf4j-2.9.1.jar -C
http://xx.xxx.xxx.xxx:80/lib/jackson-dataformat-smile-2.8.10.jar -C
http://xx.xxx.xxx.xxx:80/lib/mysql-connector-java-5.1.39.jar -C
http://xx.xxx.xxx.xxx:80/lib/commons-lang-2.6.jar -C
http://xx.xxx.xxx.xxx:80/lib/apacheds-kerberos-codec-2.0.0-M15.jar -C
http://xx.xxx.xxx.xxx:80/lib/slf4j-log4j12-1.7.5.jar -C
http://xx.xxx.xxx.xxx:80/lib/guava-12.0.1.jar -C
http://xx.xxx.xxx.xxx:80/lib/netty-all-4.0.23.Final.jar -C
http://xx.xxx.xxx.xxx:80/lib/scala-parser-combinators_2.11-1.0.4.jar -C
http://xx.xxx.xxx.xxx:80/lib/lucene-spatial-7.3.1.jar -C
http://xx.xxx.xxx.xxx:80/lib/commons-compress-1.4.1.jar  xxxxx.jar

I put all the dependencies in an nginx directory.

Is there a problem with this approach? I ask because I also encounter
errors like this:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765)
	at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
	at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
	at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
	at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
instance of org.apache.kafka.common.serialization.Deserializer
	at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
	at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
	... 11 more

But in a fat jar, everything works fine.




Re: About the issue caused by flink's dependency jar package submission method

Posted by Ken Krugler <kk...@transpac.com>.
My only guess would be that you have two versions of the Apache Commons jar on your class path, or the version you compiled against doesn’t match what you’re running against, and that’s why you get:

Caused by: java.lang.ClassCastException: cannot assign instance of 
org.apache.commons.collections.map.LinkedMap to field 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type 
org.apache.commons.collections.map.LinkedMap

If that’s the case, often the cause is that your Yarn environment has a different version of a jar than what’s in your fat jar.

Though I would expect proper shading would prevent that from happening.
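
To check for a duplicated class, one option is to scan a directory of jars for every jar that contains it. This is a rough sketch under the assumption that the jars are available in a local directory; the class and method names (DuplicateClassFinder, jarsContaining) are made up for illustration, and only standard JDK APIs are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Lists the jars in one directory that contain a given class. */
final class DuplicateClassFinder {

    /** Returns the sorted file names of jars directly under dir that contain className. */
    static List<String> jarsContaining(Path dir, String className) {
        // Inside a jar, org.example.Foo is stored as org/example/Foo.class.
        String entry = className.replace('.', '/') + ".class";
        List<String> hits = new ArrayList<>();
        try (Stream<Path> files = Files.list(dir)) {
            List<Path> paths = files.collect(Collectors.toList());
            for (Path path : paths) {
                String fileName = path.getFileName().toString();
                if (!fileName.endsWith(".jar")) {
                    continue; // skip anything that is not a jar
                }
                try (JarFile jar = new JarFile(path.toFile())) {
                    if (jar.getEntry(entry) != null) {
                        hits.add(fileName);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: DuplicateClassFinder <jar-dir> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(jarsContaining(Paths.get(args[0]), args[1]));
    }
}
```

Running it against the directory passed to -yt (/data/flink1.6/xxxx/lib in this thread) and against Flink's own lib directory, with org.apache.commons.collections.map.LinkedMap as the class name, would show whether more than one jar provides that class. It does not look inside nested or shaded jars.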

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


