You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2020/08/23 11:37:02 UTC

Loading FlinkKafkaProducer fails with LinkError

Hi,
I'm trying to load a FlinkKafkaProducer sink alongside another custom sink.
While trying to restore
a running Flink app from the previous state, I get the error message below.

I am running Flink 1.9.0 with the following SBT dependency added:

"org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

And the app is deployed via a standard uber jar with all the dependencies.
Would appreciate the help.

java.lang.LinkageError: loader constraint violation: loader (instance of
org/apache/flink/util/ChildFirstClassLoader) previously initiated loading
for a different type with name
"org/apache/kafka/clients/producer/ProducerRecord"
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(
ChildFirstClassLoader.java:66)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:
1629)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
1885)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2287)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:
561)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.readObject(FlinkKafkaProducer.java:1202)
    at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:
1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig
.getStreamOperatorFactory(StreamConfig.java:235)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:370)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)


-- 
Best Regards,
Yuval Itzchakov.

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Arvid Heise <ar...@ververica.com>.
Hi,

@Chesnay Schepler <ch...@apache.org> The issue is that the uber-jar is
first loaded with Flink's app classloader (because it's in lib) and then
when the application starts, it gets loaded again in the ChildFirstCL and
since it's child-first, the class is loaded anyways.

What I don't quite understand is why the Kafka class was loaded through the
app classloader to begin with. Since Yuval mentioned that it happens only
during restoration, I'm suspecting that Flink is not using the correct
classloader at some point. Unfortunately, I don't know an easy way to trace
where the class was loaded the first time (we have the stack trace for the
second loading, but I think that one is legit).

On Tue, Aug 25, 2020 at 11:24 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> They are definitely equal, the same JAR is copied in subsequent lines in
> the Dockerfile.
>
> Regarding the NoSuchMethodException, I'll look it up and let you know
> tomorrow.
>
> On Tue, Aug 25, 2020, 22:59 Chesnay Schepler <ch...@apache.org> wrote:
>
>> The simplest answer is that they are in fact not equal; maybe it is a jar
>> of an older version of your setup?
>>
>> Can you give some details on the NoSuchMethodException? Specifically
>> whether it tries to access something from the Kafka connector, or from your
>> own user code.
>>
>> On 25/08/2020 21:27, Yuval Itzchakov wrote:
>>
>> OK, I think I figured it out. It looks like the uber-jar is also being
>> placed under `lib`, which is probably the cause of the problem.
>>
>> Question is, why does it identify it as two different versions? It's
>> exactly the same JAR.
>>
>> On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> I'm afraid it's not being printed out due to different log levels :(
>>>
>>> Yes, I build the image myself. It takes the tar file from
>>> https://archive.apache.org/dist/flink/flink-1.9.0/
>>> <https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks it
>>> into the image.
>>> I've run:
>>>
>>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
>>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka
>>>
>>> Both from within /lib, they both produce no results.
>>>
>>> On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler <ch...@apache.org>
>>> wrote:
>>>
>>>> The NoSuchMethodException shows that the class is still on the
>>>> classpath, but with a different version than your code is expecting.
>>>> Otherwise you would've gotten a different error.
>>>> This implies that there are 2 versions of the kafka dependencies on the
>>>> classpath in your original run; it suddenly working with parent-first
>>>> classloading reinforces the suspicion that they are present in the
>>>> distribution.
>>>>
>>>> As Arvid mentioned, the classpath log entry (at the very start of the
>>>> log file) would be interesting.
>>>>
>>>> Did you build the Flink distribution yourself, or are you relying on
>>>> one of the existing Flink binaries/images?
>>>>
>>>> On 25/08/2020 20:51, Yuval Itzchakov wrote:
>>>>
>>>> Hi Arvid,
>>>> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>>>>
>>>> The flink image that I'm building does not have the
>>>> flink-connector-kafka library in its JAR, I've made sure of this using
>>>> `jar -tf`. Additionally, once I removed the dependency from my uber jar, it
>>>> failed with a "NoSuchMethodException" at runtime for one of the arbitrary
>>>> methods.
>>>>
>>>> I used classloader.resolve-order: parent-first and it resolved the
>>>> issue somehow. I still don't know why though.
>>>>
>>>> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Yuval,
>>>>>
>>>>> How do you execute Flink? Can you show us the log entry with the
>>>>> classpath?
>>>>>
>>>>> I'm guessing that you have Kafka bundled in your uber-jar and
>>>>> additionally also have the connector in flink-dist/lib. If so, you simply
>>>>> need to remove it in one place. In general, if you use flink-dist, you'd
>>>>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>>>>> them).
>>>>>
>>>>> If you have everything bundled in one uber-jar and execute it somehow
>>>>> without flink-dist, then I don't immediately see a solution. Then the log
>>>>> with the classpath would help.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>>
>>>>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>>>>> sink. While trying to restore
>>>>>> a running Flink app from the previous state, I get the error message
>>>>>> below.
>>>>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>>>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>>>>> And the app is deployed via a standard uber jar with all the
>>>>>> dependencies. W
>>>>>> Would appreciate the help
>>>>>> java.lang.LinkageError: loader constraint violation: loader
>>>>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>>>>> initiated loading for a different type with name
>>>>>> "org/apache/kafka/clients/producer/ProducerRecord"
>>>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>>>>> .java:142)
>>>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>>>>> ChildFirstClassLoader.java:66)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>>>>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>>>>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>>>>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass
>>>>>> .java:1629)
>>>>>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:
>>>>>> 79)
>>>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>>>>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>>>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:
>>>>>> 681)
>>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>>>>>> .java:1885)
>>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream
>>>>>> .java:1751)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>>> .java:2042)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>>> 1573)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>>> .java:2287)
>>>>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>>>>>> .java:561)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>> .readObject(FlinkKafkaProducer.java:1202)
>>>>>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>>>>> .java:1170)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>>> .java:2178)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>>> .java:2069)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>>> 1573)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>>> .java:2287)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>>> .java:2211)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>>> .java:2069)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>>> 1573)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>>> .java:2287)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>>> .java:2211)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>>> .java:2069)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>>> 1573)
>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:
>>>>>> 431)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>>> InstantiationUtil.java:576)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>>> InstantiationUtil.java:562)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>>> InstantiationUtil.java:550)
>>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>>>>> InstantiationUtil.java:511)
>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig
>>>>>> .getStreamOperatorFactory(StreamConfig.java:235)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:427)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>>>>>> OperatorChain.java:144)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>>> StreamTask.java:370)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Yuval Itzchakov.
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing
>>>>> Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Yuval Itzchakov <yu...@gmail.com>.
They are definitely equal, the same JAR is copied in subsequent lines in
the Dockerfile.

