Posted to user@beam.apache.org by Kannan Jayaprakasam <ka...@google.com> on 2022/06/20 18:29:12 UTC

Apache Beam not writing DynamicMessage records to BigQuery table

In my pipeline, I have a PCollection<DynamicMessage> to which I attach the
schema coder using ProtoDynamicMessageSchema:

    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import org.apache.beam.sdk.extensions.protobuf.ProtoDomain;
    import org.apache.beam.sdk.extensions.protobuf.ProtoDynamicMessageSchema;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    Descriptor descriptor = getDescriptor();
    ProtoDynamicMessageSchema<DynamicMessage> protoMessageSchema =
        ProtoDynamicMessageSchema.forDescriptor(ProtoDomain.buildFrom(descriptor), descriptor);
    Schema schema = protoMessageSchema.getSchema();
    PCollection<DynamicMessage> records = ...
    records.setSchema(
        schema,
        TypeDescriptor.of(DynamicMessage.class),
        protoMessageSchema.getToRowFunction(),
        protoMessageSchema.getFromRowFunction());
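
As a quick sanity check on the schema attachment (this is just a debugging
sketch, not part of my actual pipeline), my understanding is that converting
the PCollection to Rows with Convert.toRows() should fail loudly if the
SchemaCoder is not actually in place:

    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.Row;

    // Should only succeed if `records` really carries the schema set above.
    PCollection<Row> rows = records.apply("ToRows", Convert.toRows());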

I apply a write transform to write records from this PCollection to a
BigQuery table:

    records.apply(
        BigQueryIO.<DynamicMessage>write()
            .to(BigQueryTableRef.fullBQIOTableName(destinationTableRef))
            .withMethod(Write.Method.DEFAULT)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .useBeamSchema());
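
If I understand correctly, useBeamSchema() relies on the input PCollection
having a schema attached, so one check I've been using while debugging (a
sketch, nothing more) is:

    // Debugging sketch: if this prints false, the setSchema(...) call above
    // did not take effect and useBeamSchema() has nothing to work with.
    System.out.println("records.hasSchema() = " + records.hasSchema());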

When I run the pipeline, the PCollection has rows, but they don't get written
to the BigQuery table. There is no error message of any kind reported by the
pipeline.

The Apache Beam DynamicProtoCoder Javadoc
<https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java>
says:

    DynamicProtoCoder is not registered in the global CoderRegistry as the
    descriptor is required to create the coder.
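
So my reading is that the coder would have to be set explicitly. A sketch of
what I believe that looks like (though I'm not sure it is compatible with
useBeamSchema(), which as far as I can tell needs a SchemaCoder rather than a
plain proto coder):

    import org.apache.beam.sdk.extensions.protobuf.DynamicProtoCoder;

    // Explicitly set the dynamic proto coder, since it is not in the registry.
    records.setCoder(DynamicProtoCoder.of(descriptor));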

I have also tried registering DynamicMessage.class with the pipeline's schema
registry:

    Descriptor descriptor = getDescriptor();
    ProtoDynamicMessageSchema<DynamicMessage> protoMessageSchema =
        ProtoDynamicMessageSchema.forDescriptor(ProtoDomain.buildFrom(descriptor), descriptor);
    pipeline.getSchemaRegistry().registerSchemaForClass(
        DynamicMessage.class,
        protoMessageSchema.getSchema(),
        protoMessageSchema.getToRowFunction(),
        protoMessageSchema.getFromRowFunction());

But still nothing gets written.
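
A cross-check I'm considering (again just a sketch, assuming the registration
above succeeds) is asking the registry back for the SchemaCoder and setting it
on the PCollection explicitly:

    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.SchemaCoder;

    try {
      // Should return the coder registered above; throws if the registration
      // did not stick.
      SchemaCoder<DynamicMessage> schemaCoder =
          pipeline.getSchemaRegistry().getSchemaCoder(DynamicMessage.class);
      records.setCoder(schemaCoder);
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException("No schema registered for DynamicMessage", e);
    }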

By stepping through the Write transform, I did find that my single output row
of type DynamicMessage is available and has the parsed proto value in it (see
the attached screenshot).
[image: Screenshot 2022-06-20 3.30.16 PM.png]