Posted to user@flink.apache.org by tarun joshi <19...@gmail.com> on 2021/08/30 17:13:05 UTC

Flink issues with Avro GenericRecord serialization

Hey all,
I am trying to write a simple pipeline:

Read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
-> write Parquet files to AWS S3.

1) This is my SimpleMapper

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {

    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
    private static final Gson gson = gsonBuilder.create();

    // Avro schema derived from the Response POJO via reflection
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        // Parse the incoming JSON into the POJO, then store it as field 0 of the record
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        record.put(0, response);

        return record;
    }
}

2) This is my Job Definition

public class ClickStreamPipeline implements Serializable {

    private static Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>(
                        "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());

        // Bulk-format sink writing Avro GenericRecords as Parquet files to S3;
        // files are rolled on every checkpoint.
        final StreamingFileSink<GenericRecord> streamingFileSink =
                StreamingFileSink.forBulkFormat(
                        new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Kryo cannot handle Collections$UnmodifiableCollection out of the box,
        // so register a dedicated serializer for it.
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig()
                .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);

        return env;
    }
}



The issue I am facing is that serialization of the Avro GenericRecord-wrapped
message fails:

   - When I use a GenericRecordAvroTypeInfo(schema) to force Avro as the
   preferred serializer, I get the error below (a standalone illustration
   follows this list):


              java.lang.ClassCastException: class <my fully qualified POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord



   - If I don't use the GenericRecordAvroTypeInfo and instead try to register my
   POJO with the KryoSerializer, serialization fails with an NPE somewhere in my
   Schema class. Do I need to implement/register a proper Avro serializer in the
   Flink config?
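
For the first error, my (unverified) understanding is that the Avro path
serializes the GenericRecord with Avro's generic datum writer, which expects a
record-typed field value to itself be an IndexedRecord, so the raw GSON-built
POJO stored at index 0 cannot satisfy it. A minimal standalone sketch, outside
Flink, that should hit the same cast failure (assuming the first field of the
Response schema is itself a record type):

// Standalone sketch, not the Flink code path: assumes the Response schema has
// a record-typed first field, so the raw POJO at index 0 is not an IndexedRecord.
Schema schema = ReflectData.get().getSchema(Response.class);
GenericData.Record record = new GenericData.Record(schema);
record.put(0, response);  // 'response' is a GSON-parsed Response POJO, as in SimpleMapper

DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
writer.write(record, encoder);
// -> java.lang.ClassCastException: ... cannot be cast to org.apache.avro.generic.IndexedRecord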

Thanks for the help!

Re: Flink issues with Avro GenericRecord serialization

Posted by tarun joshi <19...@gmail.com>.
This is resolved by the first approach I mentioned.
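
For anyone who finds this later, a minimal sketch of what that first approach
can look like, building the GenericRecord straight from the parsed JSON
(hypothetical and simplified to string-typed fields; not the actual code from
my job):

// Sketch: populate the GenericRecord directly from the incoming JSON string 's'
// instead of putting a GSON POJO into the record. Only string-typed fields are
// handled here; real code would switch on field.schema().getType().
JsonObject json = JsonParser.parseString(s).getAsJsonObject();
GenericRecord record = new GenericData.Record(schema);
for (Schema.Field field : schema.getFields()) {
    JsonElement value = json.get(field.name());
    record.put(field.name(), value == null || value.isJsonNull() ? null : value.getAsString());
}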

Thanks Team

Re: Flink issues with Avro GenericRecord serialization

Posted by tarun joshi <19...@gmail.com>.
An update on this: I see that `IndexedRecord` is part of the Avro library.
Please correct me if I am wrong in assuming that POJOs generated by the Avro
code generator must implement the IndexedRecord interface. It seems that
either:


   - I should parse the stringified JSON from AWS Kinesis directly into
   Avro.
   - Or convert those GSON-parsed POJOs into Avro-compatible POJOs at
   stream time (a rough sketch of this is below).

Please let me know if anyone has a better way to do this.
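
In case it helps, a rough sketch of one way to do the second option,
converting the GSON-parsed POJO into an Avro GenericRecord by round-tripping
it through Avro's reflect writer and generic reader (just one possible way,
not necessarily the most efficient):

// Sketch: turn a reflect-mapped POJO into a GenericRecord by writing it with
// ReflectDatumWriter and reading the bytes back with GenericDatumReader.
static GenericRecord toGenericRecord(Response response, Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new ReflectDatumWriter<Response>(schema).write(response, encoder);
    encoder.flush();

    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
}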