Regarding the NoSuchMethodException, I'll look it up and let you know
tomorrow.

On Tue, Aug 25, 2020, 22:59 Chesnay Schepler <ch...@apache.org> wrote:

> The simplest answer is that they are in fact not equal; maybe it is a jar
> of an older version of your setup?
>
> Can you give some details on the NoSuchMethodException? Specifically
> whether it tries to access something from the Kafka connector, or from your
> own user code.
>
> On 25/08/2020 21:27, Yuval Itzchakov wrote:
>
> OK, I think I figured it out. It looks like the uber-jar is also being
> placed under `lib`, which is probably the cause of the problem.
>
> Question is, why does it identify it as two different versions? It's
> exactly the same JAR.
>
> On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <yu...@gmail.com>
> wrote:
>
>> I'm afraid it's not being printed out due to different log levels :(
>>
>> Yes, I build the image myself. It takes the tar file from
>> https://archive.apache.org/dist/flink/flink-1.9.0/
>> <https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks it into
>> the image.
>> I've run:
>>
>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka
>>
>> Both from within /lib, they both produce no results.
>>
>> On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler <ch...@apache.org>
>> wrote:
>>
>>> The NoSuchMethodException shows that the class is still on the
>>> classpath, but with a different version than your code is expecting.
>>> Otherwise you would've gotten a different error.
>>> This implies that there are 2 versions of the kafka dependencies on the
>>> classpath in your original run; it suddenly working with parent-first
>>> classloading reinforces the suspicion that they are present in the
>>> distribution.
>>>
>>> As Arvid mentioned, the classpath log entry (at the very start of the
>>> log file) would be interesting.
>>>
>>> Did you build the Flink distribution yourself, or are you relying on one
>>> of the existing Flink binaries/images?
>>>
>>> On 25/08/2020 20:51, Yuval Itzchakov wrote:
>>>
>>> Hi Arvid,
>>> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>>>
>>> The flink image that I'm building does not have the
>>> flink-connector-kafka library in its JAR, I've made sure of this using
>>> `jar -tf`. Additionally, once I removed the dependency from my uber jar, it
>>> failed with a "NoSuchMethodException" at runtime for one of the arbitrary
>>> methods.
>>>
>>> I used classloader.resolve-order: parent-first and it resolved the issue
>>> somehow. I still don't know why though.
>>>
>>> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Yuval,
>>>>
>>>> How do you execute Flink? Can you show us the log entry with the
>>>> classpath?
>>>>
>>>> I'm guessing that you have Kafka bundled in your uber-jar and
>>>> additionally also have the connector in flink-dist/lib. If so, you simply
>>>> need to remove it in one place. In general, if you use flink-dist, you'd
>>>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>>>> them).
>>>>
>>>> If you have everything bundled in one uber-jar and execute it somehow
>>>> without flink-dist, then I don't immediately see a solution. Then the log
>>>> with the classpath would help.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>>
>>>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>>>> sink. While trying to restore
>>>>> a running Flink app from the previous state, I get the error message
>>>>> below.
>>>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>>>> And the app is deployed via a standard uber jar with all the
>>>>> dependencies. W
>>>>> Would appreciate the help
>>>>> java.lang.LinkageError: loader constraint violation: loader (instance
>>>>> of org/apache/flink/util/ChildFirstClassLoader) previously initiated
>>>>> loading for a different type with name
>>>>> "org/apache/kafka/clients/producer/ProducerRecord"
>>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>>>> .java:142)
>>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>>>> ChildFirstClassLoader.java:66)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>>>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>>>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>>>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass
>>>>> .java:1629)
>>>>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79
>>>>> )
>>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>>>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:
>>>>> 681)
>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>>>>> .java:1885)
>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>>>>> 1751)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>> .java:2042)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>> 1573)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>> .java:2287)
>>>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>>>>> .java:561)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>> .readObject(FlinkKafkaProducer.java:1202)
>>>>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>>>> .java:1170)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>> .java:2178)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>> .java:2069)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>> 1573)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>> .java:2287)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>> .java:2211)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>> .java:2069)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>> 1573)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>>> .java:2287)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream
>>>>> .java:2211)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>>> .java:2069)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>>> 1573)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431
>>>>> )
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:576)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:562)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:550)
>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>>>> InstantiationUtil.java:511)
>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig
>>>>> .getStreamOperatorFactory(StreamConfig.java:235)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:427)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createChainedOperator(OperatorChain.java:418)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .createOutputCollector(OperatorChain.java:354)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>>>>> OperatorChain.java:144)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>> StreamTask.java:370)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing
>>>> Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>>
>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Chesnay Schepler <ch...@apache.org>.
The simplest answer is that the two JARs are in fact not identical; maybe
one of them is a jar of an older version of your setup?

