Posted to user@beam.apache.org by Nicolas Delsaux <ni...@gmx.fr> on 2024/02/09 21:18:37 UTC

How to deserialize Avro content from BigQuery using Apache Beam?

Hi all,

This question is a copy of https://stackoverflow.com/q/77970866/15619,
which I'm posting here to have the best chance of getting an answer (but
don't worry, I'll take care of relaying answers between the two places).

I'm currently trying to read a data table from Google BigQuery using
Apache Beam/Google Dataflow. I have extracted the Avro schema from
BigQuery and generated the Java classes from that Avro schema. As I
don't want to read the whole table, I use an SQL query to get the last
record. And since the documentation states that
|BigQueryIO.readTableRows()|
<https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html#readTableRows-->
is slower than |BigQueryIO.read(SerializableFunction<SchemaAndRecord, T>)|
<https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html#read-org.apache.beam.sdk.transforms.SerializableFunction->,
I tried to use |BigQueryIO.readWithDatumReader(...)|
<https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html#readWithDatumReader-org.apache.beam.sdk.extensions.avro.io.AvroSource.DatumReaderFactory->
in the following way:

```java
return pipeline.apply("Read from BigQuery query", BigQueryIO
        .readWithDatumReader((AvroSource.DatumReaderFactory<SupplyChain>)
                (writer, reader) -> new SpecificDatumReader<>(SupplyChain.getClassSchema()))
        .fromQuery(query)
        .withQueryLocation("EU")
        .usingStandardSql()
        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ));
```

Unfortunately, it fails with the following exception:

```
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "instance" is null
    at org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters(TypeDescriptors.java:354)
    at org.apache.beam.sdk.values.TypeDescriptors.outputOf(TypeDescriptors.java:411)
    at org.apache.beam.sdk.values.TypeDescriptors.outputOf(TypeDescriptors.java:420)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.inferCoder(BigQueryIO.java:1070)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:1247)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:914)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:508)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:194)
    at com.auchan.hermes.prd03.Run.source(Run.java:59)
    at com.auchan.hermes.prd03.ReadDataFromBigQueryTest.test_can_read_data_from_BigQuery(ReadDataFromBigQueryTest.java:71)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
```

After reading the source code, I noticed that
|BigQueryIO.read(SerializableFunction<SchemaAndRecord, T>)| calls
|.setParseFn(parseFn)|, whereas |BigQueryIO.readWithDatumReader(...)|
does not. Is there something else I need to do? Am I missing something?
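To make the failure mode concrete, here is a deliberately simplified, hypothetical model in plain Java (no Beam dependency) of the coder-inference step visible in the stack trace. It assumes, based on the trace and a reading of the Beam sources rather than on documented behaviour, that |TypedRead.inferCoder| returns an explicitly set coder if there is one and otherwise derives the output type from |parseFn| via |TypeDescriptors.outputOf|. The names |CoderInferenceModel|, |Coder|, |ReadConfig|, |outputTypeOf| and |inferCoder| are illustrative stand-ins, not Beam classes.

```java
import java.util.function.Function;

/**
 * Hypothetical, simplified model of the coder-inference step seen in the
 * stack trace (TypedRead.expand -> inferCoder -> TypeDescriptors.outputOf).
 * None of these types are Beam classes.
 */
public class CoderInferenceModel {

    /** Stand-in for a Beam Coder: here it only carries a description. */
    record Coder(String description) {}

    /** Stand-in for the builder state of a typed BigQuery read. */
    record ReadConfig(Function<Object, ?> parseFn, Coder explicitCoder) {}

    /**
     * Stand-in for TypeDescriptors.outputOf. The real method extracts the
     * function's output type parameter; this model only returns the class
     * name, but like the real one it dereferences the function first.
     */
    static String outputTypeOf(Function<Object, ?> parseFn) {
        // getClass() on a null reference throws NullPointerException, which
        // matches the trace for a read built without a parseFn.
        return parseFn.getClass().getName();
    }

    /** Assumed order: explicit coder first, otherwise infer from parseFn. */
    static Coder inferCoder(ReadConfig config) {
        if (config.explicitCoder() != null) {
            return config.explicitCoder();
        }
        return new Coder("inferred from " + outputTypeOf(config.parseFn()));
    }

    public static void main(String[] args) {
        // Like readWithDatumReader(...) in the question: no parseFn, no coder.
        ReadConfig datumReaderRead = new ReadConfig(null, null);
        try {
            inferCoder(datumReaderRead);
        } catch (NullPointerException e) {
            System.out.println("No parseFn and no coder: NullPointerException");
        }

        // The same read, but with an explicitly supplied (hypothetical) coder.
        ReadConfig withCoder = new ReadConfig(null, new Coder("AvroCoder<SupplyChain>"));
        System.out.println("Explicit coder: " + inferCoder(withCoder).description());
    }
}
```

In this model, a read with no |parseFn| fails at the same point as the trace, while a supplied coder skips the |parseFn| lookup altogether. Beam's |BigQueryIO.TypedRead| does expose a |withCoder(...)| setter; whether setting it is sufficient for |readWithDatumReader(...)| with |DIRECT_READ| in a given Beam version is an assumption to verify, not something this model proves.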