You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Raghu Angadi (JIRA)" <ji...@apache.org> on 2018/01/24 20:53:00 UTC

[jira] [Closed] (BEAM-2257) KafkaIO write without key requires a producer fn

     [ https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raghu Angadi closed BEAM-2257.
------------------------------
    Resolution: Fixed

> KafkaIO write without key requires a producer fn
> ------------------------------------------------
>
>                 Key: BEAM-2257
>                 URL: https://issues.apache.org/jira/browse/BEAM-2257
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 2.3.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} to the topic without key:
> {code}
>    PCollection<String> strings = ...;
>    strings.apply(KafkaIO.<Void, String>write()
>        .withBootstrapServers("broker_1:9092,broker_2:9092")
>        .withTopic("results")
>        .withValueSerializer(new StringSerializer()) // just need serializer for value
>        .values()
>      );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provider, a kafka producer fn is required, else, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)