Can you give some details on the NoSuchMethodException? Specifically 
whether it tries to access something from the Kafka connector, or from 
your own user code.

On 25/08/2020 21:27, Yuval Itzchakov wrote:
> OK, I think I figured it out. It looks like the uber-jar is also being 
> placed under `lib`, which is probably the cause of the problem.
>
> Question is, why does it identify it as two different versions? It's 
> exactly the same JAR.
>
> On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <yuvalos@gmail.com 
> <ma...@gmail.com>> wrote:
>
>     I'm afraid it's not being printed out due to different log levels :(
>
>     Yes, I build the image myself. It takes the tar file from
>     https://archive.apache.org/dist/flink/flink-1.9.0/
>     <https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks
>     it into the image.
>     I've ran:
>
>     find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
>     find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka
>
>     Both from within /lib, they both produce no results.
>
>     On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler
>     <chesnay@apache.org <ma...@apache.org>> wrote:
>
>         The NoSuchMethodException shows that the class is still on the
>         classpath, but with a different version than your code is
>         expecting. Otherwise you would've gotten a different error.
>         This implies that there are 2 versions of the kafka
>         dependencies on the classpath in your original run; it
>         suddenly working with parent-first classloading reinforces the
>         suspicion that they are present in the distribution.
>
>         As Arvid mentioned, the classpath log entry (at the very start
>         of the log file) would be interesting.
>
>         Did you build the Flink yourself distribution, or are you
>         relying on one of the existing Flink binaries/images?
>
>         On 25/08/2020 20:51, Yuval Itzchakov wrote:
>>         Hi Arvid,
>>         I'm running Flink in a job cluster on k8s using the Lyft
>>         Operator.
>>
>>         The flink image that I'm building does not have the
>>         flink-connector-kafka library in it's JAR, I've made sure of
>>         this using `jar -tf`. Additionally, once I removed the
>>         dependency from my uber jar, it failed with a
>>         "NoSuchMethodException" at runtime for one of the arbitrary
>>         methods.
>>
>>         I used classloader.resolve-order: parent-first and it
>>         resolved the issue somehow. I still don't know why though.
>>
>>         On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise
>>         <arvid@ververica.com <ma...@ververica.com>> wrote:
>>
>>             Hi Yuval,
>>
>>             How do you execute Flink? Can you show us the log entry
>>             with the classpath?
>>
>>             I'm guessing that you have Kafka bundled in your uber-jar
>>             and additionally also have the connector in
>>             flink-dist/lib. If so, you simply need to remove it in
>>             one place. In general, if you use flink-dist, you'd not
>>             bundle any Flink dependencies in your uber-jar (use
>>             provided scope for them).
>>
>>             If you have everything bundled in one uber-jar and
>>             execute it somehow without flink-dist, then I don't
>>             immediately see a solution. Then the log with the
>>             classpath would help.
>>
>>             Best,
>>
>>             Arvid
>>
>>
>>             On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov
>>             <yuvalos@gmail.com <ma...@gmail.com>> wrote:
>>
>>                 Hi,
>>                 I'm trying to load a FlinkKafkaProducer sink
>>                 alongside another custom sink. While trying to restore
>>                 a running Flink app from the previous state, I get
>>                 the error message below.
>>                 I am running Flink 1.9.0 with the following SBT
>>                 dependency added:
>>                 "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>                 And the app is deployed via a standard uber jar with
>>                 all the dependencies. W
>>                 Would appreciate the help
>>                 java.lang.LinkageError: loader constraint violation:
>>                 loader (instance of
>>                 org/apache/flink/util/ChildFirstClassLoader)
>>                 previously initiated loading for a different type
>>                 with name
>>                 "org/apache/kafka/clients/producer/ProducerRecord"
>>                     at java.lang.ClassLoader.defineClass1(Native Method)
>>                     at
>>                 java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>                     at
>>                 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>                     at java.net
>>                 <http://java.net>.URLClassLoader.defineClass(URLClassLoader.java:468)
>>                     at java.net
>>                 <http://java.net>.URLClassLoader.access$100(URLClassLoader.java:74)
>>                     at java.net
>>                 <http://java.net>.URLClassLoader$1.run(URLClassLoader.java:369)
>>                     at java.net
>>                 <http://java.net>.URLClassLoader$1.run(URLClassLoader.java:363)
>>                     at
>>                 java.security.AccessController.doPrivileged(Native
>>                 Method)
>>                     at java.net
>>                 <http://java.net>.URLClassLoader.findClass(URLClassLoader.java:362)
>>                     at
>>                 org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>>                     at
>>                 java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>                     at java.lang.Class.getDeclaredMethods0(Native Method)
>>                     at
>>                 java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>                     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>                     at
>>                 java.security.AccessController.doPrivileged(Native
>>                 Method)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
>>                     at
>>                 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1202)
>>                     at
>>                 sun.reflect.GeneratedMethodAccessor358.invoke(Unknown
>>                 Source)
>>                     at
>>                 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>                     at java.lang.reflect.Method.invoke(Method.java:498)
>>                     at java.io
>>                 <http://java.io>.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>                     at java.io
>>                 <http://java.io>.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>                     at
>>                 org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>                     at
>>                 org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>                     at
>>                 org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>                     at
>>                 org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>                     at
>>                 org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>>                     at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
>>                     at
>>                 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>                     at
>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>                     at java.lang.Thread.run(Thread.java:748)
>>
>>                 -- 
>>                 Best Regards,
>>                 Yuval Itzchakov.
>>
>>
>>
>>             -- 
>>
>>             Arvid Heise | Senior Java Developer
>>
>>             <https://www.ververica.com/>
>>
>>
>>             Follow us @VervericaData
>>
>>             --
>>
>>             Join Flink Forward <https://flink-forward.org/> - The
>>             Apache Flink Conference
>>
>>             Stream Processing | Event Driven | Real Time
>>
>>             --
>>
>>             Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>>             --
>>
>>             Ververica GmbH Registered at Amtsgericht Charlottenburg:
>>             HRB 158244 B Managing Directors: Timothy Alexander
>>             Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>>
>>
>>
>>         -- 
>>         Best Regards,
>>         Yuval Itzchakov.
>
>
>
>
>     -- 
>     Best Regards,
>     Yuval Itzchakov.
>
>
>
> -- 
> Best Regards,
> Yuval Itzchakov.



Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Yuval Itzchakov <yu...@gmail.com>.
OK, I think I figured it out. It looks like the uber-jar is also being
placed under `lib`, which is probably the cause of the problem.

