Posted to user@flink.apache.org by Edward <eg...@hotmail.com> on 2018/01/22 13:39:11 UTC

Error with Avro/Kyro after upgrade to Flink 1.4

(resubmission of a previous post, since the stack trace didn't show up last
time)

We're attempting to upgrade our 1.3.2 cluster and jobs to 1.4.0. Jobs
submitted to the 1.4.0 Flink cluster fail with a Kryo registration error.

My jobs consume from Kafka topics with messages in Avro format. The
Avro schemas are registered with a Confluent Schema Registry. For
ingestion, we've been using the KafkaDeserializerWrapper class from this
pull request: https://github.com/apache/flink/pull/2705

In the pom.xml, I added a new dependency for flink-avro, and upgraded all
other Flink Maven dependencies to version 1.4.0.

Here's the error: 

java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
  Reason:
    Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
  Current Frame:
    bci: @23
    flags: { }
    locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
    stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
  Bytecode:
    0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
    0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
    0x0000020: 57b1                                   

	at java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
	at java.lang.Class.getConstructor0(Class.java:3075)
	at java.lang.Class.getConstructor(Class.java:1825)
	at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:119)
	at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
	at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
	at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)

Here are the dependencies: 

    <dependencies>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-statsd</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>3.3.1</version>
        </dependency>

    </dependencies>

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Edward <eg...@hotmail.com>.
Thanks, Stephan. You're correct that there was in fact no shading issue in
the official flink-dist_2.11-1.4.0.jar. We are using the jar in the Flink
Docker image, but I misspoke when I said ObjectMapper appeared there
unshaded. It turned out the issue was really a version conflict in our job's
uber-jar file between jackson-core and jackson-databind, which I was able to
resolve.

Regarding the VerifyError, please post back here when you have a patch or a
link to a pull request which we can track.






Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Stephan Ewen <se...@apache.org>.
I would assume that once you build Flink with correct shading and add all
the required Jackson dependencies to your application jar, everything will
work fine.

On Jan 24, 2018 10:23, "Stephan Ewen" <se...@apache.org> wrote:


Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I have a patch coming up for the VerifyError.

Concerning the Jackson error: did you build Flink yourself? It looks like
flink-dist was not built properly and the shading is incorrect.
The class "com.fasterxml.jackson.databind.ObjectMapper" should not be in
the jar in an unshaded manner.

My first guess is that you built Flink yourself with Maven version >= 3.3.0.

Please see here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#dependency-shading

Stephan


On Tue, Jan 23, 2018 at 5:21 PM, Edward <eg...@hotmail.com> wrote:


Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Edward <eg...@hotmail.com>.
Thanks for the follow-up, Stephan.

I have been running this job from a built jar file which was submitted to an
existing Flink 1.4 cluster, not from within the IDE. Interestingly, I now get
the same new error whenever any of the following 3 conditions is true:
1. I run the job on a local cluster from within my IDE
2. I run the job on a cluster where "classloader.resolve-order: parent-first"
is set
3. I build the uber jar file without including flink-java,
flink-streaming-java and flink-clients (I changed those to "provided" as you
suggested, so they aren't in my jar)

In each of those 3 cases, I get a new NoClassDefFoundError. It occurs
because com.fasterxml.jackson.databind.ObjectMapper is
present in flink-dist_2.11-1.4.0.jar, but
com.fasterxml.jackson.databind.SerializationConfig is not (only the shaded
version is:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationConfig)

java.lang.NoClassDefFoundError: Could not initialize class com.fasterxml.jackson.databind.SerializationConfig
	at com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:558)
	at com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:474)
	at com.mycom.datapipeline.common.client.UmsCientFactory.getUserMappingServiceClient(UmsCientFactory.java:31)
	at com.mycom.datapipeline.flink.udf.UserLookupFunctionBase.open(UserLookupFunctionBase.java:78)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)

I understand why this is happening in the case of the parent-first
classloader, but I can't understand why it's happening when I exclude
flink-java from my job's uber jar file. In that latter case, I would expect
the job's child classloader to be used, which contains both of those
fasterxml classes.
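
The difference between the two resolve orders discussed above can be
illustrated with a small sketch. This is not Flink's actual classloader:
ChildFirstLoader and loadOrNull are hypothetical names, and the only
assumption is the general child-first idea, i.e. look in the job jar's own
URLs first and ask the parent (for example the classes from flink-dist) only
when the class is not found there. With parent-first order, the parent's copy
of a class would win instead.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical child-first loader: searches its own URLs before the parent. */
final class ChildFirstLoader extends URLClassLoader {

    ChildFirstLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                if (name.startsWith("java.")) {
                    // Core JDK classes always come from the parent chain.
                    loaded = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: look in this loader's own URLs (the job jar).
                        loaded = findClass(name);
                    } catch (ClassNotFoundException notInChild) {
                        // Fall back to the regular parent-first lookup.
                        loaded = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    /** Convenience wrapper: returns null instead of throwing when a class is missing. */
    static Class<?> loadOrNull(ClassLoader loader, String name) {
        try {
            return loader.loadClass(name);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }
}
```

Note that a class which was itself loaded by the parent still resolves its
own references through the parent, so the child's copies only help for code
that is loaded through the child loader.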




Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Thanks for posting this. Ugly error. This does indeed look like either a
dependency conflict, or a mix of incompatible class files.

Some suggestions to diagnose this further:

  - Is this the setup of your POM to build the JAR file for your
application, or for running it from within the IDE?

  - If this is the POM for building your application JAR, please set the
"flink-java", "flink-streaming-java", and "flink-clients" dependencies to
the scope "provided".

  - Can you try to set the following entry in your config:
"classloader.resolve-order: parent-first"?

Please let us know how this affects the program.

Best,
Stephan
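
As a complement to the diagnosis steps above, one option is to print where a
suspicious class is actually loaded from at runtime. The sketch below uses
only JDK calls; ClassOrigin and describe are hypothetical names, and the two
Jackson class names in main are simply the ones that appear in this thread
(they are reported as "not loadable" if they are not on the classpath).
Calling describe from inside the job, for example in a function's open()
method, is an assumption about where such a check would be most informative.

```java
import java.security.CodeSource;

/** Reports which jar (or directory) and which classloader a class came from. */
final class ClassOrigin {

    /** Returns a one-line description of where the given class was loaded from. */
    static String describe(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // JDK bootstrap classes typically have no CodeSource.
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();
        ClassLoader loader = clazz.getClassLoader(); // null means the bootstrap loader
        return clazz.getName() + " <- " + location
                + " (loader: " + (loader == null ? "bootstrap" : loader.getClass().getName()) + ")";
    }

    /** Looks a class up by name through the given loader without initializing it. */
    static String describe(String className, ClassLoader loader) {
        try {
            return describe(Class.forName(className, false, loader));
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " <- not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Class names taken from the stack trace in this thread.
        System.out.println(describe("com.fasterxml.jackson.databind.ObjectMapper", loader));
        System.out.println(describe("com.fasterxml.jackson.databind.SerializationConfig", loader));
    }
}
```

If the two classes report different jars, that points to a mix of
incompatible class files rather than a problem in the job code itself.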


On Mon, Jan 22, 2018 at 2:45 PM, Edward <eg...@hotmail.com> wrote:


Re: Error with Avro/Kyro after upgrade to Flink 1.4

Posted by Edward <eg...@hotmail.com>.
Also, I'm not sure if this would cause the uninitialized error, but I did
notice that in the Maven dependency tree there are 2 different versions of
Kryo listed as Flink dependencies:
flink-java 1.4 pulls in Kryo 2.24.0 (via flink-core), but
flink-streaming-java_2.11 pulls in Kryo 2.21 (via flink-runtime and chill):

[INFO] +- org.apache.flink:flink-java:jar:1.4.0:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.4.0:compile
[INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
....
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.4.0:compile
[INFO] |  +- (org.apache.flink:flink-core:jar:1.4.0:compile - omitted for duplicate)
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.4.0:compile
[INFO] |  |  +- com.twitter:chill_2.11:jar:0.7.4:compile
[INFO] |  |  |  +- com.twitter:chill-java:jar:0.7.4:compile
[INFO] |  |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile - omitted for conflict with 2.24.0)
[INFO] |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile - omitted for conflict with 2.24.0)
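
The tree above shows that Maven itself keeps only one Kryo version (2.21 is
omitted in favour of 2.24.0), so a remaining question is whether more than
one copy of a class is visible at runtime, for instance one from the job's
uber jar and one from the cluster's own jars. A minimal sketch of such a
check follows; DuplicateClassFinder and locate are hypothetical names, and
the default class name is just the Kryo class from this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Lists every location on the classpath that provides a given class file. */
final class DuplicateClassFinder {

    /** Returns all URLs through which the loader can see the class file. */
    static List<URL> locate(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = DuplicateClassFinder.class.getClassLoader();
        String name = args.length > 0 ? args[0] : "com.esotericsoftware.kryo.Kryo";
        List<URL> hits = locate(name, loader);
        System.out.println(name + ": " + hits.size() + " location(s) visible");
        for (URL hit : hits) {
            System.out.println("  " + hit);
        }
    }
}
```

More than one reported location means that the copy which wins depends on
the classloader resolve order.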


