Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/12/16 06:59:55 UTC

failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Hi,

I have an artifact that works perfectly fine in Per-Job Cluster Mode when
submitted with the following bash script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf

I tried Application Mode [1] using the exact same artifact with the
following script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf

but the job fails with the following exception:

2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        ... 23 more

I have flink-connector-kafka_2.11 bundled in my artifact, and it is not under
the Flink lib directory at all.
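
For the record, a quick way to confirm the Kafka classes come from the fat jar
(a sketch; the jar name is the one from my script above):

# list the Kafka serialization classes bundled in the artifact
jar tf myjar.jar | grep 'org/apache/kafka/common/serialization'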

Thanks in advance,

P.S. Attached is the detailed log from a TM.

Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Posted by Yang Wang <da...@gmail.com>.
I am not sure about the root cause, but it seems that you could force the
default NIO-based transport as a workaround [1]:
add -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your
submission command.
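
Applied to your earlier script, it would look roughly like this (a sketch;
everything except the env.java.opts line is your original command):

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf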

[1].
https://stackoverflow.com/questions/48762857/java-lang-classcastexception-netty-fail-on-jar-execution-on-flink

Best,
Yang

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Yang,

Thanks for the detailed explanation!

> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
> command and have a try? After that, the user jars will no longer be
> included in the system classpath.


I tried the following as you suggested:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    -Dyarn.per-job-cluster.include-user-jar=DISABLED \
    hdfs:///jars/myjar.jar myconf.conf

Unfortunately, this attempt fails with the following exception on TMs:

> 2020-12-16 18:29:37,859 WARN  org.apache.flink.runtime.taskmanager.Task [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6) switched from RUNNING to FAILED.
> java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
>         at io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.RedisClient.connect(RedisClient.java:211) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at io.lettuce.core.RedisClient.connect(RedisClient.java:196) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>         Suppressed: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.util.concurrent.EventExecutorGroup
>                 at io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at io.lettuce.core.resource.DefaultClientResources.shutdown(DefaultClientResources.java:648) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at io.lettuce.core.AbstractRedisClient.closeClientResources(AbstractRedisClient.java:569) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at io.lettuce.core.AbstractRedisClient.lambda$shutdownAsync$5(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_222]
>                 at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_222]
>                 at io.lettuce.core.AbstractRedisClient.shutdownAsync(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:485) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:453) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at com.kakaomobility.drivinghabit.stream.Enricher.close(Enricher.java:60) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>                 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>                 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

The exception seems to come from another operator, not the Kafka sink; this
operator performs async I/O using Lettuce, an asynchronous Redis client library.

Best,

Dongwon


Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Posted by Yang Wang <da...@gmail.com>.
Hi Dongwon,

For application mode, the job submission happens on the JobManager side: we
use an embedded client to submit the job, so the user jar is added to the
distributed cache. When a task is deployed to a TaskManager, the jar is
downloaded again and loaded by the user classloader, even though it is
already on the system classpath.

I think that is why these classes are loaded by different classloaders. To
the JVM, the same class loaded by two different classloaders is two distinct
classes, so an "X is not an instance of Y" error is exactly what you would
see when the two copies meet.
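
One way to check this (a sketch; it assumes the TaskManager logs its
classpath in its startup banner, and <application_id> is your YARN
application id):

# fetch the container logs and look at the classpath line of the TM banner
yarn logs -applicationId <application_id> | grep -m 1 'Classpath:'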

For per-job mode, we recover the job instead, and the user jars are not
added to the distributed cache.

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
command and have a try? After that, the user jars will no longer be included
in the system classpath.


Best,
Yang



Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Posted by Dongwon Kim <ea...@gmail.com>.
Robert,

> But if Kafka is really only available in the user jar, then this error
> still should not occur.

I think so too; it should not occur.
I scanned through all the jar files on the classpath using `jar tf`, but no
jar contains org.apache.kafka.common.serialization.Deserializer with a
different version.
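
The scan was essentially the following (a sketch; the lib path is an
assumption about where the local Flink dist lives):

# check every jar on the classpath for a second copy of the Kafka interface
for f in myjar.jar $FLINK_HOME/lib/*.jar; do
  if jar tf "$f" | grep -q 'org/apache/kafka/common/serialization/Deserializer.class'; then
    echo "$f"
  fi
done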

> In your case it seems that the classes are loaded from different
> classloaders.

Hmm, why did the artifact work fine with per-job cluster mode?

P.S. Another user seems to be facing the same problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812

Thanks,

Dongwon



On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <rm...@apache.org> wrote:

> Hey Dongwon,
>
> I don't think this is the intended behavior.
> I believe in application mode, we are adding the user jar into the system
> classloader as well. In your case it seems that the classes are loaded from
> different classloaders.
> But if Kafka is really only available in the user jar, then this error
> still should not occur.

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Posted by Dongwon Kim <ea...@gmail.com>.
I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization

Now it seems to work.
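
For reference, the full submission command now looks like this (same script
as before, with only the new option added):

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf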

Why do application mode and per-job cluster mode behave differently when it
comes to classloading?

Is it a bug, or is it intended?

Best,

Dongwon