The question is, why does it identify them as two different versions? It's
exactly the same JAR.

On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> I'm afraid it's not being printed out due to different log levels :(
>
> Yes, I build the image myself. It takes the tar file from
> https://archive.apache.org/dist/flink/flink-1.9.0/
> <https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks it into
> the image.
> I've ran:
>
> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka
>
> Both from within /lib, they both produce no results.
>
> On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> The NoSuchMethodException shows that the class is still on the classpath,
>> but with a different version than your code is expecting. Otherwise you
>> would've gotten a different error.
>> This implies that there are 2 versions of the kafka dependencies on the
>> classpath in your original run; it suddenly working with parent-first
>> classloading reinforces the suspicion that they are present in the
>> distribution.
>>
>> As Arvid mentioned, the classpath log entry (at the very start of the log
>> file) would be interesting.
>>
>> Did you build the Flink yourself distribution, or are you relying on one
>> of the existing Flink binaries/images?
>>
>> On 25/08/2020 20:51, Yuval Itzchakov wrote:
>>
>> Hi Arvid,
>> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>>
>> The flink image that I'm building does not have the flink-connector-kafka
>> library in it's JAR, I've made sure of this using `jar -tf`. Additionally,
>> once I removed the dependency from my uber jar, it failed with a
>> "NoSuchMethodException" at runtime for one of the arbitrary methods.
>>
>> I used classloader.resolve-order: parent-first and it resolved the issue
>> somehow. I still don't know why though.
>>
>> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Yuval,
>>>
>>> How do you execute Flink? Can you show us the log entry with the
>>> classpath?
>>>
>>> I'm guessing that you have Kafka bundled in your uber-jar and
>>> additionally also have the connector in flink-dist/lib. If so, you simply
>>> need to remove it in one place. In general, if you use flink-dist, you'd
>>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>>> them).
>>>
>>> If you have everything bundled in one uber-jar and execute it somehow
>>> without flink-dist, then I don't immediately see a solution. Then the log
>>> with the classpath would help.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>>
>>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>>> sink. While trying to restore
>>>> a running Flink app from the previous state, I get the error message
>>>> below.
>>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>>> And the app is deployed via a standard uber jar with all the
>>>> dependencies. W
>>>> Would appreciate the help
>>>> java.lang.LinkageError: loader constraint violation: loader (instance
>>>> of org/apache/flink/util/ChildFirstClassLoader) previously initiated
>>>> loading for a different type with name
>>>> "org/apache/kafka/clients/producer/ProducerRecord"
>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>>> .java:142)
>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>>> ChildFirstClassLoader.java:66)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass
>>>> .java:1629)
>>>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:
>>>> 681)
>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>>>> .java:1885)
>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>>>> 1751)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>> .java:2042)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>> 1573)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>> .java:2287)
>>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>>>> .java:561)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>> .readObject(FlinkKafkaProducer.java:1202)
>>>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>> DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>>> .java:1170)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>>> 2178)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>> .java:2069)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>> 1573)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>> .java:2287)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>>> 2211)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>> .java:2069)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>> 1573)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>>> .java:2287)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>>> 2211)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>>> .java:2069)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
>>>> 1573)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>> InstantiationUtil.java:576)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>> InstantiationUtil.java:562)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>> InstantiationUtil.java:550)
>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>>> InstantiationUtil.java:511)
>>>>     at org.apache.flink.streaming.api.graph.StreamConfig
>>>> .getStreamOperatorFactory(StreamConfig.java:235)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:427)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createChainedOperator(OperatorChain.java:418)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .createOutputCollector(OperatorChain.java:354)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>>>> OperatorChain.java:144)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:370)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing
>>> Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Yuval Itzchakov <yu...@gmail.com>.
I'm afraid it's not being printed out due to different log levels :(

Yes, I build the image myself. It takes the tar file from
https://archive.apache.org/dist/flink/flink-1.9.0/
<https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks it into
the image.
I've run:

find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka

I ran both from within /lib, and both produce no results.

On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler <ch...@apache.org>
wrote:

> The NoSuchMethodException shows that the class is still on the classpath,
> but with a different version than your code is expecting. Otherwise you
> would've gotten a different error.
> This implies that there are 2 versions of the kafka dependencies on the
> classpath in your original run; it suddenly working with parent-first
> classloading reinforces the suspicion that they are present in the
> distribution.
>
> As Arvid mentioned, the classpath log entry (at the very start of the log
> file) would be interesting.
>
> Did you build the Flink yourself distribution, or are you relying on one
> of the existing Flink binaries/images?
>
> On 25/08/2020 20:51, Yuval Itzchakov wrote:
>
> Hi Arvid,
> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>
> The flink image that I'm building does not have the flink-connector-kafka
> library in it's JAR, I've made sure of this using `jar -tf`. Additionally,
> once I removed the dependency from my uber jar, it failed with a
> "NoSuchMethodException" at runtime for one of the arbitrary methods.
>
> I used classloader.resolve-order: parent-first and it resolved the issue
> somehow. I still don't know why though.
>
> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Yuval,
>>
>> How do you execute Flink? Can you show us the log entry with the
>> classpath?
>>
>> I'm guessing that you have Kafka bundled in your uber-jar and
>> additionally also have the connector in flink-dist/lib. If so, you simply
>> need to remove it in one place. In general, if you use flink-dist, you'd
>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>> them).
>>
>> If you have everything bundled in one uber-jar and execute it somehow
>> without flink-dist, then I don't immediately see a solution. Then the log
>> with the classpath would help.
>>
>> Best,
>>
>> Arvid
>>
>>
>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>> sink. While trying to restore
>>> a running Flink app from the previous state, I get the error message
>>> below.
>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>> And the app is deployed via a standard uber jar with all the
>>> dependencies. W
>>> Would appreciate the help
>>> java.lang.LinkageError: loader constraint violation: loader (instance of
>>> org/apache/flink/util/ChildFirstClassLoader) previously initiated
>>> loading for a different type with name
>>> "org/apache/kafka/clients/producer/ProducerRecord"
>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>> .java:142)
>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>> ChildFirstClassLoader.java:66)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass
>>> .java:1629)
>>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681
>>> )
>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>>> .java:1885)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>>> 1751)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2042)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>>> .java:561)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>> .readObject(FlinkKafkaProducer.java:1202)
>>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>> .java:1170)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2178)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2211)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2211)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:576)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:562)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:550)
>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>> InstantiationUtil.java:511)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig
>>> .getStreamOperatorFactory(StreamConfig.java:235)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:427)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>>> OperatorChain.java:144)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:370)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing
>> Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>>
>>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>

