Posted to user@flink.apache.org by Debraj Manna <su...@gmail.com> on 2021/08/23 15:37:02 UTC

Flink 1.13.1 Kafka Producer Error

I am trying to use the Flink Kafka producer as below:

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "<Server details>");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

private static class SerializationSchema
        implements KafkaSerializationSchema<SelfDescribingMessageDO> {

    final String topic;
    final int numPartitions;

    public SerializationSchema(final String topic, final int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
            @Nullable Long timestamp) {
        return new ProducerRecord<>(topic,
                KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
                sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
                sdm.toByteArray());
    }
}

I am getting the exception below when trying to deploy the Flink job.
I do not get this error during unit tests.

2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source:
MetricSource -> Filter -> MetricStoreMapper -> (Filter ->
Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11,
Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource ->
Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map
-> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
(5764a387ede7d6710bcf3ad4e2222248) switched from INITIALIZING to
FAILED with failure cause: org.apache.kafka.common.KafkaException:
Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
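
As I understand it, the "is not an instance of" check fails when the class and the interface come from different classloaders, even though the names match. A minimal JDK-only sketch of that check (the helper `implementsInterface` is my own, demonstrated here with JDK types rather than the Kafka classes):

```java
import java.util.List;

public class ClassLoaderCheck {

    /**
     * Returns true if the named class, as seen by the given loader,
     * implements/extends the given interface or superclass.
     */
    static boolean implementsInterface(String className, Class<?> iface, ClassLoader loader)
            throws ClassNotFoundException {
        Class<?> loaded = Class.forName(className, false, loader);
        // If the same class file is loaded by two different classloaders,
        // isAssignableFrom returns false even though the names match --
        // which is exactly what the KafkaException above reports.
        return iface.isAssignableFrom(loaded);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = ClassLoaderCheck.class.getClassLoader();
        // Same loader hierarchy here, so this prints true:
        System.out.println(implementsInterface("java.util.ArrayList", List.class, cl));
    }
}
```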

Can someone let me know what is going wrong?

I have added flink-connector-kafka as a dependency in the application code.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

flink-connector-kafka is the only non-test Kafka dependency in my pom:

[INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
[INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
[INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
[INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
[INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
[INFO] |     +- org.apache.kafka:kafka-tools:jar:2.4.0:test
[INFO] |     |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
[INFO] |     +- org.apache.kafka:connect-transforms:jar:2.4.0:test

Re: Flink 1.13.1 Kafka Producer Error

Posted by Arvid Heise <ar...@apache.org>.
Hi Debraj,

such errors are usually caused by having two versions of Kafka in different
places. Did you put flink-connector-kafka into lib/, plugins/, or did you
specifically point YARN to it in some way?

You should include it only in your user jar, and the user jar itself should
not reside in any of the aforementioned places either.
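
One quick way to check is to look for the kafka-clients classes both in your user jar and in the jars under lib/. A small sketch (the jar path below is an assumption; substitute your own artifacts):

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarScan {

    /** Returns the names of all entries in the jar whose path contains the substring. */
    static List<String> entriesContaining(String jarPath, String needle) throws IOException {
        List<String> hits = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            for (JarEntry e : Collections.list(jar.entries())) {
                if (e.getName().contains(needle)) {
                    hits.add(e.getName());
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Assumed path -- point this at your user jar, then at each jar in Flink's lib/.
        String jarPath = args.length > 0 ? args[0] : "target/my-flink-job.jar";
        if (!new File(jarPath).exists()) {
            System.out.println("jar not found: " + jarPath);
            return;
        }
        // If ByteArraySerializer turns up in more than one jar on the task
        // manager's classpath (user jar, lib/, plugins/), that is the duplicate.
        entriesContaining(jarPath, "ByteArraySerializer").forEach(System.out::println);
    }
}
```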

Best,

Arvid


Re: Flink 1.13.1 Kafka Producer Error

Posted by Debraj Manna <su...@gmail.com>.
Fabian

I am running it inside YARN.

Thanks,


Re: Flink 1.13.1 Kafka Producer Error

Posted by Fabian Paul <fa...@ververica.com>.
Hi Debraj

How do you run your application? If you run it from an IDE, you can set a breakpoint and inspect which serializer class is used.

Best,
Fabian

Re: Flink 1.13.1 Kafka Producer Error

Posted by Debraj Manna <su...@gmail.com>.
Yes, I initially did not set `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG` or
`ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`.
I was getting the same error, so I tried setting them explicitly.

I ran `mvn dependency:tree | grep -i kafka`. I did not see any other version
of Kafka among the non-test dependencies, and I get this error only when
running my Flink application, not during tests.


[INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
[INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
[INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
[INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
[INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
[INFO] |     +- org.apache.kafka:kafka-tools:jar:2.4.0:test
[INFO] |     |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
[INFO] |     +- org.apache.kafka:connect-transforms:jar:2.4.0:test



Re: Flink 1.13.1 Kafka Producer Error

Posted by Fabian Paul <fa...@ververica.com>.
Hi Debraj,

The error does look strange. We recommend not setting `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG` or `ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`
because the connector takes care of them. Can you try removing these calls and check whether it makes a difference?

Looking only at the error message, it feels like different versions of the Kafka dependency are on the classpath.
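
For reference, a sketch of the producer setup from your first mail with those two properties removed (names mirror your snippet; this is untested against your job, but FlinkKafkaProducer configures byte-array serializers internally):

```java
public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    // Only the broker address is needed here; the connector supplies
    // the key/value serializers itself.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "<Server details>");

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
```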

Best,
Fabian

Re: Flink 1.13.1 Kafka Producer Error

Posted by Debraj Manna <su...@gmail.com>.
The same question has been asked on Stack Overflow
<https://stackoverflow.com/questions/68895934/flink-1-13-1-kafka-producer-error-bytearrayserializer-is-not-an-instance-of-org>
as well, and there is another related question
<https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in>
on Stack Overflow. Does anyone have any suggestions?
