You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2021/03/09 14:27:00 UTC

[jira] [Created] (FLINK-21691) KafkaSource fails with NPE when setting it up

Till Rohrmann created FLINK-21691:
-------------------------------------

             Summary: KafkaSource fails with NPE when setting it up
                 Key: FLINK-21691
                 URL: https://issues.apache.org/jira/browse/FLINK-21691
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.12.2, 1.13.0
            Reporter: Till Rohrmann
             Fix For: 1.13.0, 1.12.3


A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)