Posted to user@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2020/04/01 09:25:56 UTC
Re: Serialize avro data using apache beam (KafkaIO.write)
It seems that you need either to set a coder explicitly on the “aadccheck.out2” PCollection or to register a coder for GenericRecord in the CoderRegistry.
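For illustration, here is a minimal sketch of the first option, assuming a two-output ParDo that emits GenericRecord. The class name, tag names, and inline schema are hypothetical and not taken from the code quoted below. It also assumes a Beam version where AvroCoder lives in org.apache.beam.sdk.coders (newer releases moved it to org.apache.beam.sdk.extensions.avro.coders) and that the direct runner is on the classpath. The point is that every tagged output carrying GenericRecord gets a coder, not just the one that is consumed downstream.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder; // assumption: pre-2.46 package location
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SetCoderSketch {
  // Hypothetical tags; anonymous subclasses keep the element type information.
  static final TupleTag<GenericRecord> MAIN = new TupleTag<GenericRecord>() {};
  static final TupleTag<GenericRecord> SIDE = new TupleTag<GenericRecord>() {};

  // Hypothetical schema, inlined so the sketch does not depend on a resource file.
  static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Event\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

  // Routes empty ids to SIDE and everything else to MAIN.
  static class ToRecord extends DoFn<String, GenericRecord> {
    // Schema is not Serializable in older Avro versions, so parse it on the worker.
    private transient Schema schema;

    @Setup
    public void setup() {
      schema = new Schema.Parser().parse(SCHEMA_JSON);
    }

    @ProcessElement
    public void process(@Element String id, MultiOutputReceiver out) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", id);
      out.get(id.isEmpty() ? SIDE : MAIN).output(record);
    }
  }

  static PCollectionTuple build(Pipeline p) {
    AvroCoder<GenericRecord> coder = AvroCoder.of(new Schema.Parser().parse(SCHEMA_JSON));
    PCollectionTuple outputs =
        p.apply(Create.of("a", "", "b"))
            .apply("check", ParDo.of(new ToRecord()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
    // Beam cannot infer a coder for GenericRecord, so set one on each output.
    outputs.get(MAIN).setCoder(coder);
    outputs.get(SIDE).setCoder(coder);
    return outputs;
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    build(p);
    p.run().waitUntilFinish();
  }
}
```

The second option would be to call p.getCoderRegistry().registerCoderForClass(GenericRecord.class, coder) before applying the ParDo, which should cover all GenericRecord outputs at once as long as they share the same schema.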
> On 31 Mar 2020, at 21:53, bharghavi vajrala <vb...@gmail.com> wrote:
>
> To publish data in Avro format to a Kafka topic:
>
> I am trying to build a generic record using AvroCoder and KafkaAvroSerializer, but I am running into issues. Below are the code and the error. Any help would be much appreciated.
>
> public static final TupleTag<GenericRecord> tag1 = new TupleTag<GenericRecord>() {};
>
> public static final TupleTagList tagList = TupleTagList.of(tag2).and(tag3);
>
> PCollectionTuple mixedCollection =
>     inputStream.apply("check",
>         ParDo.of(new TransformAVRO()).withOutputTags(tag1, tagList));
>
> Schema schema = new Schema.Parser().parse(
>     getClass().getResourceAsStream("/schema.avsc"));
> AvroCoder<GenericRecord> genericCoder = AvroCoder.of(schema);
>
> PCollection<GenericRecord> testAvro =
>     mixedCollection.get(tag2).setCoder(genericCoder);
>
> Error: java.lang.IllegalStateException: Unable to return a default Coder for aadccheck.out2 [PCollection]. Correct one of the following root causes:
>   No Coder has been manually specified; you may do so using .setCoder().
>   Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.avro.generic.GenericRecord.
>   Building a Coder using a registered CoderProvider failed.
>   See suppressed exceptions for detailed failures.
>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.