-- 
Best Regards,
Yuval Itzchakov.

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Chesnay Schepler <ch...@apache.org>.
The NoSuchMethodException shows that the class is still on the 
classpath, but with a different version than your code is expecting. 
Otherwise you would've gotten a different error.
This implies that there are 2 versions of the kafka dependencies on the 
classpath in your original run; it suddenly working with parent-first 
classloading reinforces the suspicion that they are present in the 
distribution.

As Arvid mentioned, the classpath log entry (at the very start of the 
log file) would be interesting.

Did you build the Flink distribution yourself, or are you relying on one 
of the existing Flink binaries/images?

On 25/08/2020 20:51, Yuval Itzchakov wrote:
> Hi Arvid,
> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>
> The Flink image that I'm building does not have the 
> flink-connector-kafka library in its JAR; I've made sure of this 
> using `jar -tf`. Additionally, once I removed the dependency from my 
> uber jar, it failed with a "NoSuchMethodException" at runtime for one 
> of the arbitrary methods.
>
> I used classloader.resolve-order: parent-first and it resolved the 
> issue somehow. I still don't know why though.
>
> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <arvid@ververica.com 
> <ma...@ververica.com>> wrote:
>
>     Hi Yuval,
>
>     How do you execute Flink? Can you show us the log entry with the
>     classpath?
>
>     I'm guessing that you have Kafka bundled in your uber-jar and
>     additionally also have the connector in flink-dist/lib. If so, you
>     simply need to remove it in one place. In general, if you use
>     flink-dist, you'd not bundle any Flink dependencies in your
>     uber-jar (use provided scope for them).
>
>     If you have everything bundled in one uber-jar and execute it
>     somehow without flink-dist, then I don't immediately see a
>     solution. Then the log with the classpath would help.
>
>     Best,
>
>     Arvid
>
>
>     On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yuvalos@gmail.com
>     <ma...@gmail.com>> wrote:
>
>         Hi,
>         I'm trying to load a FlinkKafkaProducer sink alongside another
>         custom sink. While trying to restore
>         a running Flink app from the previous state, I get the error
>         message below.
>         I am running Flink 1.9.0 with the following SBT dependency added:
>         "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>         And the app is deployed via a standard uber jar with all the
>         dependencies.
>         Would appreciate the help
>         java.lang.LinkageError: loader constraint violation: loader
>         (instance of org/apache/flink/util/ChildFirstClassLoader)
>         previously initiated loading for a different type with name
>         "org/apache/kafka/clients/producer/ProducerRecord"
>             at java.lang.ClassLoader.defineClass1(Native Method)
>             at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>             at
>         java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>             at java.net
>         <http://java.net>.URLClassLoader.defineClass(URLClassLoader.java:468)
>             at java.net
>         <http://java.net>.URLClassLoader.access$100(URLClassLoader.java:74)
>             at java.net
>         <http://java.net>.URLClassLoader$1.run(URLClassLoader.java:369)
>             at java.net
>         <http://java.net>.URLClassLoader$1.run(URLClassLoader.java:363)
>             at java.security.AccessController.doPrivileged(Native Method)
>             at java.net
>         <http://java.net>.URLClassLoader.findClass(URLClassLoader.java:362)
>             at
>         org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>             at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>             at java.lang.Class.getDeclaredMethods0(Native Method)
>             at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>             at java.lang.Class.getDeclaredMethod(Class.java:2128)
>             at java.io
>         <http://java.io>.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
>             at java.io
>         <http://java.io>.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>             at java.io
>         <http://java.io>.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>             at java.io
>         <http://java.io>.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>             at java.security.AccessController.doPrivileged(Native Method)
>             at java.io
>         <http://java.io>.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>             at java.io
>         <http://java.io>.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>             at java.io
>         <http://java.io>.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>             at java.io
>         <http://java.io>.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>             at java.io
>         <http://java.io>.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>             at java.io
>         <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>             at java.io
>         <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>             at java.io
>         <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>             at java.io
>         <http://java.io>.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
>             at
>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1202)
>             at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown
>         Source)
>             at
>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>             at java.lang.reflect.Method.invoke(Method.java:498)
>             at java.io
>         <http://java.io>.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
>             at java.io
>         <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
>             at java.io
>         <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>             at java.io
>         <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>             at java.io
>         <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>             at java.io
>         <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>             at java.io
>         <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>             at java.io
>         <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>             at java.io
>         <http://java.io>.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>             at java.io
>         <http://java.io>.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>             at java.io
>         <http://java.io>.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>             at java.io
>         <http://java.io>.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>             at java.io
>         <http://java.io>.ObjectInputStream.readObject(ObjectInputStream.java:431)
>             at
>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>             at
>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>             at
>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>             at
>         org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>             at
>         org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
>             at
>         org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>             at
>         org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
>             at
>         org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>             at
>         org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>             at java.lang.Thread.run(Thread.java:748)
>
>         -- 
>         Best Regards,
>         Yuval Itzchakov.
>
>
>
>     -- 
>
>     Arvid Heise | Senior Java Developer
>
>     <https://www.ververica.com/>
>
>
>     Follow us @VervericaData
>
>     --
>
>     Join Flink Forward <https://flink-forward.org/> - The Apache
>     Flink Conference
>
>     Stream Processing | Event Driven | Real Time
>
>     --
>
>     Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
>     --
>
>     Ververica GmbH
>     Registered at Amtsgericht Charlottenburg: HRB 158244 B
>     Managing Directors: Timothy Alexander Steinert, Yip Park Tung
>     Jason, Ji (Toni) Cheng
>
>
>
> -- 
> Best Regards,
> Yuval Itzchakov.



Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Arvid,
I'm running Flink in a job cluster on k8s using the Lyft Operator.

