Posted to user@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2020/04/01 09:25:56 UTC

Re: Serialize avro data using apache beam (KafkaIO.write)

It seems that you need to either set a coder on the “aadccheck.out2” PCollection explicitly (with .setCoder()) or register one for GenericRecord in the CoderRegistry.
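
Both options could look roughly like the sketch below. It is untested against your pipeline and makes a few assumptions: the class and method names (GenericRecordCoders, setCoderOnOutputs, registerCoder) are made up for illustration, every GenericRecord output is assumed to share the same schema, and the AvroCoder import is the one from the core SDK (newer Beam releases ship it as org.apache.beam.sdk.extensions.avro.coders.AvroCoder). Note that with a multi-output ParDo, a coder set on one output does not carry over to the other outputs.

```java
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder; // assumption: core SDK location; see note above
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical helper illustrating the two ways to supply a coder for GenericRecord. */
public class GenericRecordCoders {

  /**
   * Option 1: set the coder explicitly on every GenericRecord output of the
   * multi-output ParDo, not only on the one that is read later.
   */
  static void setCoderOnOutputs(
      PCollectionTuple outputs, Schema schema, List<TupleTag<GenericRecord>> tags) {
    AvroCoder<GenericRecord> coder = AvroCoder.of(schema);
    for (TupleTag<GenericRecord> tag : tags) {
      outputs.get(tag).setCoder(coder);
    }
  }

  /**
   * Option 2: register one coder for GenericRecord in the pipeline's CoderRegistry,
   * so that coder inference can find it. This assumes a single schema for all
   * GenericRecord collections in the pipeline.
   */
  static void registerCoder(Pipeline pipeline, Schema schema) {
    pipeline
        .getCoderRegistry()
        .registerCoderForClass(GenericRecord.class, AvroCoder.of(schema));
  }
}
```

With option 2 it is probably safest to call registerCoder(pipeline, schema) right after creating the pipeline, before the ParDo is applied. With option 1, pass all GenericRecord tags used in withOutputTags(...), including the main output tag.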

> On 31 Mar 2020, at 21:53, bharghavi vajrala <vb...@gmail.com> wrote:
> 
> I want to publish data in Avro format to a Kafka topic.
> 
> I am trying to build a generic record using AvroCoder and KafkaAvroSerializer, but I am running into an issue. The code and the error are below. Any help would be much appreciated.
> 
>   public static final TupleTag<GenericRecord> tag1 =
>       new TupleTag<GenericRecord>() {};
> 
>   public static final TupleTagList tagList = TupleTagList.of(tag2).and(tag3);
> 
>   PCollectionTuple mixedCollection =
>       inputStream.apply("check",
>           ParDo.of(new TransformAVRO()).withOutputTags(tag1, tagList));
> 
>   Schema schema = new Schema.Parser().parse(
>       getClass().getResourceAsStream("/schema.avsc"));
>   AvroCoder<GenericRecord> genericCoder = AvroCoder.of(schema);
> 
>   PCollection<GenericRecord> testAvro =
>       mixedCollection.get(tag2).setCoder(genericCoder);
> 
> Error: java.lang.IllegalStateException: Unable to return a default Coder for aadccheck.out2 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.avro.generic.GenericRecord. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.