Posted to user@flink.apache.org by tarun joshi <19...@gmail.com> on 2021/08/30 17:13:05 UTC
Flink issues with Avro GenericRecord serialization
Hey all,
I am trying to write a simple pipeline:
Read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
-> write Parquet files to AWS S3.
1) This is my SimpleMapper
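import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.functions.RichMapFunction;

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
    private static final Gson gson = gsonBuilder.create();
    // Avro schema derived by reflection from the Response POJO
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        // Stores the entire Response POJO as the value of the record's first field
        record.put(0, response);
        return record;
    }
}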
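One way to turn a GSON-parsed POJO into a genuine GenericRecord (rather than storing the POJO as field 0) is to serialize it with Avro's reflect writer and read the bytes back with the generic reader, which yields nested GenericData.Record values all the way down. This is a minimal sketch, assuming the POJO maps cleanly onto the schema ReflectData derives for it; PojoToGenericRecord, Page and Click are hypothetical stand-ins for the real Response class. ReflectData treats fields as non-null by default, so null values would need Avro's @Nullable annotation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

/** Hypothetical helper: converts a plain POJO into a GenericRecord with the same reflect schema. */
public class PojoToGenericRecord {

    /** Hypothetical nested POJO, standing in for part of Response. */
    public static class Page {
        public String url;
        public Page() {}
        public Page(String url) { this.url = url; }
    }

    /** Hypothetical top-level POJO, standing in for Response. */
    public static class Click {
        public Page page;
        public long timestamp;
        public Click() {}
        public Click(Page page, long timestamp) { this.page = page; this.timestamp = timestamp; }
    }

    private final Schema schema;
    private final ReflectDatumWriter<Object> writer;
    private final GenericDatumReader<GenericRecord> reader;

    public PojoToGenericRecord(Class<?> pojoClass) {
        this.schema = ReflectData.get().getSchema(pojoClass);
        this.writer = new ReflectDatumWriter<>(schema);
        this.reader = new GenericDatumReader<>(schema);
    }

    public Schema getSchema() { return schema; }

    /** Writes the POJO with the reflect writer, then reads the bytes back as generic records. */
    public GenericRecord convert(Object pojo) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(pojo, encoder);
        encoder.flush();
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        // Nested records come back as GenericData.Record, which implements IndexedRecord
        return reader.read(null, decoder);
    }
}
```

In a mapper like SimpleMapper, such a converter would typically be created in open() and map() would return converter.convert(response). The extra encode/decode per element is the cost of reusing ReflectData's field mapping instead of copying fields by hand.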
2) This is my Job Definition
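import java.io.Serializable;

import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class ClickStreamPipeline implements Serializable {

    // Same reflect-derived schema that SimpleMapper uses
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        // getKafkaConsumerProperties() is defined elsewhere (not shown)
        FlinkKinesisConsumer<String> kinesisConsumer = new FlinkKinesisConsumer<>(
                "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());

        // Bulk-format Parquet sink; part files roll on every checkpoint
        final StreamingFileSink<GenericRecord> streamingFileSink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Kryo serializer for Collections.unmodifiable* wrappers (from the kryo-serializers library)
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);

        return env;
    }
}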
The issue I am facing is that serializing the GenericRecord-wrapped message fails:
- When I use GenericRecordAvroTypeInfo(schema) to force Avro as the serializer, I get the error below:
  java.lang.ClassCastException: class <my fully qualified POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord
- If I don't use GenericRecordAvroTypeInfo and instead register my POJO with the KryoSerializer, serialization fails with an NPE somewhere in my Schema class.
Do I need to implement/register a proper Avro serializer in the Flink config?
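The ClassCastException is consistent with record.put(0, response) in the mapper: Avro's generic writer reads record fields through IndexedRecord.get(int), so if the first field of the reflected Response schema is itself a record type, the value stored there must be an IndexedRecord (for example a nested GenericData.Record), not a plain POJO. The sketch below reproduces that under stated assumptions: NestedPojoCastDemo, the Payload class and the Wrapper schema are hypothetical, and a plain GenericDatumWriter is assumed to approximate what Flink's Avro serializer does when GenericRecordAvroTypeInfo is used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class NestedPojoCastDemo {

    /** Hypothetical stand-in for a GSON-parsed POJO; it does not implement IndexedRecord. */
    public static class Payload {
        public String pageUrl = "https://example.com";
    }

    /** Hypothetical schemas: the wrapper's field 0 is itself a record type. */
    static final Schema INNER = SchemaBuilder.record("Payload").fields()
            .requiredString("pageUrl")
            .endRecord();
    static final Schema OUTER = SchemaBuilder.record("Wrapper").fields()
            .name("payload").type(INNER).noDefault()
            .endRecord();

    /** Serializes a record with Avro's generic writer. */
    static byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    /** Returns true if writing a raw POJO into a record-typed field throws ClassCastException. */
    public static boolean pojoInRecordFieldFails() throws IOException {
        GenericData.Record wrapper = new GenericData.Record(OUTER);
        wrapper.put(0, new Payload()); // same pattern as record.put(0, response)
        try {
            serialize(wrapper);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** The same value stored as a nested GenericRecord serializes without error. */
    public static int nestedGenericRecordSize() throws IOException {
        GenericData.Record payload = new GenericData.Record(INNER);
        payload.put("pageUrl", "https://example.com");
        GenericData.Record wrapper = new GenericData.Record(OUTER);
        wrapper.put("payload", payload);
        return serialize(wrapper).length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("POJO in record field fails: " + pojoInRecordFieldFails());
        System.out.println("Nested GenericRecord bytes: " + nestedGenericRecordSize());
    }
}
```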
Thanks for the help!
Re: Flink issues with Avro GenericRecord serialization
Posted by tarun joshi <19...@gmail.com>.
This is resolved by the first approach I mentioned.
Thanks Team
Re: Flink issues with Avro GenericRecord serialization
Posted by tarun joshi <19...@gmail.com>.
An update on this: I see that `IndexedRecord` is part of the Avro library.
Please correct me if I am wrong in assuming that "POJOs generated by the
Avro POJO generator must implement the IndexedRecord interface". It seems
I should either:
- parse the stringified JSON from AWS Kinesis directly into Avro,
- *or* convert the GSON-parsed POJOs into Avro-compatible POJOs at
stream time.
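For the first option, Avro can decode JSON straight into a GenericRecord using DecoderFactory.jsonDecoder and a GenericDatumReader, with no intermediate POJO. This is a minimal sketch; JsonToGenericRecord and the Click schema with its pageUrl/timestamp fields are hypothetical. One caveat: Avro's JSON decoder expects Avro's own JSON encoding, in which a non-null value of a union type is wrapped, e.g. {"string": "..."} instead of a bare string, so arbitrary clickstream JSON with optional fields may need normalizing first.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

public class JsonToGenericRecord {

    // Hypothetical click-event schema; in the real job this would be the Response schema
    public static final Schema CLICK_SCHEMA = SchemaBuilder.record("Click").fields()
            .requiredString("pageUrl")
            .requiredLong("timestamp")
            .endRecord();

    /** Decodes one JSON document (in Avro's JSON encoding) directly into a GenericRecord. */
    public static GenericRecord parse(Schema schema, String json) throws IOException {
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        return reader.read(null, decoder);
    }
}
```

Inside a RichMapFunction, the reader would usually be built once in open() and reused per element. As long as the same schema is passed to GenericRecordAvroTypeInfo and ParquetAvroWriters.forGenericRecord, the decoded records should already have the shape those expect.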
Please let me know if anyone has a better way to do this.