Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2021/12/06 10:46:28 UTC

Converting DataStream of Avro SpecificRecord to Table

Hi community,

I'm currently converting a DataStream of an Avro SpecificRecord type into a
Table using the following method:

public static <T extends SpecificRecord> Table toTable(
    StreamTableEnvironment tEnv,
    DataStream<T> dataStream,
    Class<T> cls) {
  // Round-trip each record through Avro binary to turn it into a Row.
  RichMapFunction<T, Row> avroSpecific2RowConverter = new RichMapFunction<T, Row>() {
    private transient AvroSerializationSchema<T> avro2bin = null;
    private transient AvroRowDeserializationSchema bin2row = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      avro2bin = AvroSerializationSchema.forSpecific(cls);
      bin2row = new AvroRowDeserializationSchema(cls);
    }

    @Override
    public Row map(T value) throws Exception {
      byte[] bytes = avro2bin.serialize(value);
      return bin2row.deserialize(bytes);
    }
  };

  SingleOutputStreamOperator<Row> rows = dataStream
      .map(avroSpecific2RowConverter)
      // https://issues.apache.org/jira/browse/FLINK-23885
      .returns(AvroSchemaConverter.convertToTypeInfo(cls));

  return tEnv.fromDataStream(rows);
}

I'm wondering whether there's a predefined utility for this, or a better
way to do it, in Flink 1.14.

Best,

Dongwon