The Flink image that I'm building does not have the flink-connector-kafka
library in its JAR; I've made sure of this using `jar -tf`. Additionally,
once I removed the dependency from my uber jar, it failed with a
"NoSuchMethodException" at runtime for one of the arbitrary methods.

I used classloader.resolve-order: parent-first and it resolved the issue
somehow. I still don't know why though.

On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Yuval,
>
> How do you execute Flink? Can you show us the log entry with the classpath?
>
> I'm guessing that you have Kafka bundled in your uber-jar and additionally
> also have the connector in flink-dist/lib. If so, you simply need to remove
> it in one place. In general, if you use flink-dist, you'd not bundle any
> Flink dependencies in your uber-jar (use provided scope for them).
>
> If you have everything bundled in one uber-jar and execute it somehow
> without flink-dist, then I don't immediately see a solution. Then the log
> with the classpath would help.
>
> Best,
>
> Arvid
>
>
> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com> wrote:
>
>> Hi,
>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>> sink. While trying to restore
>> a running Flink app from the previous state, I get the error message
>> below.
>>
>> I am running Flink 1.9.0 with the following SBT dependency added:
>>
>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>
>> And the app is deployed via a standard uber jar with all the
>> dependencies.
>> Would appreciate the help
>>
>> java.lang.LinkageError: loader constraint violation: loader (instance of
>> org/apache/flink/util/ChildFirstClassLoader) previously initiated
>> loading for a different type with name
>> "org/apache/kafka/clients/producer/ProducerRecord"
>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>> .java:142)
>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>> ChildFirstClassLoader.java:66)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:
>> 1629)
>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
>> 1885)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>> 1751)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2042)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>> .java:561)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>> .readObject(FlinkKafkaProducer.java:1202)
>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:
>> 1170)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2178)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2211)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2211)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:550)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>> InstantiationUtil.java:511)
>>     at org.apache.flink.streaming.api.graph.StreamConfig
>> .getStreamOperatorFactory(StreamConfig.java:235)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:427)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>> OperatorChain.java:144)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:370)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Best Regards,
Yuval Itzchakov.

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Yuval Itzchakov <yu...@gmail.com>.
Will it be enough to provide you the output of `-verbose:class`? Or do you
want me to add additional arguments?

