Posted to issues@flink.apache.org by "Jürgen Kreileder (Jira)" <ji...@apache.org> on 2020/02/24 18:40:00 UTC
[jira] [Updated] (FLINK-16262) Class loader problem with
FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jürgen Kreileder updated FLINK-16262:
-------------------------------------
Description:
We're using Docker images modeled after [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] (using Java 11).
When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the TaskManager startup fails with:
{code:java}
2020-02-24 18:25:16.389 INFO o.a.f.r.t.Task Create Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source){code}
This looks like a class loading issue: if I copy our JAR to FLINK_LIB_DIR instead of FLINK_USR_LIB_DIR, everything works fine.
(AT_LEAST_ONCE producers work fine with the JAR in FLINK_USR_LIB_DIR.)
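A possible explanation, not confirmed in this report: the trace shows the KafkaProducer being constructed from FlinkKafkaProducer.abortTransactions on a ForkJoinPool worker thread. If that worker's context class loader is not the one that serves JARs from usrlib, and the serializer class is resolved through the context class loader, the lookup would fail exactly as above. The sketch below only illustrates the general technique of pinning a context class loader around work that runs on a pool thread. ContextClassLoaderDemo, withContextClassLoader and callOnCommonPool are hypothetical names, not Flink or Kafka APIs, and the empty URLClassLoader is a stand-in for the user code class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class ContextClassLoaderDemo {

    /** Runs the action with the given context class loader, then restores the previous one. */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore, so pooled threads are not left with a foreign loader.
            thread.setContextClassLoader(previous);
        }
    }

    /** Runs the action on a common-pool worker thread with the given loader pinned. */
    static <T> T callOnCommonPool(ClassLoader loader, Supplier<T> action) {
        Callable<T> pinned = () -> withContextClassLoader(loader, action);
        return ForkJoinPool.commonPool().submit(pinned).join();
    }

    public static void main(String[] args) {
        // Assumption: stand-in for the class loader that serves JARs from usrlib.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ClassLoader.getSystemClassLoader());

        // Context class loader seen by a pool worker when nothing is pinned.
        Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
        ClassLoader unpinned = ForkJoinPool.commonPool().submit(probe).join();

        // Context class loader seen by a pool worker when the user loader is pinned.
        ClassLoader pinned = callOnCommonPool(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("unpinned worker sees user loader: " + (unpinned == userLoader));
        System.out.println("pinned worker sees user loader: " + (pinned == userLoader));
    }
}
```

If this assumption holds, it would also be consistent with the JAR working from FLINK_LIB_DIR, since classes there would be visible to the system class loader regardless of which thread performs the lookup.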
was:
We're using Docker images modeled after [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] (using Java 11).
When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the TaskManager startup fails with:
{code:java}
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source){code}
This looks like a class loading issue: if I copy our JAR to FLINK_LIB_DIR instead of FLINK_USR_LIB_DIR, everything works fine.
(AT_LEAST_ONCE producers work fine with the JAR in FLINK_USR_LIB_DIR.)
> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-16262
> URL: https://issues.apache.org/jira/browse/FLINK-16262
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 build (nothing changed regarding Kafka and/or class loading).
> Reporter: Jürgen Kreileder
> Priority: Major
>