Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2021/12/06 10:46:28 UTC
Converting DataStream of Avro SpecificRecord to Table
Hi community,
I'm currently converting a DataStream of an Avro SpecificRecord type into a
Table using the following method:
public static <T extends SpecificRecord> Table toTable(
    StreamTableEnvironment tEnv,
    DataStream<T> dataStream,
    Class<T> cls) {
  RichMapFunction<T, Row> avroSpecific2RowConverter = new RichMapFunction<>() {
    private transient AvroSerializationSchema<T> avro2bin = null;
    private transient AvroRowDeserializationSchema bin2row = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      avro2bin = AvroSerializationSchema.forSpecific(cls);
      bin2row = new AvroRowDeserializationSchema(cls);
    }

    @Override
    public Row map(T value) throws Exception {
      byte[] bytes = avro2bin.serialize(value);
      Row row = bin2row.deserialize(bytes);
      return row;
    }
  };

  SingleOutputStreamOperator<Row> rows =
      dataStream.map(avroSpecific2RowConverter)
          // https://issues.apache.org/jira/browse/FLINK-23885
          .returns(AvroSchemaConverter.convertToTypeInfo(cls));

  return tEnv.fromDataStream(rows);
}
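A side note on the open() pattern used above: transient fields are skipped by Java serialization, so a function object that is serialized and restored elsewhere arrives with those fields set to null and has to rebuild them before use. The sketch below illustrates only that mechanism using the JDK, with no Flink or Avro dependency. The names TransientOpenSketch, UpperCaseMapper, roundTrip and isOpened are hypothetical stand-ins, not part of any Flink API, and the StringBuilder merely plays the role of the two schema objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Locale;

public class TransientOpenSketch {

    /** Hypothetical stand-in for a rich function with a transient helper built in open(). */
    public static class UpperCaseMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        // Skipped by Java serialization, like avro2bin and bin2row in the method above.
        private transient StringBuilder buffer;

        /** Builds the transient helper; must be called before map(). */
        public void open() {
            buffer = new StringBuilder();
        }

        public boolean isOpened() {
            return buffer != null;
        }

        public String map(String value) {
            buffer.setLength(0);
            return buffer.append(value.toUpperCase(Locale.ROOT)).toString();
        }
    }

    /** Serializes and deserializes the mapper, as if it were shipped to another JVM. */
    public static UpperCaseMapper roundTrip(UpperCaseMapper mapper)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(mapper);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (UpperCaseMapper) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        UpperCaseMapper original = new UpperCaseMapper();
        original.open();

        UpperCaseMapper shipped = roundTrip(original);
        System.out.println(shipped.isOpened()); // false: the transient field was not serialized

        shipped.open();                          // rebuild the helper on the receiving side
        System.out.println(shipped.map("avro")); // AVRO
    }
}
```

Whether the two Avro schema objects actually need this treatment depends on their own serializability, which this sketch does not address.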
I'm wondering whether there's a pre-defined utility for this, or a better
way to do it, in Flink 1.14.
Best,
Dongwon