On Tue, Aug 25, 2020 at 6:20 PM Arvid Heise <ar...@ververica.com> wrote:

> Small correction: you'd bundle the connectors in your uber jar like you
> did but you usually don't put it into flink-dist.
>
> So please double-check if it's also in flink-dist and remove it there. If
> not, then please add the full classpath log statement.
>
> It might also be a bug related to restoring and the way Flink loads the
> classes then, but I'd first eliminate the obvious.
>
> On Tue, Aug 25, 2020 at 5:12 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Yuval,
>>
>> How do you execute Flink? Can you show us the log entry with the
>> classpath?
>>
>> I'm guessing that you have Kafka bundled in your uber-jar and
>> additionally also have the connector in flink-dist/lib. If so, you simply
>> need to remove it in one place. In general, if you use flink-dist, you'd
>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>> them).
>>
>> If you have everything bundled in one uber-jar and execute it somehow
>> without flink-dist, then I don't immediately see a solution. Then the log
>> with the classpath would help.
>>
>> Best,
>>
>> Arvid
>>
>>
>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>> sink. While trying to restore
>>> a running Flink app from the previous state, I get the error message
>>> below.
>>>
>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>>
>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>>
>>> And the app is deployed via a standard uber jar with all the
>>> dependencies.
>>> Would appreciate the help
>>>
>>> java.lang.LinkageError: loader constraint violation: loader (instance of
>>> org/apache/flink/util/ChildFirstClassLoader) previously initiated
>>> loading for a different type with name
>>> "org/apache/kafka/clients/producer/ProducerRecord"
>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>>> .java:142)
>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>>> ChildFirstClassLoader.java:66)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass
>>> .java:1629)
>>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681
>>> )
>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream
>>> .java:1885)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>>> 1751)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2042)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>>> .java:561)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>> .readObject(FlinkKafkaProducer.java:1202)
>>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>> .java:1170)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2178)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2211)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2287)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2211)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2069)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573
>>> )
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:576)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:562)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:550)
>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>> InstantiationUtil.java:511)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig
>>> .getStreamOperatorFactory(StreamConfig.java:235)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:427)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:418)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>>> OperatorChain.java:144)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:370)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Best Regards,
Yuval Itzchakov.

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Arvid Heise <ar...@ververica.com>.
Small correction: you'd bundle the connectors in your uber jar like you did,
but you usually don't put them into flink-dist.

