Posted to user@flink.apache.org by Vikash Dat <da...@gmail.com> on 2020/07/30 01:30:57 UTC

Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

I'm using Flink 1.10 with Kafka 2.2 (AWS MSK) and building a simple app
that consumes from one Kafka topic and produces events into another topic.
I would like to use the EXACTLY_ONCE semantic; however, I am
getting the following error:

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
    ... 12 more

My producer is defined as

new FlinkKafkaProducer[String](
  appArgs.authTxnTopic, // target topic
  new KeyedSerializationSchemaWrapper[String](
    new SimpleStringSchema()
  ), // serialization schema
  kafkaProdProps, // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
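
For reference, a minimal sketch of what kafkaProdProps needs on top of the
usual settings for EXACTLY_ONCE (the broker address is a placeholder):

import java.util.Properties

val kafkaProdProps = new Properties()
kafkaProdProps.setProperty("bootstrap.servers", "my-msk-broker:9092") // placeholder
// Per the Flink Kafka connector docs: brokers cap transactions at
// transaction.max.timeout.ms (15 minutes by default), while FlinkKafkaProducer
// defaults transaction.timeout.ms to 1 hour, so one of the two must be adjusted.
kafkaProdProps.setProperty("transaction.timeout.ms", "900000") // 15 minutes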


If I remove the EXACTLY_ONCE semantic as below (the producer then defaults to AT_LEAST_ONCE), it works.

new FlinkKafkaProducer[String](
  appArgs.authTxnTopic, // target topic
  new KeyedSerializationSchemaWrapper[String](
    new SimpleStringSchema()
  ), // serialization schema
  kafkaProdProps // producer config
)


I don't understand what I'm doing incorrectly, as this should work based on
the docs. A similar experience is noted here, also without an answer:
https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in

Appreciate the help.

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Vikash,

Sorry for the late reply. Is your Flink Kafka *connector* version 1.10.1
too? It's actually a bug in the connector, so you need to upgrade the
connector to 1.10.1 as well, not just Flink itself.

I tried Flink 1.10.0/1.10.1 + flink-kafka-connector 1.10.0 and indeed
reproduced the bug. After upgrading flink-kafka-connector to 1.10.1, the
error disappeared.
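
For reference, a minimal sketch of the dependency bump in sbt (assuming the
cross-built universal connector artifact; adjust for Maven accordingly):

// build.sbt: keep the Kafka connector on the same patch release as Flink.
// FLINK-16262 is fixed in the 1.10.1 connector artifact.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.10.1"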

On Fri, Jul 31, 2020 at 7:02 PM Vikash Dat <da...@gmail.com> wrote:

> Thanks for the reply. I am currently using 1.10 but also saw it happen in
> 1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to
> 1.10 at the moment. Are there any known workarounds?
>
> On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren <re...@gmail.com> wrote:
>
>> Hi Vikash,
>>
>> It's a bug in the classloader used by the `abortTransactions()` method in
>> `FlinkKafkaProducer` in Flink 1.10.0. I think it has been fixed in
>> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
>> 1.10.0?
>>
>>
>> On Thu, Jul 30, 2020 at 9:26 PM Vikash Dat <da...@gmail.com> wrote:
>>
>>> Has anyone had success using EXACTLY_ONCE in a Kafka producer in
>>> Flink?
>>> As of right now I don't think the code shown in the docs
>>> (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
>>> )
>>> actually works.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>> --
>> Best Regards,
>>
>> *Qingsheng Ren*
>>
>> Electrical and Computer Engineering
>> Carnegie Mellon University
>>
>> Email: renqschn@gmail.com
>>
>

-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqschn@gmail.com

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

Posted by Vikash Dat <da...@gmail.com>.
Thanks for the reply. I am currently using 1.10 but also saw it happen in
1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to
1.10 at the moment. Are there any known workarounds?

On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren <re...@gmail.com> wrote:

> Hi Vikash,
>
> It's a bug in the classloader used by the `abortTransactions()` method in
> `FlinkKafkaProducer` in Flink 1.10.0. I think it has been fixed in
> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
> 1.10.0?
>
>
> On Thu, Jul 30, 2020 at 9:26 PM Vikash Dat <da...@gmail.com> wrote:
>
>> Has anyone had success using EXACTLY_ONCE in a Kafka producer in
>> Flink?
>> As of right now I don't think the code shown in the docs
>> (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
>> )
>> actually works.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Best Regards,
>
> *Qingsheng Ren*
>
> Electrical and Computer Engineering
> Carnegie Mellon University
>
> Email: renqschn@gmail.com
>

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Vikash,

It's a bug in the classloader used by the `abortTransactions()` method in
`FlinkKafkaProducer` in Flink 1.10.0. I think it has been fixed in
1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
1.10.0?
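
To illustrate the failure mode: the producers are constructed on threads from
the common ForkJoinPool (visible in your stack trace), which don't have the
user-code classloader as their context classloader, and a class loaded through
two different classloaders is two distinct classes to the JVM. A minimal
sketch (the jar path is a placeholder):

import java.net.URLClassLoader

// Two isolated loaders over the same (placeholder) jar, neither delegating
// to the application classloader.
val jar = new java.io.File("kafka-clients-2.2.0.jar").toURI.toURL
val loaderA = new URLClassLoader(Array(jar), null)
val loaderB = new URLClassLoader(Array(jar), null)
val a = loaderA.loadClass("org.apache.kafka.common.serialization.ByteArraySerializer")
val b = loaderB.loadClass("org.apache.kafka.common.serialization.ByteArraySerializer")
assert(a != b) // same name, different classes, so instanceof checks across them fail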


On Thu, Jul 30, 2020 at 9:26 PM Vikash Dat <da...@gmail.com> wrote:

> Has anyone had success using EXACTLY_ONCE in a Kafka producer in
> Flink?
> As of right now I don't think the code shown in the docs
> (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
> )
> actually works.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqschn@gmail.com

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

Posted by Vikash Dat <da...@gmail.com>.
Has anyone had success using EXACTLY_ONCE in a Kafka producer in Flink?
As of right now I don't think the code shown in the docs
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer)
actually works.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/