So please double-check whether the connector is also in flink-dist and, if
so, remove it there. If not, then please add the full classpath log statement.

It might also be a bug related to restoring from state and the way Flink
loads classes in that case, but I'd first eliminate the obvious.

On Tue, Aug 25, 2020 at 5:12 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Yuval,
>
> How do you execute Flink? Can you show us the log entry with the classpath?
>
> I'm guessing that you have Kafka bundled in your uber-jar and additionally
> also have the connector in flink-dist/lib. If so, you simply need to remove
> it in one place. In general, if you use flink-dist, you'd not bundle any
> Flink dependencies in your uber-jar (use provided scope for them).
>
> If you have everything bundled in one uber-jar and execute it somehow
> without flink-dist, then I don't immediately see a solution. Then the log
> with the classpath would help.
>
> Best,
>
> Arvid
>
>
> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com> wrote:
>
>> Hi,
>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>> sink. While trying to restore
>> a running Flink app from the previous state, I get the error message
>> below.
>>
>> I am running Flink 1.9.0 with the following SBT dependency added:
>>
>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>
>> And the app is deployed via a standard uber jar with all the
>> dependencies.
>> Would appreciate the help
>>
>> java.lang.LinkageError: loader constraint violation: loader (instance of
>> org/apache/flink/util/ChildFirstClassLoader) previously initiated
>> loading for a different type with name
>> "org/apache/kafka/clients/producer/ProducerRecord"
>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader
>> .java:142)
>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
>> ChildFirstClassLoader.java:66)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.getDeclaredMethods0(Native Method)
>>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:
>> 1629)
>>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
>> 1885)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:
>> 1751)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2042)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream
>> .java:561)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>> .readObject(FlinkKafkaProducer.java:1202)
>>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:
>> 1170)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2178)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2211)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2287)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2211)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:550)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>> InstantiationUtil.java:511)
>>     at org.apache.flink.streaming.api.graph.StreamConfig
>> .getStreamOperatorFactory(StreamConfig.java:235)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:427)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:418)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
>> OperatorChain.java:144)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:370)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Loading FlinkKafkaProducer fails with LinkError

Posted by Arvid Heise <ar...@ververica.com>.
Hi Yuval,

How do you execute Flink? Can you show us the log entry with the classpath?

I'm guessing that you have Kafka bundled in your uber-jar and also have the
connector in flink-dist/lib. If so, you simply need to remove it in one
place. In general, if you use flink-dist, you wouldn't bundle any Flink
dependencies in your uber-jar (use provided scope for them).

If you have everything bundled in one uber-jar and execute it somehow
without flink-dist, then I don't immediately see a solution. In that case,
the log with the classpath would help.

Best,

Arvid


On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi,
> I'm trying to load a FlinkKafkaProducer sink alongside another custom
> sink. While trying to restore
> a running Flink app from the previous state, I get the error message below.
>
> I am running Flink 1.9.0 with the following SBT dependency added:
>
> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>
> And the app is deployed via a standard uber jar with all the dependencies.
> Would appreciate the help
>
> java.lang.LinkageError: loader constraint violation: loader (instance of
> org/apache/flink/util/ChildFirstClassLoader) previously initiated loading
> for a different type with name
> "org/apache/kafka/clients/producer/ProducerRecord"
>     at java.lang.ClassLoader.defineClass1(Native Method)
>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
> 142)
>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClass(
> ChildFirstClassLoader.java:66)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.getDeclaredMethods0(Native Method)
>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:
> 1629)
>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
> 1885)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751
> )
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2042)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2287)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:
> 561)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .readObject(FlinkKafkaProducer.java:1202)
>     at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:
> 1170)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2178)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:511)
>     at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:235)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:427)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:418)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:354)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
> OperatorChain.java:144)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:370)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng